eventscore.core package

Submodules

eventscore.core.abstract module

class IECore(*args, **kwargs)[source]

Bases: Protocol

Event core class. Can be used for:

  • Accessing pipeline processor

  • Accessing worker spawner

  • Accessing producer

  • Accessing event stream

  • Decorating functions to make them consumers

  • Registering functions to make them consumers

  • Discovering consumers marked with the @(ecore.)consumer decorator

  • Producing events

  • Spawning workers built from registered consumers

property process_pipeline: IProcessPipeline

Pipeline processor getter

Returns:

Pipeline processor

Return type:

IProcessPipeline

property spawn_worker: ISpawnWorker

Worker spawner getter

Returns:

Worker spawner

Return type:

ISpawnWorker

property producer: IProducer

Producer getter

Returns:

Producer

Return type:

IProducer

property stream_factory: IStreamFactory

Stream factory getter

Returns:

Stream factory

Return type:

IStreamFactory

property stream: IStream

Stream getter

Returns:

Stream instance

Return type:

IStream

consumer(func=None, *, event, group, clones=1)[source]

Decorator for consumer functions

Parameters:
  • func (ConsumerFunc | None) – function to decorate

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – Number of clones

Returns:

Decorated function

Return type:

ConsumerFunc
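
The signature consumer(func=None, *, event, group, clones=1) suggests the common "optional first argument" decorator pattern. The following is a minimal sketch of that pattern only, assuming a hypothetical module-level REGISTRY list in place of the library's real registration logic, which is not shown in this reference:

```python
from typing import Any, Callable, List, Optional, Tuple

ConsumerFunc = Callable[[Any], Any]

# Hypothetical stand-in for whatever eventscore stores internally.
REGISTRY: List[Tuple[ConsumerFunc, str, str, int]] = []


def consumer(
    func: Optional[ConsumerFunc] = None,
    *,
    event: str,
    group: str,
    clones: int = 1,
):
    """Record func in REGISTRY and return it unchanged."""

    def decorate(f: ConsumerFunc) -> ConsumerFunc:
        REGISTRY.append((f, event, group, clones))
        return f

    # Called as consumer(f, event=..., group=...): decorate immediately.
    if func is not None:
        return decorate(func)
    # Called as @consumer(event=..., group=...): return the real decorator.
    return decorate


@consumer(event="user.created", group="mailers", clones=2)
def send_welcome(event: Any) -> None:
    print("welcome", event)
```

Because event and group are keyword-only, the decorator is normally applied with parentheses; the func parameter covers the direct-call form.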

register_consumer(func, event, group, *, clones=1, func_path=None)[source]

Register a function as a consumer

Parameters:
  • func (ConsumerFunc) – Function to register as a consumer

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – Number of clones

  • func_path (str | None) – Path (absolute preferred) to the module where the function is defined. Used to compare consumer functions for equality so that the same function is not registered twice. Defaults to `(inspect.getsourcefile(func) or "") + ":" + func.__name__`

Returns:

None

Return type:

None
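
The default for func_path can be reproduced with the standard library alone. The sketch below wraps the documented expression in a hypothetical helper, default_func_path, which is not part of eventscore. It assumes that an explicitly passed func_path is used as given.

```python
import inspect
from typing import Any, Callable, Optional


def default_func_path(
    func: Callable[..., Any], func_path: Optional[str] = None
) -> str:
    """Return func_path if given, else the documented default expression."""
    if func_path is not None:
        return func_path
    # Expression copied from the func_path description above.
    return (inspect.getsourcefile(func) or "") + ":" + func.__name__


def handle_order(event: Any) -> None:
    """Example consumer function."""


path = default_func_path(handle_order)
```

The result ends in ":handle_order". The part before the colon depends on where the function was defined, and is empty when inspect.getsourcefile cannot find a source file.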

discover_consumers(*, root='')[source]

Discover consumers within given package root.

Parameters:

root (str | None) – root package to search in. Current directory is used by default.

Returns:

None

Return type:

None

produce(event, *, block=True, timeout=5)[source]

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Whether to block when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

spawn_workers()[source]

Spawn workers for registered consumers. Consumers registered afterwards, and further calls to this method, have no effect.

Returns:

None

Return type:

None

class IProcessPipeline(*args, **kwargs)[source]

Bases: Protocol

Pipeline processor class. Its sole purpose is to build a worker from a given pipeline.

class ISpawnWorker(*args, **kwargs)[source]

Bases: Protocol

Worker spawner class. Its sole purpose is to spawn a given worker.

class IEventSerializer(*args, **kwargs)[source]

Bases: Protocol[IType, RType]

Event serializer class. Its sole purpose is to encode and decode events.

encode(event)[source]

Encode an event

Parameters:

event (Event) – Event to encode

Returns:

Encoded event

Return type:

RType

decode(event)[source]

Decode an event

Parameters:

event (IType) – Event to decode

Returns:

Decoded event

Return type:

Event
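
A class satisfying this protocol only needs an encode/decode pair that round-trips an event. The sketch below is one possible implementation with IType = RType = bytes. It assumes JSON as the wire format and uses a reduced stand-in Event with two fields. The JsonEventSerializer name and the JSON format are illustrative choices, not the behaviour of the library's own serializer.

```python
import json
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass
class Event:
    """Reduced stand-in for eventscore's Event; only two fields."""

    type: str
    payload: Dict[str, Any] = field(default_factory=dict)


class JsonEventSerializer:
    """Hypothetical serializer: events in, UTF-8 JSON bytes out."""

    def encode(self, event: Event) -> bytes:
        # sort_keys keeps the encoded form deterministic.
        body = {"type": event.type, "payload": event.payload}
        return json.dumps(body, sort_keys=True).encode("utf-8")

    def decode(self, event: bytes) -> Event:
        data = json.loads(event.decode("utf-8"))
        return Event(type=data["type"], payload=data["payload"])


serializer = JsonEventSerializer()
original = Event(type="order.paid", payload={"order_id": 42})
restored = serializer.decode(serializer.encode(original))
```

Here restored compares equal to original, which is the property a serializer for this protocol is expected to preserve.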

class IProducer(*args, **kwargs)[source]

Bases: Protocol

Producer class. Its sole purpose is to produce events.

produce(event, *, block=True, timeout=5)[source]

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Whether to block when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None
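
Since the interfaces in this module derive from Protocol, a class can satisfy them structurally, without inheriting from them. The sketch below illustrates this with a locally declared protocol of the same shape. The runtime_checkable decorator and the ListProducer class are assumptions made for the example. Whether the library's IProducer supports isinstance checks is not stated here.

```python
from typing import Any, Iterable, List, Protocol, runtime_checkable


@runtime_checkable
class IProducer(Protocol):
    """Local protocol with the same produce() shape as documented above."""

    def produce(self, event: Any, *, block: bool = True, timeout: int = 5) -> None:
        ...


class ListProducer:
    """Hypothetical producer that collects events in memory."""

    def __init__(self) -> None:
        self.sent: List[Any] = []

    def produce(self, event: Any, *, block: bool = True, timeout: int = 5) -> None:
        self.sent.append(event)


def emit_all(producer: IProducer, events: Iterable[Any]) -> None:
    """Send every event through any object that matches the protocol."""
    for event in events:
        producer.produce(event)


producer = ListProducer()
emit_all(producer, ["first", "second"])
```

ListProducer never names IProducer, yet it is accepted wherever the protocol is expected because it provides a matching produce method.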

class IStream(*args, **kwargs)[source]

Bases: Protocol

Event stream class. Its sole purpose is to put and pop events.

put(event, *, block=True, timeout=5)[source]

Put an event into the stream

Parameters:
  • event (Event) – Event to put

  • block (bool) – Whether to block when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

pop(event, group, *, block=True, timeout=5)[source]

Pop an event from the stream

Parameters:
  • event (EventType) – Event type

  • group (str | StrEnum | IntEnum) – Consumer group

  • block (bool) – Whether to block when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

Next unprocessed event in stream

Return type:

Event
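
To show how block and timeout could behave, here is a minimal in-memory sketch built on queue.Queue. It rests on several assumptions: events are plain (type, payload) tuples, there is one FIFO queue per event type, the group argument is accepted but ignored, and an empty queue is reported through a local EmptyStreamError stand-in. None of this is taken from the library's stream implementations.

```python
import queue
from collections import defaultdict
from typing import Any, Dict, Tuple

SimpleEvent = Tuple[str, Any]  # (event type, payload); illustrative only


class EmptyStreamError(Exception):
    """Local stand-in for the library exception of the same name."""


class InMemoryStream:
    """Hypothetical stream: one FIFO queue per event type."""

    def __init__(self) -> None:
        self._queues: Dict[str, queue.Queue] = defaultdict(queue.Queue)

    def put(self, event: SimpleEvent, *, block: bool = True, timeout: int = 5) -> None:
        event_type, _payload = event
        self._queues[event_type].put(event, block=block, timeout=timeout)

    def pop(self, event: str, group: str, *, block: bool = True, timeout: int = 5) -> SimpleEvent:
        # group is ignored in this sketch; a real stream would track it.
        try:
            return self._queues[event].get(block=block, timeout=timeout)
        except queue.Empty:
            raise EmptyStreamError("Stream does not have unprocessed messages.") from None


stream = InMemoryStream()
stream.put(("user.created", {"id": 1}))
first = stream.pop("user.created", "mailers")
```

With block=False the timeout is not used and an empty queue raises immediately. With block=True the call waits up to timeout seconds before raising.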

class IStreamFactory(stream_class, kwargs)[source]

Bases: Protocol

Stream factory class. Its sole purpose is to create a stream.

class IRunner(stream_factory, event, group, *consumers, max_events=-1, logger=<RootLogger root (DEBUG)>)[source]

Bases: Protocol

Runner class. Its sole purpose is to run the given consumers.

Parameters:
  • stream_factory (IStreamFactory) – Stream factory

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • consumers (Tuple[IConsumer, ...]) – Consumers

  • max_events (int) – Maximum number of events to process. Defaults to -1, which means there is no limit.

  • logger (logging.Logger) – Logger instance

run()[source]

Start runner

Returns:

None

Return type:

None
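
The max_events convention (-1 for no limit) can be illustrated with a small loop. This is a sketch under stated assumptions: the hypothetical run_consumers function reads events from a plain iterable, whereas an actual runner obtains them from a stream created by the stream factory.

```python
from typing import Any, Callable, Iterable, List, Sequence


def run_consumers(
    events: Iterable[Any],
    consumers: Sequence[Callable[[Any], Any]],
    max_events: int = -1,
) -> int:
    """Pass each event to every consumer; return how many were processed."""
    processed = 0
    for event in events:
        # -1 disables the limit; any other value caps the event count.
        if max_events != -1 and processed >= max_events:
            break
        for consume in consumers:
            consume(event)
        processed += 1
    return processed


seen: List[int] = []
limited = run_consumers(range(5), [seen.append], max_events=3)
unlimited = run_consumers(range(5), [lambda event: None])
```

In this run limited is 3 and seen holds the first three events, while unlimited is 5 because the default of -1 applies.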

class IConsumer(func, logger=<RootLogger root (DEBUG)>)[source]

Bases: Protocol

Consumer class. Its sole purpose is to consume events.

consume(event)[source]

Consume an event with consumer function

Parameters:

event (Event) – Event to consume

Returns:

None

Return type:

None

eventscore.core.consumers module

class Consumer(func, logger=<RootLogger root (DEBUG)>)[source]

Bases: IConsumer

consume(event)[source]

Consume an event with consumer function

Parameters:

event (Event) – Event to consume

Returns:

None

Return type:

None

eventscore.core.ecore module

class ECore(stream_factory, *, process_pipeline=None, process_pipeline_type=<class 'eventscore.core.pipelines.ProcessPipeline'>, process_pipeline_init_kwargs=None, spawn_worker=None, spawn_worker_type=<class 'eventscore.core.workers.SpawnMPWorker'>, spawn_worker_init_kwargs=None, producer=None, producer_type=<class 'eventscore.core.producers.Producer'>, producer_init_kwargs=None, logger=<RootLogger root (DEBUG)>, skipping_predicate=<function _skipping_predicate>)[source]

Bases: IECore

property process_pipeline: IProcessPipeline

Pipeline processor getter

Returns:

Pipeline processor

Return type:

IProcessPipeline

property spawn_worker: ISpawnWorker

Worker spawner getter

Returns:

Worker spawner

Return type:

ISpawnWorker

property producer: IProducer

Producer getter

Returns:

Producer

Return type:

IProducer

property stream_factory: IStreamFactory

Stream factory getter

Returns:

Stream factory

Return type:

IStreamFactory

property stream: IStream

Stream getter

Returns:

Stream instance

Return type:

IStream

consumer(func=None, *, event, group, clones=1)[source]

Decorator for consumer functions

Parameters:
  • func (ConsumerFunc | None) – function to decorate

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – Number of clones

Returns:

Decorated function

Return type:

ConsumerFunc

register_consumer(func, event, group, *, clones=1, func_path=None)[source]

Register a function as a consumer

Parameters:
  • func (ConsumerFunc) – Function to register as a consumer

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – Number of clones

  • func_path (str | None) – Path (absolute preferred) to the module where the function is defined. Used to compare consumer functions for equality so that the same function is not registered twice. Defaults to `(inspect.getsourcefile(func) or "") + ":" + func.__name__`

Returns:

None

Return type:

None

discover_consumers(*, root='')[source]

Discover consumers within given package root.

Parameters:

root (str | None) – root package to search in. Current directory is used by default.

Returns:

None

Return type:

None

produce(event, *, block=True, timeout=5)[source]

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Whether to block when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

spawn_workers()[source]

Spawn workers for registered consumers. Consumers registered afterwards, and further calls to this method, have no effect.

Returns:

None

Return type:

None

eventscore.core.exceptions module

exception EventsCoreError[source]

Bases: Exception

message = 'eventscore error occured.'

exception AlreadySpawnedError[source]

Bases: EventsCoreError

message = 'Not able to modify consumers after spawning workers.'

exception ClonesMismatchError[source]

Bases: EventsCoreError

message = 'Pipeline must have the same number of clones for all items.'

exception EmptyPipelineError[source]

Bases: EventsCoreError

message = 'Pipeline must have at least one item.'

exception UnrelatedConsumersError[source]

Bases: EventsCoreError

message = 'All consumers in pipeline must be related to the same event.'

exception EventNotSentError[source]

Bases: EventsCoreError

message = 'Could not send message to stream due to an unexpected error.'

exception EmptyStreamError[source]

Bases: EventsCoreError

message = 'Stream does not have unprocessed messages.'

exception TooManyDataError[source]

Bases: EventsCoreError

message = 'Unexpected number of data received for event.'

exception PathError[source]

Bases: EventsCoreError

message = 'Provided path does not exist.'

exception NotADirectoryError[source]

Bases: EventsCoreError

message = 'Provided root is not a directory.'

exception NotAPackageError[source]

Bases: EventsCoreError

message = 'Provided root is not a Python package.'
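
Three of the messages above describe rules a pipeline has to satisfy. The sketch below expresses those rules as a single check. It uses local stand-in exception classes carrying the documented messages and a reduced Item type. Where and in which order the library performs these checks is an assumption.

```python
from dataclasses import dataclass
from typing import Iterable


class EmptyPipelineError(Exception):
    message = "Pipeline must have at least one item."


class UnrelatedConsumersError(Exception):
    message = "All consumers in pipeline must be related to the same event."


class ClonesMismatchError(Exception):
    message = "Pipeline must have the same number of clones for all items."


@dataclass(frozen=True)
class Item:
    """Reduced stand-in for PipelineItem: only the fields the checks need."""

    event: str
    clones: int = 1


def validate(items: Iterable[Item]) -> None:
    """Raise the matching error if the items break one of the three rules."""
    items = list(items)
    if not items:
        raise EmptyPipelineError(EmptyPipelineError.message)
    if len({item.event for item in items}) > 1:
        raise UnrelatedConsumersError(UnrelatedConsumersError.message)
    if len({item.clones for item in items}) > 1:
        raise ClonesMismatchError(ClonesMismatchError.message)


validate([Item("order.paid", clones=2), Item("order.paid", clones=2)])
```

The final call passes because both items share one event type and one clone count.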

eventscore.core.logging module

eventscore.core.pipelines module

class ProcessPipeline(consumer_type=<class 'eventscore.core.consumers.Consumer'>, runner_type=<class 'eventscore.core.runners.ObserverRunner'>, logger=<RootLogger root (DEBUG)>)[source]

Bases: IProcessPipeline

eventscore.core.producers module

class Producer(ecore, logger=<RootLogger root (DEBUG)>)[source]

Bases: IProducer

produce(event, *, block=True, timeout=5)[source]

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Whether to block when a delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

eventscore.core.runners module

class ObserverRunner(stream_factory, event, group, *consumers, max_events=-1, logger=<RootLogger root (DEBUG)>)[source]

Bases: IRunner

run()[source]

Start runner

Returns:

None

Return type:

None

eventscore.core.serializers module

class EventSerializer(*args, **kwargs)[source]

Bases: IEventSerializer[bytes, bytes]

encode(event)[source]

Encode an event

Parameters:

event (Event) – Event to encode

Returns:

Encoded event

Return type:

RType

decode(event)[source]

Decode an event

Parameters:

event (IType) – Event to decode

Returns:

Decoded event

Return type:

Event

eventscore.core.types module

class DeliverySemantic(*values)[source]

Bases: IntEnum

AT_MOST_ONCE = 0
AT_LEAST_ONCE = 1
EXACTLY_ONCE = 2

class EventStatus(*values)[source]

Bases: IntEnum

PENDING = 0
SENT = 1
FAILED = 2

class EventDict[source]

Bases: TypedDict

type: str
uid: str
ts: str
payload: dict[str, str | int | bytes]

class Event(type, uid=<factory>, ts=<factory>, payload=<factory>)[source]

Bases: object

Event class

Parameters:
  • type (EventType) – type of the event

  • uid (uuid.UUID) – unique id of the event. Defaults to random uuid4

  • ts (str) – timestamp of the event. Defaults to current timestamp

  • payload (dict[str, EncodableT]) – payload of the event. Defaults to empty dict

type: str | StrEnum | IntEnum
uid: UUID
ts: str
payload: dict[str, str | int | bytes]

asdict()[source]

Custom asdict method that returns an encodable result

Returns:

Dictionary representation of the event

Return type:

EventDict

classmethod fromdict(obj)[source]

Construct an event from an encodable dictionary

Parameters:

obj (EventDict) – Dictionary representation of the event

Returns:

Event object

Return type:

Event
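
The asdict/fromdict pair is meant to round-trip an event through a plain dictionary. The sketch below shows that shape with a stand-in dataclass that mirrors the documented fields. It assumes an ISO 8601 timestamp string and a plain str event type, and it is not the library's implementation.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Dict


def _now() -> str:
    # Timestamp format is an assumption; the reference only says ts is a str.
    return datetime.now(timezone.utc).isoformat()


@dataclass
class Event:
    """Stand-in mirroring the documented fields; not the library class."""

    type: str
    uid: uuid.UUID = field(default_factory=uuid.uuid4)
    ts: str = field(default_factory=_now)
    payload: Dict[str, Any] = field(default_factory=dict)

    def asdict(self) -> Dict[str, Any]:
        # uid becomes a string so the result matches EventDict (uid: str).
        return {
            "type": self.type,
            "uid": str(self.uid),
            "ts": self.ts,
            "payload": dict(self.payload),
        }

    @classmethod
    def fromdict(cls, obj: Dict[str, Any]) -> "Event":
        return cls(
            type=obj["type"],
            uid=uuid.UUID(obj["uid"]),
            ts=obj["ts"],
            payload=dict(obj["payload"]),
        )


event = Event(type="order.paid", payload={"order_id": 42})
round_tripped = Event.fromdict(event.asdict())
```

After the round trip, round_tripped compares equal to event, and the uid in the intermediate dictionary is a string rather than a UUID object.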

class PipelineItem(func, func_path, event, group='default', clones=1)[source]

Bases: object

Pipeline item class

Parameters:
  • func (ConsumerFunc) – consumer function

  • event (EventType) – event type

  • group (ConsumerGroup) – consumer group. Defaults to DEFAULT_CONSUMER_GROUP

  • clones (int) – number of clones. Defaults to 1

  • func_path (str)

func: Callable[[Event], Any]
func_path: str
event: str | StrEnum | IntEnum
group: str | StrEnum | IntEnum
clones: int

class Pipeline(uid=<factory>, items=<factory>)[source]

Bases: object

Pipeline class

Parameters:
  • uid (uuid.UUID) – unique id of the pipeline. Defaults to random uuid4

  • items (set[PipelineItem]) – set of pipeline items. Defaults to empty set

uid: UUID
items: set[PipelineItem]

class Worker(name, runner, clones=1, uid=<factory>)[source]

Bases: object

Worker class

Parameters:
  • name (str) – name of the worker

  • runner (Any) – runner for the worker

  • clones (int) – number of clones. Defaults to 1

  • uid (uuid.UUID) – unique id of the worker. Defaults to random uuid4

name: str
runner: Any
clones: int
uid: UUID

eventscore.core.workers module

class SpawnMPWorker(logger=<RootLogger root (DEBUG)>)[source]

Bases: ISpawnWorker

Parameters:

logger (Logger)

Module contents