import logging
import os
from collections import defaultdict
from typing import TYPE_CHECKING, Any, TypeAlias
import redis
if TYPE_CHECKING:
pass
from redis import Redis
from eventscore.core.abstract import ConsumerGroup, EventType, IEventSerializer, IStream
from eventscore.core.exceptions import EmptyStreamError, TooManyDataError
from eventscore.core.logging import logger as _logger
from eventscore.core.types import Event
# Shape of the XREADGROUP reply as it is unpacked in ``RedisStream.pop``:
# a list of ``(stream name, entries)`` pairs, where ``entries`` is a list of
# ``(entry id, fields)`` pairs and ``fields`` maps field names to values.
# NOTE(review): the all-``bytes`` typing assumes the Redis client is not
# configured to decode responses — confirm against how the client is built.
XReadT: TypeAlias = list[tuple[bytes, list[tuple[bytes, dict[bytes, bytes]]]]]
class RedisStream(IStream):
    """
    Event stream backed by Redis Streams and consumer groups.

    Events are appended with ``XADD`` to a Redis stream named after the event
    type and consumed with ``XREADGROUP``; every entry handed to a caller is
    acknowledged with ``XACK``. Consumer groups are created lazily on the
    first ``pop`` for a given ``(event, group)`` pair. The consumer name used
    for reads is the PID of the process that created the instance.
    """

    def __init__(
        self,
        *,
        serializer: IEventSerializer[bytes, str],
        redis: Redis | None = None,
        host: str | None = None,
        port: int | None = None,
        db: int | None = None,
        redis_init_kwargs: dict[str, Any] | None = None,
        logger: logging.Logger = _logger,
    ) -> None:
        """
        Construct Redis stream instance

        Either a ready ``redis`` client, or all of ``host``, ``port`` and
        ``db``, must be supplied. When ``redis`` is given, the connection
        parameters and ``redis_init_kwargs`` are ignored.

        :param serializer: Event serializer
        :type serializer: IEventSerializer[bytes, str]
        :param redis: Existing Redis client to use as is
        :type redis: Redis | None
        :param host: Redis host
        :type host: str | None
        :param port: Redis port
        :type port: int | None
        :param db: Redis database
        :type db: int | None
        :param redis_init_kwargs: Extra keyword arguments for the ``Redis``
            constructor; ``host``, ``port`` and ``db`` override same-named keys.
            The passed dict is not modified.
        :type redis_init_kwargs: dict[str, Any] | None
        :param logger: Logger used for debug messages
        :type logger: logging.Logger
        :raises AssertionError: If neither a client nor a full set of
            connection parameters is provided
        """
        # Raised explicitly rather than via ``assert`` so that the check is
        # not stripped under ``python -O``; type and message are unchanged.
        if redis is None and (host is None or port is None or db is None):
            raise AssertionError(
                "Redis instance or required params for its constructing are required."
            )
        if redis is None:
            # Build a fresh dict: the caller's ``redis_init_kwargs`` must not
            # be mutated as a side effect of constructing the stream.
            init_kwargs: dict[str, Any] = {
                **(redis_init_kwargs or {}),
                "host": host,
                "port": port,
                "db": db,
            }
            redis = Redis(**init_kwargs)
        self.__redis = redis
        self.__serializer = serializer
        # Remembers which (event, group) pairs already have a consumer group,
        # so XGROUP CREATE is attempted at most once per pair and instance.
        self.__event_n_group_to_xgroup: dict[tuple[EventType, ConsumerGroup], bool] = (
            defaultdict(bool)
        )
        self.__logger = logger
        # Consumer name passed to XREADGROUP.
        self.__name = str(os.getpid())

    def put(
        self,
        event: Event,
        *,
        block: bool = True,
        timeout: int = 5,
    ) -> None:
        """
        Append an event to the Redis stream named after its type.

        The serialized event is stored under the ``"value"`` field of the
        new stream entry.

        :param event: Event to publish
        :type event: Event
        :param block: Not used by this implementation
        :type block: bool
        :param timeout: Not used by this implementation
        :type timeout: int
        """
        _ = self.__redis.xadd(
            name=str(event.type),
            fields={"value": self.__serializer.encode(event)},
        )
        self.__logger.debug("XADDed event %s.", event)

    def pop(
        self,
        event: EventType,
        group: ConsumerGroup,
        *,
        block: bool = True,
        timeout: int = 5,
    ) -> Event:
        """
        Read, acknowledge and decode the next undelivered event for a group.

        The consumer group is created first if this instance has not seen
        the ``(event, group)`` pair yet. At most one entry is requested.

        :param event: Event type; its string form is the stream name
        :type event: EventType
        :param group: Consumer group to read on behalf of
        :type group: ConsumerGroup
        :param block: Whether to wait for an entry when none is available
        :type block: bool
        :param timeout: Maximum wait in seconds when ``block`` is true
            (sent to Redis in milliseconds). NOTE(review): ``timeout=0`` is
            sent as ``BLOCK 0``, which Redis documents as waiting without a
            time limit — confirm this is intended.
        :type timeout: int
        :return: Decoded event
        :rtype: Event
        :raises EmptyStreamError: If no entry was returned
        :raises TooManyDataError: If more than one entry was returned
        :raises KeyError: If the entry has no ``b"value"`` field
        """
        self.__logger.debug("About to xread an event. Stream %s", id(self))
        self.__ensure_xgroup(event, group)
        xresult: XReadT = self.__redis.xreadgroup(  # type:ignore[assignment]
            groupname=str(group),
            consumername=self.__name,
            # ">" asks only for entries never delivered to this group.
            streams={str(event): ">"},
            count=1,
            block=timeout * 1000 if block else None,
        )
        self.__logger.debug("XREADedGROUP %s.", xresult)
        if not xresult:
            raise EmptyStreamError
        item = xresult[0]
        if not item:
            raise EmptyStreamError
        name, data = item
        if not data:
            raise EmptyStreamError
        if len(data) > 1:
            raise TooManyDataError
        uid, payload = data[0]
        # The entry is acknowledged before its payload is decoded.
        _ = self.__redis.xack(str(event), str(group), uid)
        bevent = payload[b"value"]
        self.__logger.debug(
            "Received valid event %s with id %s.", name.decode(), uid.decode()
        )
        return self.__serializer.decode(bevent)

    def __ensure_xgroup(self, event: EventType, group: ConsumerGroup) -> None:
        """
        Create the consumer group for ``(event, group)`` unless already known.

        The stream itself is created if missing (``mkstream=True``). A
        ``BUSYGROUP`` reply from Redis means the group already exists and is
        treated as success; any other response error is propagated and the
        pair is not marked as ready.

        :param event: Event type; its string form is the stream name
        :type event: EventType
        :param group: Consumer group name
        :type group: ConsumerGroup
        :raises redis.ResponseError: If group creation fails for a reason
            other than the group already existing
        """
        key = (event, group)
        if self.__event_n_group_to_xgroup[key]:
            return
        try:
            _ = self.__redis.xgroup_create(
                name=str(event),
                groupname=str(group),
                # TODO: make this configurable,
                # otherwise restart will cause duplicate reads
                id="0",
                mkstream=True,
            )
        except redis.ResponseError as exc:
            # Only "group already exists" is benign; other errors (for
            # example a key of the wrong type) must not be cached as success.
            if "BUSYGROUP" not in str(exc):
                raise
            self.__logger.debug("XGROUP already created for %s.", key)
        else:
            self.__logger.debug("XGROUP created for %s.", key)
        self.__event_n_group_to_xgroup[key] = True