Source code for eventscore.decorators

import functools
from typing import Any

from eventscore.core.abstract import ConsumerFunc, ConsumerGroup, EventType, IECore


[docs] def consumer( func: ConsumerFunc | None = None, *, ecore: IECore, event: EventType, group: ConsumerGroup, clones: int = 1, ) -> ConsumerFunc: def decorator(func: ConsumerFunc) -> ConsumerFunc: ecore.register_consumer(func, event, group, clones=clones) setattr(func, "__is_consumer__", True) setattr(func, "__consumer_event__", event) setattr(func, "__consumer_group__", group) setattr(func, "__consumer_clones__", clones) @functools.wraps(func) def wrapper(*args: Any, **kwargs: Any) -> Any: return func(*args, **kwargs) return wrapper if func is None: return decorator # type:ignore[return-value] return decorator(func)