Source code for eventscore.ext.kafka.streams
from typing import Any, TypeAlias
from kafka import KafkaConsumer, KafkaProducer # type:ignore[import-untyped]
from kafka.errors import KafkaTimeoutError # type:ignore[import-untyped]
from kafka.producer.future import FutureRecordMetadata # type:ignore[import-untyped]
from eventscore.core.abstract import ConsumerGroup, EventType, IEventSerializer, IStream
from eventscore.core.exceptions import (
EmptyStreamError,
EventNotSentError,
TooManyDataError,
)
from eventscore.core.types import Event, EventDict
# Shape that KafkaStream.pop expects back from KafkaConsumer.poll: a mapping
# from topic name to the list of raw events fetched for that topic.
# NOTE(review): kafka-python documents poll() results as keyed by
# TopicPartition rather than by a plain topic string — confirm this alias
# matches what the consumer actually returns.
PollResult: TypeAlias = dict[str, list[EventDict]]
[docs]
class KafkaStream(IStream):
    """Event stream backed by a Kafka producer/consumer pair.

    Events are published to, and consumed from, the topic named after the
    string form of the event (``str(event)``). The single underlying consumer
    is subscribed to at most one topic at a time; asking for a different
    event type re-subscribes it.
    """

    def __init__(
        self,
        serializer: IEventSerializer[EventDict, bytes],
    ) -> None:
        """Create the Kafka clients used by the stream.

        Args:
            serializer: Encodes events before sending and decodes the raw
                data obtained from the consumer.
        """
        self.__serializer = serializer
        # Both clients are currently built with the library defaults.
        configs: dict[str, Any] = {}
        self.__producer = KafkaProducer(**configs)
        self.__consumer = KafkaConsumer(**configs)
        # Event type the consumer is subscribed to, or None before first use.
        self.__consumer_subscription: EventType | None = None

    def put(
        self,
        event: Event,
        *,
        block: bool = True,
        timeout: int = 5,
    ) -> None:
        """Send ``event`` to Kafka.

        Args:
            event: Event to publish; it is serialized with the configured
                serializer and sent to the topic ``str(event)``.
            block: When true, wait for the send to be acknowledged; when
                false, return right after handing the record to the producer.
            timeout: Maximum time to wait for the acknowledgement, passed
                through to the future's ``get``. Ignored when ``block`` is
                false.

        Raises:
            EventNotSentError: If waiting for the acknowledgement timed out.
        """
        # NOTE(review): the topic is str(event) on the Event instance itself;
        # presumably Event stringifies to its event type — confirm.
        record: FutureRecordMetadata = (  # type:ignore
            self.__producer.send(  # pyright:ignore[reportUnknownMemberType]
                topic=str(event),
                value=self.__serializer.encode(event),
            )
        )
        if not block:
            return
        try:
            _ = record.get(  # pyright:ignore[reportUnknownMemberType,reportUnknownVariableType] # noqa:E501
                timeout
            )
        except KafkaTimeoutError as exc:
            raise EventNotSentError from exc

    def pop(
        self,
        event: EventType,
        group: ConsumerGroup,
        *,
        block: bool = True,
        timeout: int = 5,
    ) -> Event:
        """Fetch a single event of type ``event`` from Kafka.

        Args:
            event: Event type to consume; the consumer is (re)subscribed to
                the topic ``str(event)`` if it is not already.
            group: Consumer group. Accepted for interface compatibility; the
                current implementation does not use it.
            block: When true, poll for up to ``timeout`` seconds; when false,
                poll without waiting.
            timeout: Poll timeout in seconds (converted to milliseconds).

        Returns:
            The event decoded by the configured serializer.

        Raises:
            EmptyStreamError: If the poll returned nothing for the topic.
            TooManyDataError: If the poll returned more than one record for
                the topic.
        """
        self.__single_consumer_subscription_lock(event)
        polled: PollResult = (  # pyright:ignore[reportUnknownVariableType]
            self.__consumer.poll(  # pyright:ignore[reportUnknownMemberType]
                timeout * 1000 if block else 0,
                max_records=1,
                update_offsets=True,
            )
        )
        # Look the topic up by its string form, the same key used when
        # subscribing, so the emptiness check and the read always agree.
        # NOTE(review): kafka-python documents poll() results as keyed by
        # TopicPartition, not by topic name — confirm the key type here.
        items = polled.get(str(event))
        if not items:
            raise EmptyStreamError
        if len(items) > 1:
            raise TooManyDataError
        return self.__serializer.decode(items[0])

    def __single_consumer_subscription_lock(self, event: EventType) -> None:
        """Ensure the consumer is subscribed to the topic of ``event`` only.

        Does nothing when the consumer is already subscribed to ``event``;
        otherwise drops the previous subscription (if any) and subscribes to
        the topic ``str(event)``.
        """
        if self.__consumer_subscription == event:
            return
        if self.__consumer_subscription is not None:
            self.__consumer.unsubscribe()
            # Forget the old subscription right away so a failure below does
            # not leave a stale value behind.
            self.__consumer_subscription = None
        self.__consumer.subscribe(  # pyright:ignore[reportUnknownMemberType]
            topics=[str(event)]
        )
        # Record the subscription only once subscribe() has succeeded, so a
        # failed attempt is retried on the next call.
        self.__consumer_subscription = event