Source code for eventscore.core.consumers
import logging
from eventscore.core.abstract import ConsumerFunc, IConsumer
from eventscore.core.logging import logger as _logger
from eventscore.core.types import Event
[docs]
class Consumer(IConsumer):
def __init__(self, func: ConsumerFunc, logger: logging.Logger = _logger) -> None:
self.__func = func
self.__logger = logger
[docs]
def consume(self, event: Event) -> None:
self.__logger.debug("Consumer started.")
self.__func(event)
self.__logger.debug("Consumer finished.")