Abstract

class IECore(*args, **kwargs)[source]

Bases: Protocol

Event core class. Can be used for:

  • Accessing pipeline processor

  • Accessing worker spawner

  • Accessing producer

  • Accessing event stream

  • Decorating functions to make them consumers

  • Registering functions to make them consumers

  • Discovering consumers marked with @(ecore.)consumer decorator

  • Producing events

  • Spawning workers bulit from registered consumers

property process_pipeline: IProcessPipeline

Pipeline processor getter

Returns:

Pipeline processor

Return type:

IProcessPipeline

property spawn_worker: ISpawnWorker

Worker spawner getter

Returns:

Worker spawner

Return type:

ISpawnWorker

property producer: IProducer

Producer getter

Returns:

Producer

Return type:

IProducer

property stream_factory: IStreamFactory

Stream factory getter

Returns:

Stream factory

Return type:

IStreamFactory

property stream: IStream

Stream getter

Returns:

Stream instance

Return type:

IStream

consumer(func=None, *, event, group, clones=1)[source]

Decorator for consumer functions

Parameters:
  • func (ConsumerFunc | None) – function to decorate

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – No of clones

Returns:

Decorated function

Return type:

ConsumerFunc

register_consumer(func, event, group, *, clones=1, func_path=None)[source]

Consumer function registrator

Parameters:
  • func (ConsumerFunc) – Function to register as a consumer

  • event (EventType) – Event type

  • group (ConsumerGroup) – Consumer group

  • clones (NumberOfClones) – No of clones

  • func_path (str | None) – Path (absolute preferred) to module where function is defined. Is used for consumer functions equality check to avoid duplicate registering. Defaults to ` (inspect.getsourcefile(func) or "") + ":" + func.__name__ `

Returns:

None

Return type:

None

discover_consumers(*, root='')[source]

Discover consumers within given package root.

Parameters:

root (str | None) – root package to search in. Current directory is used by default.

Returns:

None

Return type:

None

produce(event, *, block=True, timeout=5)[source]

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Should I/O be blocked if some delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

spawn_workers()[source]

Spawn workers for registered consumers. Further consumer registering and spawning won’t matter

Returns:

None

Return type:

None

class IProcessPipeline(*args, **kwargs)[source]

Bases: Protocol

Pipeline processor class. One and only purpose of this class is to build worker based on a given pipeline.

class ISpawnWorker(*args, **kwargs)[source]

Bases: Protocol

Worker spawner class. One and only purpose of this class is to spawn a given worker.

class IEventSerializer(*args, **kwargs)[source]

Bases: Protocol[IType, RType]

Event serializer class. One and only purpose of this class is to encode and decode events.

encode(event)[source]

Encode an event

Parameters:

event (Event) – Event to encode

Returns:

Encoded event

Return type:

RType

decode(event)[source]

Decode an event

Parameters:

event (IType) – Event to decode

Returns:

Decoded event

Return type:

Event

class IProducer(*args, **kwargs)[source]

Bases: Protocol

Producer class. One and only purpose of this class is to produce events.

produce(event, *, block=True, timeout=5)[source]

Produce an event

Parameters:
  • event (Event) – Event to produce

  • block (bool) – Should I/O be blocked if some delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

class IStream(*args, **kwargs)[source]

Bases: Protocol

Event stream class. One and only purpose of this class is to put and pop events.

put(event, *, block=True, timeout=5)[source]

Put an event to stream

Parameters:
  • event (Event) – Event to put

  • block (bool) – Should I/O be blocked if some delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

Returns:

None

Return type:

None

pop(event, group, *, block=True, timeout=5)[source]

Pop an event from stream

Parameters:
  • event (EventType) – Event type

  • block (bool) – Should I/O be blocked if some delay occurs. Defaults to True.

  • timeout (int) – Number of seconds to wait in case of latency. Defaults to 5.

  • group (str | StrEnum | IntEnum)

Returns:

Next unprocessed event in stream

Return type:

Event

class IStreamFactory(stream_class, kwargs)[source]

Bases: Protocol

Stream factory class. One and only purpose of this class is to create a stream.

Parameters:
class IRunner(stream_factory, event, group, *consumers, max_events=-1, logger=<RootLogger root (DEBUG)>)[source]

Bases: Protocol

Runner class. One and only purpose of this class is to run given consumers.

Parameters:
  • stream (IStream) – Event stream

  • event (EventType) – Event type

  • consumers (Tuple[IConsumer, ...]) – Consumers

  • max_events (int) – Max events to process Defaults to -1. If value is equal to -1, then there is not limit for number of events to process

  • logger (logging.Logger) – Logger instance

  • stream_factory (IStreamFactory)

  • group (ConsumerGroup)

run()[source]

Start runner

Returns:

None

Return type:

None

class IConsumer(func, logger=<RootLogger root (DEBUG)>)[source]

Bases: Protocol

Consumer class. One and only purpose of this class is to consume events.

Parameters:
consume(event)[source]

Consume an event with consumer function

Parameters:

event (Event) – Event to consume

Returns:

None

Return type:

None