Source code for eventscore.core.runners

import logging
import threading

from eventscore.core.abstract import (
    ConsumerGroup,
    EventType,
    IConsumer,
    IRunner,
    IStreamFactory,
)
from eventscore.core.exceptions import EmptyStreamError
from eventscore.core.logging import logger as _logger


[docs] class ObserverRunner(IRunner): def __init__( self, stream_factory: IStreamFactory, event: EventType, group: ConsumerGroup, *consumers: IConsumer, max_events: int = -1, logger: logging.Logger = _logger, ) -> None: self.__stream = stream_factory() self.__event = event self.__group = group self.__max_events = max_events self.__consumers = consumers self.__logger = logger assert len(consumers) > 0, "No consumers provided to runner." assert max_events == -1 or max_events > 0, "Max events must be positive or -1."
[docs] def run(self) -> None: events_counter = 0 while self.__max_events == -1 or events_counter < self.__max_events: try: event = self.__stream.pop(self.__event, self.__group, block=True) except EmptyStreamError: self.__logger.debug("Stream is empty, no consumers ran this iteration.") continue events_counter += 1 tasks = tuple( threading.Thread(target=consumer.consume, args=(event,)) for consumer in self.__consumers ) for task in tasks: task.start() self.__logger.debug(f"Consumer thread {task.ident} has started.") for task in tasks: task.join() self.__logger.debug(f"Consumer thread {task.ident} has finished.")