class ObserverRunner(IRunner):
    """Runner that hands every popped event to all of its consumers.

    Each event taken from the stream is passed to every registered consumer,
    with one thread per consumer. All consumer threads for an event are
    joined before the next event is popped.
    """

    def __init__(
        self,
        stream_factory: IStreamFactory,
        event: EventType,
        group: ConsumerGroup,
        *consumers: IConsumer,
        max_events: int = -1,
        logger: logging.Logger = _logger,
    ) -> None:
        """Validate the configuration and create the stream.

        Args:
            stream_factory: Zero-argument callable whose result is used as
                the stream events are popped from.
            event: Event type passed to the stream's ``pop``.
            group: Consumer group passed to the stream's ``pop``.
            *consumers: One or more consumers; each gets its ``consume``
                method called with every popped event.
            max_events: Number of events to process before ``run`` returns,
                or ``-1`` to run without a limit.
            logger: Logger that receives the runner's debug messages.

        Raises:
            AssertionError: If no consumers are given, or if ``max_events``
                is neither ``-1`` nor a positive number.
        """
        # Validate before calling the factory so that a misconfigured runner
        # never creates a stream it will not use. Explicit raises (instead of
        # ``assert`` statements) keep the checks active under ``python -O``;
        # the exception type is kept as AssertionError for existing callers.
        if not consumers:
            raise AssertionError("No consumers provided to runner.")
        if not (max_events == -1 or max_events > 0):
            raise AssertionError("Max events must be positive or -1.")

        self.__stream = stream_factory()
        self.__event = event
        self.__group = group
        self.__max_events = max_events
        self.__consumers = consumers
        self.__logger = logger

    def run(self) -> None:
        """Pop events and dispatch each one to every consumer.

        Loops until ``max_events`` events have been processed, or without a
        limit when ``max_events`` is ``-1``. An iteration in which the stream
        raises ``EmptyStreamError`` is logged and does not count towards the
        limit.
        """
        events_counter = 0
        while self.__max_events == -1 or events_counter < self.__max_events:
            try:
                event = self.__stream.pop(self.__event, self.__group, block=True)
            except EmptyStreamError:
                self.__logger.debug(
                    "Stream is empty, no consumers ran this iteration."
                )
                continue

            # Only events that were actually popped count towards max_events.
            events_counter += 1

            tasks = tuple(
                threading.Thread(target=consumer.consume, args=(event,))
                for consumer in self.__consumers
            )
            for task in tasks:
                task.start()
                self.__logger.debug(f"Consumer thread {task.ident} has started.")
            # Wait for every consumer to finish this event before popping
            # the next one.
            for task in tasks:
                task.join()
                self.__logger.debug(f"Consumer thread {task.ident} has finished.")