Source code for eventscore.core.consumers

import logging

from eventscore.core.abstract import ConsumerFunc, IConsumer
from eventscore.core.logging import logger as _logger
from eventscore.core.types import Event


class Consumer(IConsumer):
    """Consumer that delegates event handling to a wrapped callable.

    The callable and a logger are stored at construction time; each call to
    :meth:`consume` emits a debug message, invokes the callable with the
    event, and then emits a second debug message.
    """

    def __init__(
        self,
        func: ConsumerFunc,
        logger: logging.Logger = _logger,
    ) -> None:
        """Store the handler callable and the logger used for tracing.

        :param func: Callable invoked with each event passed to ``consume``.
        :param logger: Logger receiving the debug messages; defaults to the
            logger imported from ``eventscore.core.logging``.
        """
        self.__logger = logger
        self.__func = func

    def consume(self, event: Event) -> None:
        """Pass ``event`` to the wrapped callable, logging around the call.

        No exception handling is performed here: if the callable raises, the
        exception propagates to the caller and the "finished" message is not
        logged.

        :param event: Event forwarded unchanged to the wrapped callable.
        """
        log = self.__logger
        log.debug("Consumer started.")
        self.__func(event)
        log.debug("Consumer finished.")