eventscore.core package

Submodules

eventscore.core.abstract module

class eventscore.core.abstract.IConsumer(func: Callable[[Event], Any], logger: Logger = <RootLogger root (DEBUG)>)

Bases: Protocol

Consumer class. Its sole purpose is to consume events.

consume(event: Event) → None

Consume an event with the consumer function

Parameters:

event (Event) – Event to consume

Returns:

None

Return type:

None

class eventscore.core.abstract.IECore(*args, **kwargs)

Bases: Protocol

Event core class. Can be used for:

  • Accessing pipeline processor

  • Accessing worker spawner

  • Accessing producer

  • Accessing event stream

  • Decorating functions to make them consumers

  • Registering functions to make them consumers

  • Discovering consumers marked with the @consumer (or @ecore.consumer) decorator

  • Producing events

  • Spawning workers built from registered consumers

consumer(func: Callable[[Event], Any] | None = None, *, event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, clones: int = 1) → Callable[[Event], Any]

Decorator for consumer functions

Parameters:
  • func (ConsumerFunc | None) – Function to decorate

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – Number of clones

Returns:

Decorated function

Return type:

ConsumerFunc
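
The optional `func` parameter followed by keyword-only arguments suggests the usual "decorator with arguments" pattern, where calling the decorator without a function returns the real decorator. A minimal sketch of that pattern follows. It assumes a hypothetical module-level `REGISTRY` list in place of the core's internal state and is not the library's implementation.

```python
from typing import Any, Callable

# Hypothetical registry used only for this sketch; the real core keeps
# its own internal state for registered consumers.
REGISTRY: list[tuple[str, str, int, Callable[[Any], Any]]] = []


def consumer(func=None, *, event: str, group: str, clones: int = 1):
    """Record ``func`` as a consumer of ``event`` and return it unchanged."""

    def decorate(f: Callable[[Any], Any]) -> Callable[[Any], Any]:
        REGISTRY.append((event, group, clones, f))
        return f

    # Called as consumer(f, event=..., group=...): decorate immediately.
    if func is not None:
        return decorate(func)
    # Called as @consumer(event=..., group=...): hand back the decorator.
    return decorate


@consumer(event="user.created", group="mailers", clones=2)
def send_welcome_email(event: Any) -> None:
    print(f"sending welcome email for {event}")
```

Because the decorator returns the original function, `send_welcome_email` can still be called directly, for example in unit tests.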

discover_consumers(*, root: str = '') → None

Discover consumers within the given package root.

Parameters:

root (str) – Root package to search in. The current directory is used by default.

Returns:

None

Return type:

None

property process_pipeline: IProcessPipeline

Pipeline processor getter

Returns:

Pipeline processor

Return type:

IProcessPipeline

produce(event: Event, *, block: bool = True, timeout: int = 5) → None

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Whether to block on I/O when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

property producer: IProducer

Producer getter

Returns:

Producer

Return type:

IProducer

register_consumer(func: Callable[[Event], Any], event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, *, clones: int = 1, func_path: str | None = None) → None

Register a function as a consumer

Parameters:
  • func (ConsumerFunc) – Function to register as a consumer

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – Number of clones

  • func_path (str | None) – Path (absolute preferred) to the module where the function is defined. Used to compare consumer functions for equality and avoid duplicate registration. Defaults to `(inspect.getsourcefile(func) or "") + ":" + func.__name__`

Returns:

None

Return type:

None
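
The documented default for `func_path` can be reproduced with the standard `inspect` module. The sketch below wraps that expression in a hypothetical helper, `default_func_path`, to show what the resulting key looks like; the helper name is an assumption made for illustration.

```python
import inspect
from typing import Any, Callable


def default_func_path(func: Callable[..., Any]) -> str:
    """Build the documented default: '<source file>:<function name>'."""
    # getsourcefile() may return None (e.g. for code without a file),
    # in which case the documented expression falls back to "".
    return (inspect.getsourcefile(func) or "") + ":" + func.__name__


def handle_order(event: Any) -> None:
    """Example consumer function."""


key = default_func_path(handle_order)
```

Two registrations of the same function produce the same key, which is presumably how duplicates are detected.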

property spawn_worker: ISpawnWorker

Worker spawner getter

Returns:

Worker spawner

Return type:

ISpawnWorker

spawn_workers() → None

Spawn workers for registered consumers. Registering more consumers or spawning again afterwards has no effect.

Returns:

None

Return type:

None

property stream: IStream

Stream getter

Returns:

Stream instance

Return type:

IStream

property stream_factory: IStreamFactory

Stream factory getter

Returns:

Stream factory

Return type:

IStreamFactory

class eventscore.core.abstract.IEventSerializer(*args, **kwargs)

Bases: Protocol[IType, RType]

Event serializer class. Its sole purpose is to encode and decode events.

decode(event: IType) → Event

Decode an event

Parameters:

event (IType) – Event to decode

Returns:

Decoded event

Return type:

Event

encode(event: Event) → RType

Encode an event

Parameters:

event (Event) – Event to encode

Returns:

Encoded event

Return type:

RType
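
As an illustration of the encode/decode contract, here is a minimal sketch of a serializer with `IType = RType = bytes` that uses JSON as the wire format. The `Event` dataclass below is a simplified stand-in, and the choice of JSON is an assumption; the library's own serializer may use a different encoding.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Event:
    """Simplified stand-in for eventscore.core.types.Event."""

    type: str
    payload: dict[str, Any] = field(default_factory=dict)


class JsonEventSerializer:
    """Hypothetical serializer: Event -> bytes and bytes -> Event."""

    def encode(self, event: Event) -> bytes:
        data = {"type": event.type, "payload": event.payload}
        return json.dumps(data).encode("utf-8")

    def decode(self, event: bytes) -> Event:
        data = json.loads(event.decode("utf-8"))
        return Event(type=data["type"], payload=data["payload"])


serializer = JsonEventSerializer()
raw = serializer.encode(Event(type="user.created", payload={"id": 7}))
restored = serializer.decode(raw)
```

A round trip through `encode` and `decode` should return an event equal to the original, which is a useful property to test for any custom serializer.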

class eventscore.core.abstract.IProcessPipeline(*args, **kwargs)

Bases: Protocol

Pipeline processor class. Its sole purpose is to build a worker from a given pipeline.

__call__(pipeline: Pipeline, ecore: IECore) → Worker

Process a pipeline

Parameters:
  • pipeline (Pipeline) – Pipeline to process

  • ecore (IECore) – Event core instance

Returns:

Constructed worker

Return type:

Worker

class eventscore.core.abstract.IProducer(*args, **kwargs)

Bases: Protocol

Producer class. Its sole purpose is to produce events.

produce(event: Event, *, block: bool = True, timeout: int = 5) → None

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Whether to block on I/O when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

class eventscore.core.abstract.IRunner(stream_factory: IStreamFactory, event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, *consumers: IConsumer, max_events: int = -1, logger: Logger = <RootLogger root (DEBUG)>)

Bases: Protocol

Runner class. Its sole purpose is to run the given consumers.

Parameters:
  • stream_factory (IStreamFactory) – Event stream factory

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • consumers (Tuple[IConsumer, ...]) – Consumers

  • max_events (int) – Maximum number of events to process. Defaults to -1, which means there is no limit.

  • logger (logging.Logger) – Logger instance

run() → None

Start runner

Returns:

None

Return type:

None
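
The `max_events` convention (-1 meaning "no limit") can be sketched as a simple loop. The function below is a hypothetical illustration: it reads events from a plain iterable instead of a stream created by a stream factory, and it takes consumers as plain callables.

```python
from typing import Any, Callable, Iterable


def run(
    events: Iterable[Any],
    consumers: Iterable[Callable[[Any], Any]],
    max_events: int = -1,
) -> int:
    """Feed each event to every consumer; return the number processed."""
    consumers = list(consumers)
    source = iter(events)
    processed = 0
    # -1 disables the limit; otherwise stop once max_events were handled.
    while max_events == -1 or processed < max_events:
        try:
            event = next(source)
        except StopIteration:
            break  # the source is exhausted
        for consume in consumers:
            consume(event)
        processed += 1
    return processed


seen: list[str] = []
limited = run(["a", "b", "c"], [seen.append], max_events=2)
```

Checking the limit before fetching the next event ensures the runner never pulls an event it will not process.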

class eventscore.core.abstract.ISpawnWorker(*args, **kwargs)

Bases: Protocol

Worker spawner class. Its sole purpose is to spawn a given worker.

__call__(worker: Worker) → tuple[int, ...]

Spawn worker

Parameters:

worker (Worker) – Worker to spawn

Returns:

PIDs

Return type:

tuple[int, ...]

class eventscore.core.abstract.IStream(*args, **kwargs)

Bases: Protocol

Event stream class. Its sole purpose is to put and pop events.

pop(event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, *, block: bool = True, timeout: int = 5) → Event

Pop an event from the stream

Parameters:
  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • block (bool) – Whether to block on I/O when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

Next unprocessed event in stream

Return type:

Event

put(event: Event, *, block: bool = True, timeout: int = 5) → None

Put an event into the stream

Parameters:
  • event (Event) – Event to put

  • block (bool) – Whether to block on I/O when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None
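
To make the `put`/`pop` contract and the `block`/`timeout` parameters concrete, here is a minimal in-memory sketch built on the standard `queue` module. Several things are assumptions: events are plain dicts with a `"type"` key, the consumer group is accepted but ignored, and an empty stream raises a local stand-in for `EmptyStreamError`.

```python
import queue
from collections import defaultdict
from typing import Any


class EmptyStreamError(Exception):
    """Local stand-in for eventscore.core.exceptions.EmptyStreamError."""


class InMemoryStream:
    """Hypothetical stream keeping one FIFO queue per event type."""

    def __init__(self) -> None:
        self._queues: dict[str, queue.Queue] = defaultdict(queue.Queue)

    def put(self, event: dict[str, Any], *, block: bool = True, timeout: int = 5) -> None:
        self._queues[event["type"]].put(event, block=block, timeout=timeout)

    def pop(self, event: str, group: str, *, block: bool = True, timeout: int = 5) -> dict[str, Any]:
        # `group` is ignored here; a real stream would track per-group offsets.
        try:
            return self._queues[event].get(block=block, timeout=timeout)
        except queue.Empty:
            raise EmptyStreamError("Stream does not have unprocessed messages.") from None


stream = InMemoryStream()
stream.put({"type": "order.paid", "amount": 10})
first = stream.pop("order.paid", "billing")
```

With `block=False` the call returns or fails immediately; with `block=True` it waits up to `timeout` seconds before giving up.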

class eventscore.core.abstract.IStreamFactory(stream_class: type[IStream], kwargs: dict[str, Any])

Bases: Protocol

Stream factory class. Its sole purpose is to create a stream.

__call__() → IStream

Create a stream

Returns:

Stream instance

Return type:

IStream
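
Given the constructor signature (`stream_class` plus `kwargs`), a factory of this shape can be sketched in a few lines. `DummyStream` and its `host`/`port` arguments are hypothetical, and whether the real factory creates a fresh stream on every call or caches one is not stated in this reference.

```python
from typing import Any


class DummyStream:
    """Hypothetical stream class used only for this sketch."""

    def __init__(self, host: str, port: int) -> None:
        self.host = host
        self.port = port


class StreamFactory:
    """Store a stream class and its keyword arguments; build on call."""

    def __init__(self, stream_class: type, kwargs: dict[str, Any]) -> None:
        self._stream_class = stream_class
        self._kwargs = kwargs

    def __call__(self) -> Any:
        # Assumption: every call builds a new instance.
        return self._stream_class(**self._kwargs)


factory = StreamFactory(DummyStream, {"host": "localhost", "port": 6379})
stream_a = factory()
stream_b = factory()
```

Deferring construction this way lets each worker process create its own stream instead of sharing a connection created in the parent.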

eventscore.core.consumers module

class eventscore.core.consumers.Consumer(func: Callable[[Event], Any], logger: Logger = <RootLogger root (DEBUG)>)

Bases: IConsumer

consume(event: Event) → None

Consume an event with the consumer function

Parameters:

event (Event) – Event to consume

Returns:

None

Return type:

None

eventscore.core.ecore module

class eventscore.core.ecore.ECore(stream_factory: IStreamFactory, *, process_pipeline: IProcessPipeline | None = None, process_pipeline_type: type[IProcessPipeline] = <class 'eventscore.core.pipelines.ProcessPipeline'>, process_pipeline_init_kwargs: dict[str, Any] | None = None, spawn_worker: ISpawnWorker | None = None, spawn_worker_type: type[ISpawnWorker] = <class 'eventscore.core.workers.SpawnMPWorker'>, spawn_worker_init_kwargs: dict[str, Any] | None = None, producer: IProducer | None = None, producer_type: type[IProducer] = <class 'eventscore.core.producers.Producer'>, producer_init_kwargs: dict[str, Any] | None = None, logger: Logger = <RootLogger root (DEBUG)>, skipping_predicate: Callable[[], bool] | None = <function _skipping_predicate>)

Bases: IECore

consumer(func: Callable[[Event], Any] | None = None, *, event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, clones: int = 1) → Callable[[Event], Any]

Decorator for consumer functions

Parameters:
  • func (ConsumerFunc | None) – Function to decorate

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – Number of clones

Returns:

Decorated function

Return type:

ConsumerFunc

discover_consumers(*, root: str = '') → None

Discover consumers within the given package root.

Parameters:

root (str) – Root package to search in. The current directory is used by default.

Returns:

None

Return type:

None

property process_pipeline: IProcessPipeline

Pipeline processor getter

Returns:

Pipeline processor

Return type:

IProcessPipeline

produce(event: Event, *, block: bool = True, timeout: int = 5) → None

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Whether to block on I/O when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

property producer: IProducer

Producer getter

Returns:

Producer

Return type:

IProducer

register_consumer(func: Callable[[Event], Any], event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, *, clones: int = 1, func_path: str | None = None) → None

Register a function as a consumer

Parameters:
  • func (ConsumerFunc) – Function to register as a consumer

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – Number of clones

  • func_path (str | None) – Path (absolute preferred) to the module where the function is defined. Used to compare consumer functions for equality and avoid duplicate registration. Defaults to `(inspect.getsourcefile(func) or "") + ":" + func.__name__`

Returns:

None

Return type:

None

property spawn_worker: ISpawnWorker

Worker spawner getter

Returns:

Worker spawner

Return type:

ISpawnWorker

spawn_workers() → None

Spawn workers for registered consumers. Registering more consumers or spawning again afterwards has no effect.

Returns:

None

Return type:

None

property stream: IStream

Stream getter

Returns:

Stream instance

Return type:

IStream

property stream_factory: IStreamFactory

Stream factory getter

Returns:

Stream factory

Return type:

IStreamFactory

eventscore.core.exceptions module

exception eventscore.core.exceptions.AlreadySpawnedError

Bases: EventsCoreError

message = 'Not able to modify consumers after spawning workers.'

exception eventscore.core.exceptions.ClonesMismatchError

Bases: EventsCoreError

message = 'Pipeline must have the same number of clones for all items.'

exception eventscore.core.exceptions.EmptyPipelineError

Bases: EventsCoreError

message = 'Pipeline must have at least one item.'

exception eventscore.core.exceptions.EmptyStreamError

Bases: EventsCoreError

message = 'Stream does not have unprocessed messages.'

exception eventscore.core.exceptions.EventNotSentError

Bases: EventsCoreError

message = 'Could not send message to stream due to an unexpected error.'

exception eventscore.core.exceptions.EventsCoreError

Bases: Exception

message = 'eventscore error occured.'

exception eventscore.core.exceptions.NotADirectoryError

Bases: EventsCoreError

message = 'Provided root is not a directory.'

exception eventscore.core.exceptions.NotAPackageError

Bases: EventsCoreError

message = 'Provided root is not a Python package.'

exception eventscore.core.exceptions.PathError

Bases: EventsCoreError

message = 'Provided path does not exist.'

exception eventscore.core.exceptions.TooManyDataError

Bases: EventsCoreError

message = 'Unexpected number of data received for event.'

exception eventscore.core.exceptions.UnrelatedConsumersError

Bases: EventsCoreError

message = 'All consumers in pipeline must be related to the same event.'
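
Since every exception listed above derives from `EventsCoreError` and carries a class-level `message`, a single `except` clause can handle all of them. The sketch below uses local stand-in classes; the constructor that passes `message` to `Exception` is an assumption, as this reference does not show how the attribute is used.

```python
class EventsCoreError(Exception):
    """Local stand-in for the base class; the constructor is an assumption."""

    message = "base error"

    def __init__(self) -> None:
        super().__init__(self.message)


class EmptyStreamError(EventsCoreError):
    message = "Stream does not have unprocessed messages."


class PathError(EventsCoreError):
    message = "Provided path does not exist."


def describe(exc_type: type[EventsCoreError]) -> str:
    """Raise ``exc_type`` and report it through the shared base class."""
    try:
        raise exc_type()
    except EventsCoreError as exc:
        return f"{type(exc).__name__}: {exc.message}"


summary = describe(EmptyStreamError)
```

Catching the base class is convenient at the application boundary, while individual subclasses remain available for finer-grained handling.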

eventscore.core.logging module

eventscore.core.pipelines module

class eventscore.core.pipelines.ProcessPipeline(consumer_type: type[IConsumer] = <class 'eventscore.core.consumers.Consumer'>, runner_type: type[IRunner] = <class 'eventscore.core.runners.ObserverRunner'>, logger: Logger = <RootLogger root (DEBUG)>)

Bases: IProcessPipeline

eventscore.core.producers module

class eventscore.core.producers.Producer(ecore: IECore, logger: Logger = <RootLogger root (DEBUG)>)

Bases: IProducer

produce(event: Event, *, block: bool = True, timeout: int = 5) → None

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Whether to block on I/O when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

eventscore.core.runners module

class eventscore.core.runners.ObserverRunner(stream_factory: IStreamFactory, event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum, *consumers: IConsumer, max_events: int = -1, logger: Logger = <RootLogger root (DEBUG)>)

Bases: IRunner

run() → None

Start runner

Returns:

None

Return type:

None

eventscore.core.serializers module

class eventscore.core.serializers.EventSerializer(*args, **kwargs)

Bases: IEventSerializer[bytes, bytes]

decode(event: bytes) → Event

Decode an event

Parameters:

event (bytes) – Event to decode

Returns:

Decoded event

Return type:

Event

encode(event: Event) → bytes

Encode an event

Parameters:

event (Event) – Event to encode

Returns:

Encoded event

Return type:

bytes

eventscore.core.types module

class eventscore.core.types.DeliverySemantic(*values)

Bases: IntEnum

AT_LEAST_ONCE = 1
AT_MOST_ONCE = 0
EXACTLY_ONCE = 2

class eventscore.core.types.Event(type: str | StrEnum | IntEnum, uid: UUID = <factory>, ts: str = <factory>, payload: dict[str, str | int | bytes] = <factory>)

Bases: object

Event class

Parameters:
  • type (EventType) – type of the event

  • uid (uuid.UUID) – unique id of the event. Defaults to random uuid4

  • ts (str) – timestamp of the event. Defaults to current timestamp

  • payload (dict[str, EncodableT]) – payload of the event. Defaults to empty dict

asdict() → EventDict

Custom asdict method that returns an encodable result

Returns:

Dictionary representation of the event

Return type:

EventDict

classmethod fromdict(obj: EventDict) → Event

Construct an event from an encodable dictionary

Parameters:

obj (EventDict) – Dictionary representation of the event

Returns:

Event object

Return type:

Event
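
The `asdict`/`fromdict` pair converts between the rich `Event` fields (for example a `UUID`) and the string-only fields of `EventDict`. The following sketch shows one way such a round trip can work. The dataclass is a simplified stand-in, and the ISO 8601 timestamp format is an assumption; the reference only says `ts` is a string.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any


def _now() -> str:
    # Assumed timestamp format for this sketch only.
    return datetime.now(timezone.utc).isoformat()


@dataclass
class Event:
    """Simplified stand-in for eventscore.core.types.Event."""

    type: str
    uid: uuid.UUID = field(default_factory=uuid.uuid4)
    ts: str = field(default_factory=_now)
    payload: dict[str, Any] = field(default_factory=dict)

    def asdict(self) -> dict[str, Any]:
        # Convert non-encodable fields (UUID, enum members) to strings.
        return {
            "type": str(self.type),
            "uid": str(self.uid),
            "ts": self.ts,
            "payload": self.payload,
        }

    @classmethod
    def fromdict(cls, obj: dict[str, Any]) -> "Event":
        return cls(
            type=obj["type"],
            uid=uuid.UUID(obj["uid"]),
            ts=obj["ts"],
            payload=obj["payload"],
        )


original = Event(type="user.created", payload={"id": 7})
as_dict = original.asdict()
rebuilt = Event.fromdict(as_dict)
```

Keeping the dictionary form limited to strings, integers, and bytes is what makes it safe to hand to a serializer.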

payload: dict[str, str | int | bytes]
ts: str
type: str | StrEnum | IntEnum
uid: UUID

class eventscore.core.types.EventDict

Bases: TypedDict

payload: dict[str, str | int | bytes]
ts: str
type: str
uid: str

class eventscore.core.types.EventStatus(*values)

Bases: IntEnum

FAILED = 2
PENDING = 0
SENT = 1

class eventscore.core.types.Pipeline(uid: UUID = <factory>, items: set[PipelineItem] = <factory>)

Bases: object

Pipeline class

Parameters:
  • uid (uuid.UUID) – unique id of the pipeline. Defaults to random uuid4

  • items (set[PipelineItem]) – set of pipeline items. Defaults to empty set

items: set[PipelineItem]
uid: UUID

class eventscore.core.types.PipelineItem(func: Callable[[Event], Any], func_path: str, event: str | StrEnum | IntEnum, group: str | StrEnum | IntEnum = 'default', clones: int = 1)

Bases: object

Pipeline item class

Parameters:
  • func (ConsumerFunc) – consumer function

  • func_path (str) – path to the module where the function is defined

  • event (EventType) – event type

  • group (ConsumerGroup) – consumer group. Defaults to DEFAULT_CONSUMER_GROUP ('default')

  • clones (int) – number of clones. Defaults to 1

__eq__(other: PipelineItem) → bool

Equality operator for pipeline items. Must use same attributes as __hash__ method.

Parameters:

other (PipelineItem) – other pipeline item

Returns:

True if the two pipeline items are equal, False otherwise

Return type:

bool

__hash__() → int

Hash function for pipeline item. Must use same attributes as __eq__ method.

Returns:

hash value

Return type:

int
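
The requirement that `__eq__` and `__hash__` use the same attributes matters because pipeline items are stored in a set: equal items must hash equally or duplicates slip through. The sketch below illustrates the idea with a stand-in class. The choice of `func_path`, `event`, and `group` as the identity key is an assumption; this reference does not say which attributes the library compares.

```python
from dataclasses import dataclass
from typing import Any, Callable


@dataclass(eq=False)  # keep the hand-written __eq__ and __hash__ below
class PipelineItem:
    """Simplified stand-in for eventscore.core.types.PipelineItem."""

    func: Callable[[Any], Any]
    func_path: str
    event: str
    group: str = "default"
    clones: int = 1

    def _key(self) -> tuple[str, str, str]:
        # Assumed identity key, shared by __eq__ and __hash__.
        return (self.func_path, self.event, self.group)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, PipelineItem):
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self) -> int:
        return hash(self._key())


def handler(event: Any) -> None:
    """Example consumer function."""


first = PipelineItem(handler, "/app/consumers.py:handler", "user.created")
again = PipelineItem(handler, "/app/consumers.py:handler", "user.created", clones=3)
items = {first, again}
```

Because both items share the same key, the set keeps only one of them, which is the duplicate protection described for `func_path`.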

clones: int
event: str | StrEnum | IntEnum
func: Callable[[Event], Any]
func_path: str
group: str | StrEnum | IntEnum

class eventscore.core.types.Worker(name: str, runner: Any, clones: int = 1, uid: UUID = <factory>)

Bases: object

Worker class

Parameters:
  • name (str) – name of the worker

  • runner (Any) – runner for the worker

  • clones (int) – number of clones. Defaults to 1

  • uid (uuid.UUID) – unique id of the worker. Defaults to random uuid4

clones: int
name: str
runner: Any
uid: UUID

eventscore.core.workers module

class eventscore.core.workers.SpawnMPWorker(logger: Logger = <RootLogger root (DEBUG)>)

Bases: ISpawnWorker

Module contents