eventscore.core package¶
Submodules¶
eventscore.core.abstract module¶
- class IECore(*args, **kwargs)[source]¶
Bases:
ProtocolEvent core class. Can be used for:
Accessing pipeline processor
Accessing worker spawner
Accessing producer
Accessing event stream
Decorating functions to make them consumers
Registering functions to make them consumers
Discovering consumers marked with @(ecore.)consumer decorator
Producing events
Spawning workers bulit from registered consumers
- property process_pipeline: IProcessPipeline¶
Pipeline processor getter
- Returns:
Pipeline processor
- Return type:
- property spawn_worker: ISpawnWorker¶
Worker spawner getter
- Returns:
Worker spawner
- Return type:
- property stream_factory: IStreamFactory¶
Stream factory getter
- Returns:
Stream factory
- Return type:
- consumer(func=None, *, event, group, clones=1)[source]¶
Decorator for consumer functions
- Parameters:
func (ConsumerFunc | None) – function to decorate
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
clones (NumberOfClones) – No of clones
- Returns:
Decorated function
- Return type:
ConsumerFunc
- register_consumer(func, event, group, *, clones=1, func_path=None)[source]¶
Consumer function registrator
- Parameters:
func (ConsumerFunc) – Function to register as a consumer
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
clones (NumberOfClones) – No of clones
func_path (str | None) – Path (absolute preferred) to module where function is defined. Is used for consumer functions equality check to avoid duplicate registering. Defaults to
` (inspect.getsourcefile(func) or "") + ":" + func.__name__ `
- Returns:
None
- Return type:
None
- discover_consumers(*, root='')[source]¶
Discover consumers within given package root.
- Parameters:
root (str | None) – root package to search in. Current directory is used by default.
- Returns:
None
- Return type:
None
- class IProcessPipeline(*args, **kwargs)[source]¶
Bases:
ProtocolPipeline processor class. One and only purpose of this class is to build worker based on a given pipeline.
- class ISpawnWorker(*args, **kwargs)[source]¶
Bases:
ProtocolWorker spawner class. One and only purpose of this class is to spawn a given worker.
- class IEventSerializer(*args, **kwargs)[source]¶
Bases:
Protocol[IType,RType]Event serializer class. One and only purpose of this class is to encode and decode events.
- class IProducer(*args, **kwargs)[source]¶
Bases:
ProtocolProducer class. One and only purpose of this class is to produce events.
- class IStream(*args, **kwargs)[source]¶
Bases:
ProtocolEvent stream class. One and only purpose of this class is to put and pop events.
- class IStreamFactory(stream_class, kwargs)[source]¶
Bases:
ProtocolStream factory class. One and only purpose of this class is to create a stream.
- class IRunner(stream_factory, event, group, *consumers, max_events=-1, logger=<RootLogger root (DEBUG)>)[source]¶
Bases:
ProtocolRunner class. One and only purpose of this class is to run given consumers.
- Parameters:
stream (IStream) – Event stream
event (EventType) – Event type
consumers (Tuple[IConsumer, ...]) – Consumers
max_events (int) – Max events to process Defaults to -1. If value is equal to -1, then there is not limit for number of events to process
logger (logging.Logger) – Logger instance
stream_factory (IStreamFactory)
group (ConsumerGroup)
- class IConsumer(func, logger=<RootLogger root (DEBUG)>)[source]¶
Bases:
ProtocolConsumer class. One and only purpose of this class is to consume events.
- Parameters:
func (ConsumerFunc)
logger (logging.Logger)
eventscore.core.consumers module¶
eventscore.core.ecore module¶
- class ECore(stream_factory, *, process_pipeline=None, process_pipeline_type=<class 'eventscore.core.pipelines.ProcessPipeline'>, process_pipeline_init_kwargs=None, spawn_worker=None, spawn_worker_type=<class 'eventscore.core.workers.SpawnMPWorker'>, spawn_worker_init_kwargs=None, producer=None, producer_type=<class 'eventscore.core.producers.Producer'>, producer_init_kwargs=None, logger=<RootLogger root (DEBUG)>, skipping_predicate=<function _skipping_predicate>)[source]¶
Bases:
IECore- Parameters:
stream_factory (IStreamFactory)
process_pipeline (IProcessPipeline | None)
process_pipeline_type (type[IProcessPipeline])
spawn_worker (ISpawnWorker | None)
spawn_worker_type (type[ISpawnWorker])
producer (IProducer | None)
logger (Logger)
- property process_pipeline: IProcessPipeline¶
Pipeline processor getter
- Returns:
Pipeline processor
- Return type:
- property spawn_worker: ISpawnWorker¶
Worker spawner getter
- Returns:
Worker spawner
- Return type:
- property stream_factory: IStreamFactory¶
Stream factory getter
- Returns:
Stream factory
- Return type:
- consumer(func=None, *, event, group, clones=1)[source]¶
Decorator for consumer functions
- Parameters:
func (ConsumerFunc | None) – function to decorate
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
clones (NumberOfClones) – No of clones
- Returns:
Decorated function
- Return type:
ConsumerFunc
- register_consumer(func, event, group, *, clones=1, func_path=None)[source]¶
Consumer function registrator
- Parameters:
func (ConsumerFunc) – Function to register as a consumer
event (EventType) – Event type
group (ConsumerGroup) – Consumer group
clones (NumberOfClones) – No of clones
func_path (str | None) – Path (absolute preferred) to module where function is defined. Is used for consumer functions equality check to avoid duplicate registering. Defaults to
` (inspect.getsourcefile(func) or "") + ":" + func.__name__ `
- Returns:
None
- Return type:
None
- discover_consumers(*, root='')[source]¶
Discover consumers within given package root.
- Parameters:
root (str | None) – root package to search in. Current directory is used by default.
- Returns:
None
- Return type:
None
eventscore.core.exceptions module¶
- exception AlreadySpawnedError[source]¶
Bases:
EventsCoreError- message = 'Not able to modify consumers after spawning workers.'¶
- exception ClonesMismatchError[source]¶
Bases:
EventsCoreError- message = 'Pipeline must have the same number of clones for all items.'¶
- exception EmptyPipelineError[source]¶
Bases:
EventsCoreError- message = 'Pipeline must have at least one item.'¶
Bases:
EventsCoreError
- exception EventNotSentError[source]¶
Bases:
EventsCoreError- message = 'Could not send message to stream due to an unexpected error.'¶
- exception EmptyStreamError[source]¶
Bases:
EventsCoreError- message = 'Stream does not have unprocessed messages.'¶
- exception TooManyDataError[source]¶
Bases:
EventsCoreError- message = 'Unexpected number of data received for event.'¶
- exception PathError[source]¶
Bases:
EventsCoreError- message = 'Provided path does not exist.'¶
- exception NotADirectoryError[source]¶
Bases:
EventsCoreError- message = 'Provided root is not a directory.'¶
- exception NotAPackageError[source]¶
Bases:
EventsCoreError- message = 'Provided root is not a Python package.'¶
eventscore.core.logging module¶
eventscore.core.pipelines module¶
- class ProcessPipeline(consumer_type=<class 'eventscore.core.consumers.Consumer'>, runner_type=<class 'eventscore.core.runners.ObserverRunner'>, logger=<RootLogger root (DEBUG)>)[source]¶
Bases:
IProcessPipeline- Parameters:
logger (logging.Logger)
eventscore.core.producers module¶
eventscore.core.runners module¶
eventscore.core.serializers module¶
eventscore.core.types module¶
- class DeliverySemantic(*values)[source]¶
Bases:
IntEnum- AT_MOST_ONCE = 0¶
- AT_LEAST_ONCE = 1¶
- EXACTLY_ONCE = 2¶
- class Event(type, uid=<factory>, ts=<factory>, payload=<factory>)[source]¶
Bases:
objectEvent class
- Parameters:
- class PipelineItem(func, func_path, event, group='default', clones=1)[source]¶
Bases:
objectPipeline item class
- Parameters:
- class Pipeline(uid=<factory>, items=<factory>)[source]¶
Bases:
objectPipeline class
- Parameters:
uid (uuid.UUID) – unique id of the pipeline. Defaults to random uuid4
items (set[PipelineItem]) – set of pipeline items. Defaults to empty set
- items: set[PipelineItem]¶
eventscore.core.workers module¶
- class SpawnMPWorker(logger=<RootLogger root (DEBUG)>)[source]¶
Bases:
ISpawnWorker- Parameters:
logger (Logger)