Source code for eventscore.core.producers
import logging
from eventscore.core.abstract import IECore, IProducer
from eventscore.core.logging import logger as _logger
from eventscore.core.types import Event
[docs]
class Producer(IProducer):
def __init__(self, ecore: IECore, logger: logging.Logger = _logger) -> None:
"""
Construct producer instance
:param ecore: Event core instance
:type ecore: IECore
"""
self.__ecore = ecore
self.__logger = logger
[docs]
def produce(
self,
event: Event,
*,
block: bool = True,
timeout: int = 5,
) -> None:
self.__logger.debug(
f"Producing event {event} with block={block}, timeout={timeout}."
)
self.__ecore.stream.put(event=event, block=block, timeout=timeout)
self.__logger.debug(f"Event {event.uid} produced.")