eventscore.core package¶
Submodules¶
eventscore.core.abstract module¶
- class eventscore.core.abstract.IConsumer(func: ~collections.abc.Callable[[~eventscore.core.types.Event], ~typing.Any], logger: ~logging.Logger = <RootLogger root (DEBUG)>)[source]¶
Bases:
Protocol
Consumer class. Its sole purpose is to consume events.
- class eventscore.core.abstract.IECore(*args, **kwargs)[source]¶
Bases:
Protocol
Event core class. Can be used for:
Accessing pipeline processor
Accessing worker spawner
Accessing producer
Accessing event stream
Decorating functions to make them consumers
Registering functions to make them consumers
Discovering consumers marked with @(ecore.)consumer decorator
Producing events
Spawning workers built from registered consumers
- consumer(func: Callable[[Event], Any] | None = None, *, event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, clones: int = 1) Callable[[Event], Any][source]¶
Decorator for consumer functions
- Parameters:
func (ConsumerFunc | None) – function to decorate
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
clones (NumberOfClones) – Number of clones
- Returns:
Decorated function
- Return type:
ConsumerFunc
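The signature above (an optional positional `func` plus keyword-only `event`, `group` and `clones`) is the usual shape of a decorator that can be applied either directly or with arguments. The following is a minimal sketch of that pattern only, not the library's implementation: `REGISTRY` and this standalone `consumer` function are hypothetical stand-ins, and event and group are simplified to plain strings.

```python
from typing import Any, Callable, Optional

ConsumerFunc = Callable[[Any], Any]

# Hypothetical in-memory registry standing in for the core's internal state.
REGISTRY: list[tuple[str, str, int, ConsumerFunc]] = []


def consumer(
    func: Optional[ConsumerFunc] = None,
    *,
    event: str,
    group: str,
    clones: int = 1,
):
    def decorate(f: ConsumerFunc) -> ConsumerFunc:
        REGISTRY.append((event, group, clones, f))
        return f  # the function itself is returned unchanged

    # Called as consumer(func, event=..., group=...): decorate immediately.
    if func is not None:
        return decorate(func)
    # Called as @consumer(event=..., group=...): return the real decorator.
    return decorate


@consumer(event="order.created", group="billing", clones=2)
def charge(event: Any) -> str:
    return f"charged {event}"
```

Because the sketch returns the original function, `charge` stays directly callable, which keeps consumer functions easy to unit test.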
- discover_consumers(*, root: str = '') None[source]¶
Discover consumers within given package root.
- Parameters:
root (str | None) – root package to search in. Current directory is used by default.
- Returns:
None
- Return type:
None
- property process_pipeline: IProcessPipeline¶
Pipeline processor getter
- Returns:
Pipeline processor
- Return type:
IProcessPipeline
- produce(event: Event, *, block: bool = True, timeout: int = 5) None[source]¶
Produce an event
- Parameters:
event (Event) – Event to produce
block (bool) – Whether to block on I/O if a delay occurs. Defaults to True.
timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.
- Returns:
None
- Return type:
None
- register_consumer(func: Callable[[Event], Any], event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, *, clones: int = 1, func_path: str | None = None) None[source]¶
Register a function as a consumer
- Parameters:
func (ConsumerFunc) – Function to register as a consumer
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
clones (NumberOfClones) – Number of clones
func_path (str | None) – Path (absolute preferred) to the module where the function is defined. Used to compare consumer functions for equality so that the same function is not registered twice. Defaults to `(inspect.getsourcefile(func) or "") + ":" + func.__name__`
- Returns:
None
- Return type:
None
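The documented default for `func_path` relies only on the standard library, so it can be evaluated in isolation. The sketch below wraps that expression in a helper; `default_func_path` and `handle_order` are hypothetical names used for illustration.

```python
import inspect


def default_func_path(func) -> str:
    # Mirrors the documented default expression for func_path.
    return (inspect.getsourcefile(func) or "") + ":" + func.__name__


def handle_order(event):  # hypothetical consumer function
    return event


path = default_func_path(handle_order)
print(path)  # source file path (possibly empty), then ":handle_order"
```

Two functions with the same name in different modules get different paths, which is presumably why an absolute path is preferred when `func_path` is passed explicitly.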
- property spawn_worker: ISpawnWorker¶
Worker spawner getter
- Returns:
Worker spawner
- Return type:
ISpawnWorker
- spawn_workers() None[source]¶
Spawn workers for registered consumers. Registering further consumers or spawning again after this call has no effect.
- Returns:
None
- Return type:
None
- property stream_factory: IStreamFactory¶
Stream factory getter
- Returns:
Stream factory
- Return type:
IStreamFactory
- class eventscore.core.abstract.IEventSerializer(*args, **kwargs)[source]¶
Bases:
Protocol[IType, RType]
Event serializer class. Its sole purpose is to encode and decode events.
- class eventscore.core.abstract.IProcessPipeline(*args, **kwargs)[source]¶
Bases:
Protocol
Pipeline processor class. Its sole purpose is to build a worker from a given pipeline.
- class eventscore.core.abstract.IProducer(*args, **kwargs)[source]¶
Bases:
Protocol
Producer class. Its sole purpose is to produce events.
- produce(event: Event, *, block: bool = True, timeout: int = 5) None[source]¶
Produce an event
- Parameters:
event (Event) – Event to produce
block (bool) – Whether to block on I/O if a delay occurs. Defaults to True.
timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.
- Returns:
None
- Return type:
None
- class eventscore.core.abstract.IRunner(stream_factory: ~eventscore.core.abstract.IStreamFactory, event: str | ~enum.StrEnum | ~enum.IntEnum, group: str | ~enum.StrEnum | ~enum.IntEnum, *consumers: ~eventscore.core.abstract.IConsumer, max_events: int = -1, logger: ~logging.Logger = <RootLogger root (DEBUG)>)[source]¶
Bases:
Protocol
Runner class. Its sole purpose is to run the given consumers.
- class eventscore.core.abstract.ISpawnWorker(*args, **kwargs)[source]¶
Bases:
Protocol
Worker spawner class. Its sole purpose is to spawn a given worker.
- class eventscore.core.abstract.IStream(*args, **kwargs)[source]¶
Bases:
Protocol
Event stream class. Its sole purpose is to put and pop events.
- pop(event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, *, block: bool = True, timeout: int = 5) Event[source]¶
Pop an event from stream
- Parameters:
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
block (bool) – Whether to block on I/O if a delay occurs. Defaults to True.
timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.
- Returns:
Next unprocessed event in stream
- Return type:
Event
- put(event: Event, *, block: bool = True, timeout: int = 5) None[source]¶
Put an event to stream
- Parameters:
event (Event) – Event to put
block (bool) – Whether to block on I/O if a delay occurs. Defaults to True.
timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.
- Returns:
None
- Return type:
None
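To make the `put`/`pop` contract concrete, here is a minimal in-memory sketch built on `queue.Queue`. Everything in it is an assumption for illustration: `InMemoryStream` is a hypothetical class, `Event` is a simplified stand-in for the library type, `EmptyStreamError` is redefined locally, and fanning each event out to every consumer group is one possible design, not documented behavior.

```python
import queue
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:  # simplified stand-in, not the library's Event
    type: str
    payload: dict[str, Any] = field(default_factory=dict)


class EmptyStreamError(Exception):  # local stand-in for the documented exception
    pass


class InMemoryStream:
    """Hypothetical stream keeping one FIFO queue per (event, group) pair."""

    def __init__(self, groups: list[str]) -> None:
        self._groups = groups
        self._queues: dict[tuple[str, str], queue.Queue] = {}

    def _queue(self, event: str, group: str) -> queue.Queue:
        return self._queues.setdefault((event, group), queue.Queue())

    def put(self, event: Event, *, block: bool = True, timeout: int = 5) -> None:
        # Assumed semantics: every known consumer group receives its own copy.
        for group in self._groups:
            self._queue(event.type, group).put(event, block=block, timeout=timeout)

    def pop(self, event: str, group: str, *, block: bool = True, timeout: int = 5) -> Event:
        try:
            return self._queue(event, group).get(block=block, timeout=timeout)
        except queue.Empty:
            raise EmptyStreamError("Stream does not have unprocessed messages.") from None


stream = InMemoryStream(groups=["billing", "audit"])
stream.put(Event("order.created", {"id": 1}))
first = stream.pop("order.created", "billing", block=False)
```

With `block=False` the timeout is ignored and an empty queue fails immediately; with `block=True` the call waits up to `timeout` seconds before raising.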
eventscore.core.consumers module¶
eventscore.core.ecore module¶
- class eventscore.core.ecore.ECore(stream_factory: ~eventscore.core.abstract.IStreamFactory, *, process_pipeline: ~eventscore.core.abstract.IProcessPipeline | None = None, process_pipeline_type: type[~eventscore.core.abstract.IProcessPipeline] = <class 'eventscore.core.pipelines.ProcessPipeline'>, process_pipeline_init_kwargs: dict[str, ~typing.Any] | None = None, spawn_worker: ~eventscore.core.abstract.ISpawnWorker | None = None, spawn_worker_type: type[~eventscore.core.abstract.ISpawnWorker] = <class 'eventscore.core.workers.SpawnMPWorker'>, spawn_worker_init_kwargs: dict[str, ~typing.Any] | None = None, producer: ~eventscore.core.abstract.IProducer | None = None, producer_type: type[~eventscore.core.abstract.IProducer] = <class 'eventscore.core.producers.Producer'>, producer_init_kwargs: dict[str, ~typing.Any] | None = None, logger: ~logging.Logger = <RootLogger root (DEBUG)>, skipping_predicate: ~collections.abc.Callable[[], bool] | None = <function _skipping_predicate>)[source]¶
Bases:
IECore
- consumer(func: Callable[[Event], Any] | None = None, *, event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, clones: int = 1) Callable[[Event], Any][source]¶
Decorator for consumer functions
- Parameters:
func (ConsumerFunc | None) – function to decorate
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
clones (NumberOfClones) – Number of clones
- Returns:
Decorated function
- Return type:
ConsumerFunc
- discover_consumers(*, root: str = '') None[source]¶
Discover consumers within given package root.
- Parameters:
root (str | None) – root package to search in. Current directory is used by default.
- Returns:
None
- Return type:
None
- property process_pipeline: IProcessPipeline¶
Pipeline processor getter
- Returns:
Pipeline processor
- Return type:
IProcessPipeline
- produce(event: Event, *, block: bool = True, timeout: int = 5) None[source]¶
Produce an event
- Parameters:
event (Event) – Event to produce
block (bool) – Whether to block on I/O if a delay occurs. Defaults to True.
timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.
- Returns:
None
- Return type:
None
- register_consumer(func: Callable[[Event], Any], event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, *, clones: int = 1, func_path: str | None = None) None[source]¶
Register a function as a consumer
- Parameters:
func (ConsumerFunc) – Function to register as a consumer
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
clones (NumberOfClones) – Number of clones
func_path (str | None) – Path (absolute preferred) to the module where the function is defined. Used to compare consumer functions for equality so that the same function is not registered twice. Defaults to `(inspect.getsourcefile(func) or "") + ":" + func.__name__`
- Returns:
None
- Return type:
None
- property spawn_worker: ISpawnWorker¶
Worker spawner getter
- Returns:
Worker spawner
- Return type:
ISpawnWorker
- spawn_workers() None[source]¶
Spawn workers for registered consumers. Registering further consumers or spawning again after this call has no effect.
- Returns:
None
- Return type:
None
- property stream_factory: IStreamFactory¶
Stream factory getter
- Returns:
Stream factory
- Return type:
IStreamFactory
eventscore.core.exceptions module¶
- exception eventscore.core.exceptions.AlreadySpawnedError[source]¶
Bases:
EventsCoreError
- message = 'Not able to modify consumers after spawning workers.'¶
- exception eventscore.core.exceptions.ClonesMismatchError[source]¶
Bases:
EventsCoreError
- message = 'Pipeline must have the same number of clones for all items.'¶
- exception eventscore.core.exceptions.EmptyPipelineError[source]¶
Bases:
EventsCoreError
- message = 'Pipeline must have at least one item.'¶
- exception eventscore.core.exceptions.EmptyStreamError[source]¶
Bases:
EventsCoreError
- message = 'Stream does not have unprocessed messages.'¶
- exception eventscore.core.exceptions.EventNotSentError[source]¶
Bases:
EventsCoreError
- message = 'Could not send message to stream due to an unexpected error.'¶
- exception eventscore.core.exceptions.EventsCoreError[source]¶
Bases:
Exception
- message = 'eventscore error occured.'¶
- exception eventscore.core.exceptions.NotADirectoryError[source]¶
Bases:
EventsCoreError
- message = 'Provided root is not a directory.'¶
- exception eventscore.core.exceptions.NotAPackageError[source]¶
Bases:
EventsCoreError
- message = 'Provided root is not a Python package.'¶
- exception eventscore.core.exceptions.PathError[source]¶
Bases:
EventsCoreError
- message = 'Provided path does not exist.'¶
- exception eventscore.core.exceptions.TooManyDataError[source]¶
Bases:
EventsCoreError
- message = 'Unexpected number of data received for event.'¶
Bases:
EventsCoreError
eventscore.core.logging module¶
eventscore.core.pipelines module¶
- class eventscore.core.pipelines.ProcessPipeline(consumer_type: type[~eventscore.core.abstract.IConsumer] = <class 'eventscore.core.consumers.Consumer'>, runner_type: type[~eventscore.core.abstract.IRunner] = <class 'eventscore.core.runners.ObserverRunner'>, logger: ~logging.Logger = <RootLogger root (DEBUG)>)[source]¶
Bases:
IProcessPipeline
eventscore.core.producers module¶
- class eventscore.core.producers.Producer(ecore: ~eventscore.core.abstract.IECore, logger: ~logging.Logger = <RootLogger root (DEBUG)>)[source]¶
Bases:
IProducer
- produce(event: Event, *, block: bool = True, timeout: int = 5) None[source]¶
Produce an event
- Parameters:
event (Event) – Event to produce
block (bool) – Whether to block on I/O if a delay occurs. Defaults to True.
timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.
- Returns:
None
- Return type:
None
eventscore.core.runners module¶
- class eventscore.core.runners.ObserverRunner(stream_factory: ~eventscore.core.abstract.IStreamFactory, event: str | ~enum.StrEnum | ~enum.IntEnum, group: str | ~enum.StrEnum | ~enum.IntEnum, *consumers: ~eventscore.core.abstract.IConsumer, max_events: int = -1, logger: ~logging.Logger = <RootLogger root (DEBUG)>)[source]¶
Bases:
IRunner
eventscore.core.serializers module¶
eventscore.core.types module¶
- class eventscore.core.types.DeliverySemantic(*values)[source]¶
Bases:
IntEnum
- AT_LEAST_ONCE = 1¶
- AT_MOST_ONCE = 0¶
- EXACTLY_ONCE = 2¶
- class eventscore.core.types.Event(type: str | ~enum.StrEnum | ~enum.IntEnum, uid: ~uuid.UUID = <factory>, ts: str = <factory>, payload: dict[str, str | int | bytes] = <factory>)[source]¶
Bases:
object
Event class
- Parameters:
type (EventType) – type of the event
uid (uuid.UUID) – unique id of the event. Defaults to random uuid4
ts (str) – timestamp of the event. Defaults to current timestamp
payload (dict[str, EncodableT]) – payload of the event. Defaults to empty dict
- asdict() EventDict[source]¶
Custom asdict method for result to be encodable
- Returns:
Dictionary representation of the event
- Return type:
EventDict
- classmethod fromdict(obj: EventDict) Event[source]¶
Classmethod for event construction from encodable dictionary
- payload: dict[str, str | int | bytes]¶
- ts: str¶
- type: str | StrEnum | IntEnum¶
- uid: UUID¶
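The `asdict` and `fromdict` pair implies a round trip between an event and an encodable dictionary in which `uid` and `type` become plain strings. The following sketch shows one way such a round trip could look. It is an illustrative stand-in, not the library's `Event`: the ISO 8601 timestamp format and the conversion details are assumptions, and `type` is simplified to `str`.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import TypedDict
from uuid import UUID, uuid4


class EventDict(TypedDict):
    type: str
    uid: str
    ts: str
    payload: dict


@dataclass
class Event:  # illustrative stand-in mirroring the documented fields
    type: str
    uid: UUID = field(default_factory=uuid4)
    # Assumed format: ISO 8601 timestamp in UTC.
    ts: str = field(default_factory=lambda: datetime.now(timezone.utc).isoformat())
    payload: dict = field(default_factory=dict)

    def asdict(self) -> EventDict:
        # Convert values that are not directly encodable (UUID) to strings.
        return {
            "type": str(self.type),
            "uid": str(self.uid),
            "ts": self.ts,
            "payload": dict(self.payload),
        }

    @classmethod
    def fromdict(cls, obj: EventDict) -> "Event":
        return cls(
            type=obj["type"],
            uid=UUID(obj["uid"]),
            ts=obj["ts"],
            payload=dict(obj["payload"]),
        )


original = Event("order.created", payload={"id": 1})
restored = Event.fromdict(original.asdict())
```

Since the dataclass compares field by field, `restored` equals `original` after the round trip.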
- class eventscore.core.types.EventDict[source]¶
Bases:
TypedDict
- payload: dict[str, str | int | bytes]¶
- ts: str¶
- type: str¶
- uid: str¶
- class eventscore.core.types.EventStatus(*values)[source]¶
Bases:
IntEnum
- FAILED = 2¶
- PENDING = 0¶
- SENT = 1¶
- class eventscore.core.types.Pipeline(uid: ~uuid.UUID = <factory>, items: set[~eventscore.core.types.PipelineItem] = <factory>)[source]¶
Bases:
object
Pipeline class
- Parameters:
uid (uuid.UUID) – unique id of the pipeline. Defaults to random uuid4
items (set[PipelineItem]) – set of pipeline items. Defaults to empty set
- items: set[PipelineItem]¶
- uid: UUID¶
- class eventscore.core.types.PipelineItem(func: Callable[[Event], Any], func_path: str, event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum = 'default', clones: int = 1)[source]¶
Bases:
object
Pipeline item class
- Parameters:
func (ConsumerFunc) – consumer function
event (EventType) – event type
group (ConsumerGroup) – consumer group. Defaults to DEFAULT_CONSUMER_GROUP
clones (int) – number of clones. Defaults to 1
- __eq__(other: PipelineItem) bool[source]¶
Equality operator for pipeline items. Must use same attributes as __hash__ method.
- Parameters:
other (PipelineItem) – other pipeline item
- Returns:
True if the two pipeline items are equal, False otherwise
- Return type:
bool
- __hash__() int[source]¶
Hash function for pipeline item. Must use same attributes as __eq__ method.
- Returns:
hash value
- Return type:
int
- clones: int¶
- event: str | StrEnum | IntEnum¶
- func_path: str¶
- group: str | StrEnum | IntEnum¶
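The requirement that `__eq__` and `__hash__` use the same attributes is what lets a `set` of pipeline items discard duplicate registrations. Below is a minimal sketch of that invariant with a hypothetical `Item` class. Which attributes form the identity is an assumption here (`func_path`, `event` and `group`); the library may use a different combination.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(eq=False)  # eq=False: the dataclass does not generate its own __eq__
class Item:  # hypothetical stand-in for a pipeline item
    func: Callable[[Any], Any] = field(repr=False)
    func_path: str
    event: str
    group: str = "default"
    clones: int = 1

    def _key(self) -> tuple[str, str, str]:
        # Assumed identity: where the function lives, plus event and group.
        return (self.func_path, self.event, self.group)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Item):
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self) -> int:
        return hash(self._key())


def handler(event: Any) -> None:
    return None


a = Item(handler, "app/consumers.py:handler", "order.created", "billing")
b = Item(handler, "app/consumers.py:handler", "order.created", "billing", clones=3)
items = {a, b}  # the second item is treated as a duplicate of the first
```

If `__hash__` used an attribute that `__eq__` ignored, equal items could land in different hash buckets and the set would keep both.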
- class eventscore.core.types.Worker(name: str, runner: ~typing.Any, clones: int = 1, uid: ~uuid.UUID = <factory>)[source]¶
Bases:
object
Worker class
- Parameters:
name (str) – name of the worker
runner (Any) – runner for the worker
clones (int) – number of clones. Defaults to 1
uid (uuid.UUID) – unique id of the worker. Defaults to random uuid4
- clones: int¶
- name: str¶
- runner: Any¶
- uid: UUID¶
eventscore.core.workers module¶
- class eventscore.core.workers.SpawnMPWorker(logger: ~logging.Logger = <RootLogger root (DEBUG)>)[source]¶
Bases:
ISpawnWorker