Source code for eventscore.core.streams
from typing import Any
from eventscore.core.abstract import IStream, IStreamFactory
[docs]
class StreamFactory(IStreamFactory):
def __init__(self, stream_class: type[IStream], kwargs: dict[str, Any]) -> None:
self.__stream_class = stream_class
self.__kwargs = kwargs
def __call__(self) -> IStream:
return self.__stream_class(**self.__kwargs)