Source code for eventscore.core.pipelines
from __future__ import annotations
import logging
from eventscore.core.abstract import (
ConsumerGroup,
EventType,
IConsumer,
IECore,
IProcessPipeline,
IRunner,
)
from eventscore.core.consumers import Consumer
from eventscore.core.exceptions import (
ClonesMismatchError,
EmptyPipelineError,
UnrelatedConsumersError,
)
from eventscore.core.logging import logger as _logger
from eventscore.core.runners import ObserverRunner
from eventscore.core.types import Pipeline, PipelineItem
from eventscore.core.workers import Worker
[docs]
class ProcessPipeline(IProcessPipeline):
    """Turn a validated :class:`Pipeline` into a ready-to-run :class:`Worker`.

    Calling an instance validates the pipeline, wraps every pipeline item's
    function in a consumer, builds a single runner over those consumers and
    returns a ``Worker`` that carries the runner.

    Args:
        consumer_type: Factory invoked as ``consumer_type(func, logger=...)``
            for each pipeline item.
        runner_type: Factory invoked as
            ``runner_type(stream_factory, event, group, *consumers, logger=...)``.
        logger: Logger used for debug output and handed to the consumers
            and the runner.
    """

    def __init__(
        self,
        consumer_type: type[IConsumer] = Consumer,
        runner_type: type[IRunner] = ObserverRunner,
        logger: logging.Logger = _logger,
    ) -> None:
        self.__consumer_type = consumer_type
        self.__runner_type = runner_type
        self.__logger = logger

    def __call__(self, pipeline: Pipeline, ecore: IECore) -> Worker:
        """Build a worker for ``pipeline``.

        Args:
            pipeline: Pipeline whose items are turned into consumers.
            ecore: Core object; its ``stream_factory`` is handed to the runner.

        Returns:
            A ``Worker`` whose ``uid`` is the pipeline uid, whose ``name`` is
            the string form of that uid, and which carries the shared clone
            count and the freshly built runner.

        Raises:
            EmptyPipelineError: If the pipeline has no items.
            ClonesMismatchError: If the items disagree on ``clones``.
            UnrelatedConsumersError: If the items disagree on ``event`` or
                on ``group``.
        """
        event, group, clones = self.__validate_pipeline(pipeline)
        self.__logger.debug(
            f"Received valid pipeline {pipeline}. Event: {event}. Clones: {clones}"
        )
        consumers = self.__make_consumers(pipeline.items)
        self.__logger.debug(f"Built consumers: {consumers}")
        runner = self.__make_runner(consumers, ecore, event, group)
        self.__logger.debug(f"Built runner: {runner}")
        return Worker(
            uid=pipeline.uid,
            name=str(pipeline.uid),
            clones=clones,
            runner=runner,
        )

    def __validate_pipeline(
        self,
        pipeline: Pipeline,
    ) -> tuple[EventType, ConsumerGroup, int]:
        """Check that all items agree and return their shared settings.

        Returns:
            A ``(event, group, clones)`` tuple common to every item.

        Raises:
            EmptyPipelineError: If the pipeline has no items.
            ClonesMismatchError: If more than one distinct ``clones`` value
                is present.
            UnrelatedConsumersError: If more than one distinct ``event`` or
                ``group`` value is present.
        """
        items = pipeline.items
        if not items:
            raise EmptyPipelineError

        clones_unique = {item.clones for item in items}
        if len(clones_unique) > 1:
            raise ClonesMismatchError

        events_unique = {item.event for item in items}
        if len(events_unique) > 1:
            raise UnrelatedConsumersError

        # The runner is built for exactly one group, so every item has to
        # share it. Without this check a mixed pipeline would silently use
        # whichever item iteration happens to yield first. Plain comparison
        # is used (rather than a set) so groups need not be hashable.
        group = next(iter(items)).group
        if any(item.group != group for item in items):
            raise UnrelatedConsumersError

        return events_unique.pop(), group, clones_unique.pop()

    def __make_consumers(self, items: set[PipelineItem]) -> list[IConsumer]:
        """Wrap each item's ``func`` in a consumer built by ``consumer_type``."""
        return [
            self.__consumer_type(item.func, logger=self.__logger)
            for item in items
        ]

    def __make_runner(
        self,
        consumers: list[IConsumer],
        ecore: IECore,
        event: EventType,
        group: ConsumerGroup,
    ) -> IRunner:
        """Build the runner that serves ``consumers`` for ``event`` and ``group``."""
        return self.__runner_type(
            ecore.stream_factory,
            event,
            group,
            *consumers,
            logger=self.__logger,
        )