from __future__ import annotations
import logging
from collections.abc import Callable
from enum import IntEnum, StrEnum
from typing import Any, Protocol, TypeAlias, TypeVar
from eventscore.core.logging import logger as _logger
from eventscore.core.types import Event, Pipeline, Worker
# Type alias for user-defined event type
EventType: TypeAlias = str | StrEnum | IntEnum
# Type alias for user-defined consumer functions/other callable
ConsumerFunc: TypeAlias = Callable[[Event], Any]
# Type alias for user-defined consumer group
ConsumerGroup: TypeAlias = str | StrEnum | IntEnum
# Type alias for number of clones
NumberOfClones: TypeAlias = int
# Type alias for path to consumer function's module
FunctionModulePath: TypeAlias = str
# Type variable for serializer input
IType = TypeVar("IType", contravariant=True)
# Type variable for serializer output
RType = TypeVar("RType", covariant=True)
[docs]
class IECore(Protocol):
"""
Event core class.
Can be used for:
* Accessing pipeline processor
* Accessing worker spawner
* Accessing producer
* Accessing event stream
* Decorating functions to make them consumers
* Registering functions to make them consumers
* Discovering consumers marked with @(ecore.)consumer decorator
* Producing events
* Spawning workers bulit from registered consumers
"""
__slots__ = ()
@property
def process_pipeline(self) -> IProcessPipeline:
"""
Pipeline processor getter
:return: Pipeline processor
:rtype: IProcessPipeline
"""
...
@property
def spawn_worker(self) -> ISpawnWorker:
"""
Worker spawner getter
:return: Worker spawner
:rtype: ISpawnWorker
"""
...
@property
def producer(self) -> IProducer:
"""
Producer getter
:return: Producer
:rtype: IProducer
"""
...
@property
def stream_factory(self) -> IStreamFactory:
"""
Stream factory getter
:return: Stream factory
:rtype: IStreamFactory
"""
...
@property
def stream(self) -> IStream:
"""
Stream getter
:return: Stream instance
:rtype: IStream
"""
...
[docs]
def consumer(
self,
func: ConsumerFunc | None = None,
*,
event: EventType,
group: ConsumerGroup,
clones: NumberOfClones = 1,
) -> ConsumerFunc:
"""
Decorator for consumer functions
:param func: function to decorate
:type func: ConsumerFunc | None
:param event: Event type
:type event: EventType
:param group: Consumer group
:type group: ConsumerGroup
:param clones: No of clones
:type clones: NumberOfClones
:return: Decorated function
:rtype: ConsumerFunc
"""
...
[docs]
def register_consumer(
self,
func: ConsumerFunc,
event: EventType,
group: ConsumerGroup,
*,
clones: NumberOfClones = 1,
func_path: str | None = None,
) -> None:
"""
Consumer function registrator
:param func: Function to register as a consumer
:type func: ConsumerFunc
:param event: Event type
:type event: EventType
:param group: Consumer group
:type group: ConsumerGroup
:param clones: No of clones
:type clones: NumberOfClones
:param func_path: Path (absolute preferred)
to module where function is defined.
Is used for consumer functions equality check
to avoid duplicate registering.
Defaults to
```
(inspect.getsourcefile(func) or "") + ":" + func.__name__
```
:type func_path: str | None
:return: None
:rtype: None
"""
...
[docs]
def discover_consumers(self, *, root: str = "") -> None:
"""
Discover consumers within given package root.
:param root: root package to search in.
Current directory is used by default.
:type root: str | None
:return: None
:rtype: None
"""
...
[docs]
def produce(
self,
event: Event,
*,
block: bool = True,
timeout: int = 5,
) -> None:
"""
Produce an event
:param event: Event to produce
:type event: Event
:param block: Should I/O be blocked if some delay occurs.
Defaults to `True`.
:type block: bool
:param timeout: Number of seconds to wait in case of latency.
Defaults to `5`.
:type timeout: int
:return: None
:rtype: None
"""
...
[docs]
def spawn_workers(self) -> None:
"""
Spawn workers for registered consumers.
Further consumer registering and spawning won't matter
:return: None
:rtype: None
"""
...
[docs]
class IProcessPipeline(Protocol):
"""
Pipeline processor class.
One and only purpose of this class is to build worker
based on a given pipeline.
"""
__slots__ = ()
def __call__(self, pipeline: Pipeline, ecore: IECore) -> Worker:
"""
Process a pipeline
:param pipeline: Pipeline to process
:type pipeline: Pipeline
:param ecore: Event core instance
:type ecore: IECore
:return: Constructed worker
:rtype: Worker
"""
...
[docs]
class ISpawnWorker(Protocol):
"""
Worker spawner class.
One and only purpose of this class is to spawn a given worker.
"""
__slots__ = ()
def __call__(self, worker: Worker) -> tuple[int, ...]:
"""
Spawn worker
:param worker: Worker to spawn
:type worker: Worker
:return: PIDs
:rtype: tuple[int, ...]
"""
...
[docs]
class IEventSerializer(Protocol[IType, RType]):
"""
Event serializer class.
One and only purpose of this class is to encode and decode events.
"""
__slots__ = ()
[docs]
def encode(self, event: Event) -> RType:
"""
Encode an event
:param event: Event to encode
:type event: Event
:return: Encoded event
:rtype: RType
"""
...
[docs]
def decode(self, event: IType) -> Event:
"""
Decode an event
:param event: Event to decode
:type event: IType
:return: Decoded event
:rtype: Event
"""
...
[docs]
class IProducer(Protocol):
"""
Producer class.
One and only purpose of this class is to produce events.
"""
__slots__ = ()
[docs]
def produce(
self,
event: Event,
*,
block: bool = True,
timeout: int = 5,
) -> None:
"""
Produce an event
:param event: Event to produce
:type event: Event
:param block: Should I/O be blocked if some delay occurs.
Defaults to `True`.
:type block: bool
:param timeout: Number of seconds to wait in case of latency.
Defaults to `5`.
:type timeout: int
:return: None
:rtype: None
"""
...
[docs]
class IStream(Protocol):
"""
Event stream class.
One and only purpose of this class is to put and pop events.
"""
__slots__ = ()
[docs]
def put(
self,
event: Event,
*,
block: bool = True,
timeout: int = 5,
) -> None:
"""
Put an event to stream
:param event: Event to put
:type event: Event
:param block: Should I/O be blocked if some delay occurs.
Defaults to `True`.
:type block: bool
:param timeout: Number of seconds to wait in case of latency.
Defaults to `5`.
:type timeout: int
:return: None
:rtype: None
"""
...
[docs]
def pop(
self,
event: EventType,
group: ConsumerGroup,
*,
block: bool = True,
timeout: int = 5,
) -> Event:
"""
Pop an event from stream
:param event: Event type
:type event: EventType
:param block: Should I/O be blocked if some delay occurs.
Defaults to `True`.
:type block: bool
:param timeout: Number of seconds to wait in case of latency.
Defaults to `5`.
:type timeout: int
:return: Next unprocessed event in stream
:rtype: Event
"""
...
[docs]
class IStreamFactory(Protocol):
"""
Stream factory class.
One and only purpose of this class is to create a stream.
"""
__slots__ = ()
def __init__(self, stream_class: type[IStream], kwargs: dict[str, Any]) -> None: ...
def __call__(self) -> IStream:
"""
Create a stream
:return: Stream instance
:rtype: IStream
"""
...
[docs]
class IRunner(Protocol):
"""
Runner class.
One and only purpose of this class is to run given consumers.
:param stream: Event stream
:type stream: IStream
:param event: Event type
:type event: EventType
:param consumers: Consumers
:type consumers: Tuple[IConsumer, ...]
:param max_events: Max events to process
Defaults to -1.
If value is equal to -1,
then there is not limit for number of events to process
:type max_events: int
:param logger: Logger instance
:type logger: logging.Logger
"""
__slots__ = ()
def __init__(
self,
stream_factory: IStreamFactory,
event: EventType,
group: ConsumerGroup,
*consumers: IConsumer,
max_events: int = -1,
logger: logging.Logger = _logger,
) -> None: ...
[docs]
def run(self) -> None:
"""
Start runner
:return: None
:rtype: None
"""
...
[docs]
class IConsumer(Protocol):
"""
Consumer class.
One and only purpose of this class is to consume events.
"""
__slots__ = ()
def __init__(self, func: ConsumerFunc, logger: logging.Logger = _logger) -> None:
"""
Construct consumer instance
:param func: Consumer function
:type func: ConsumerFunc
:param logger: Logger instance
:type logger: logging.Logger
"""
...
[docs]
def consume(self, event: Event) -> None:
"""
Consume an event with consumer function
:param event: Event to consume
:type event: Event
:return: None
:rtype: None
"""
...