import importlib
import inspect
import logging
import os
import sys
import traceback
from collections import defaultdict
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from pathlib import Path
from typing import Any, TypeAlias
from eventscore.core import pkg
from eventscore.core.abstract import (
ConsumerFunc,
ConsumerGroup,
EventType,
FunctionModulePath,
IECore,
IProcessPipeline,
IProducer,
ISpawnWorker,
IStream,
IStreamFactory,
NumberOfClones,
)
from eventscore.core.exceptions import (
AlreadySpawnedError,
NotADirectoryError,
NotAPackageError,
PathError,
)
from eventscore.core.logging import logger as _logger
from eventscore.core.pipelines import Pipeline, PipelineItem, ProcessPipeline
from eventscore.core.producers import Producer
from eventscore.core.types import Event
from eventscore.core.workers import SpawnMPWorker, Worker
from eventscore.decorators import consumer as _consumer
# Consumers collected by ECore.discover_consumers before they are registered,
# one tuple per function: (function, event, group, clones, "<file>:<name>").
FoundConsumerFunctions: TypeAlias = list[
    tuple[ConsumerFunc, EventType, ConsumerGroup, NumberOfClones, FunctionModulePath]
]
# Command-line arguments that make the default skipping predicate switch
# ECore into "dry" mode: Django's migration management commands.
SKIPPED_CONTEXTS = {"migrate", "makemigrations"}
def _skipping_predicate() -> bool:
if any(context in sys.argv for context in SKIPPED_CONTEXTS):
return True
return False
class ECore(IECore):
    """
    Default :class:`IECore` implementation.

    Keeps one consumer pipeline per consumer group and lazily creates its
    collaborators (pipeline processor, worker spawner, producer, stream) on
    first access unless ready-made instances are passed to the constructor.

    When the skipping predicate returns True, or the instance is created
    while ``discover_consumers`` is on the call stack, the instance runs in
    "dry" mode: registration, discovery, producing and spawning become
    logged no-ops.
    """

    def __init__(
        self,
        stream_factory: IStreamFactory,
        *,
        process_pipeline: IProcessPipeline | None = None,
        process_pipeline_type: type[IProcessPipeline] = ProcessPipeline,
        process_pipeline_init_kwargs: dict[str, Any] | None = None,
        spawn_worker: ISpawnWorker | None = None,
        spawn_worker_type: type[ISpawnWorker] = SpawnMPWorker,
        spawn_worker_init_kwargs: dict[str, Any] | None = None,
        producer: IProducer | None = None,
        producer_type: type[IProducer] = Producer,
        producer_init_kwargs: dict[str, Any] | None = None,
        logger: logging.Logger = _logger,
        skipping_predicate: Callable[[], bool] | None = _skipping_predicate,
    ) -> None:
        """
        ECore constructor

        :param stream_factory: Event stream factory. It is called once, on
            first access to :attr:`stream`
        :type stream_factory: IStreamFactory
        :param process_pipeline: Pipeline processor. Defaults to None
        :type process_pipeline: IProcessPipeline | None
        :param process_pipeline_type: Type of the pipeline processor.
            Defaults to ProcessPipeline.
            Param is ignored when process_pipeline is not None
        :type process_pipeline_type: type[IProcessPipeline]
        :param process_pipeline_init_kwargs: Initial kwargs for pipeline
            processor type. Defaults to None. A ``logger`` entry is added
            to a copy when missing; the given dict is not modified.
            Param is ignored when process_pipeline is not None
        :type process_pipeline_init_kwargs: dict[str, Any] | None
        :param spawn_worker: Worker spawner. Defaults to None
        :type spawn_worker: ISpawnWorker | None
        :param spawn_worker_type: Type of the worker spawner.
            Defaults to SpawnMPWorker.
            Param is ignored when spawn_worker is not None
        :type spawn_worker_type: type[ISpawnWorker]
        :param spawn_worker_init_kwargs: Initial kwargs for worker spawner
            type. Defaults to None. A ``logger`` entry is added to a copy
            when missing; the given dict is not modified.
            Param is ignored when spawn_worker is not None
        :type spawn_worker_init_kwargs: dict[str, Any] | None
        :param producer: Producer. Defaults to None
        :type producer: IProducer | None
        :param producer_type: Type of the producer.
            Defaults to Producer.
            Param is ignored when producer is not None
        :type producer_type: type[IProducer]
        :param producer_init_kwargs: Initial kwargs for producer type.
            Defaults to None. ``ecore`` (this instance) and ``logger``
            entries are added to a copy when missing; the given dict is
            not modified.
            Param is ignored when producer is not None
        :type producer_init_kwargs: dict[str, Any] | None
        :param logger: Logger. Defaults to _logger
        :type logger: logging.Logger
        :param skipping_predicate: Predicate deciding whether ECore runs in
            "dry" mode; it must return ``True`` to enable it. It is called
            once, here. As an example, the default predicate returns True
            if the Python process is running a Django migration command.
            ``None`` disables the predicate.
        :type skipping_predicate: Callable[[], bool] | None
        """
        self.__stream_factory = stream_factory
        self.__stream: IStream | None = None
        self.__process_pipeline = process_pipeline
        self.__process_pipeline_type = process_pipeline_type
        self.__process_pipeline_init_kwargs = process_pipeline_init_kwargs
        self.__spawn_worker = spawn_worker
        self.__spawn_worker_type = spawn_worker_type
        self.__spawn_worker_init_kwargs = spawn_worker_init_kwargs
        self.__producer = producer
        self.__producer_type = producer_type
        self.__producer_init_kwargs = producer_init_kwargs
        # One pipeline per consumer group, created on first registration.
        self.__pipelines: dict[ConsumerGroup, Pipeline] = defaultdict(Pipeline)
        self.__workers: tuple[Worker, ...] | None = None
        self.__workers_spawned = False
        self.__logger = logger
        self.__skip = skipping_predicate() if skipping_predicate else False
        if not self.__skip:
            # NOTE: heuristic (admittedly ugly): if ``discover_consumers``
            # appears anywhere in the current stack, this instance is being
            # created by a module imported during consumer discovery. In
            # Django that led to duplicate consumer spawning, so such
            # instances are put into skip mode.
            # TODO: need some cleaner way for it.
            for line in traceback.format_stack():
                line = line.strip()
                if self.discover_consumers.__name__ in line:
                    self.__skip = True
                    self.__logger.debug(
                        "Skip mode set because of consumers discovering."
                    )
                    break
        # The *_type parameters have non-None defaults, so these only fire
        # when None is passed explicitly for both an instance and its type.
        assert (
            self.__process_pipeline is not None
            or self.__process_pipeline_type is not None
        ), "Pipeline processor is required."
        assert (
            self.__spawn_worker is not None or self.__spawn_worker_type is not None
        ), "Worker spawner is required."
        # fmt:off
        # black and ruff conflict
        assert (
            self.__producer is not None or self.__producer_type is not None
        ), "Producer is required."
        # fmt:on

    @staticmethod
    def __merge_init_kwargs(
        init_kwargs: dict[str, Any] | None, **defaults: Any
    ) -> dict[str, Any]:
        """
        Return a new dict holding ``init_kwargs`` plus any ``defaults`` whose
        key is missing from it. ``init_kwargs`` itself is never modified.
        """
        return {**defaults, **(init_kwargs or {})}

    @property
    def process_pipeline(self) -> IProcessPipeline:
        """Pipeline processor, instantiated from its type on first access."""
        if self.__process_pipeline is None:
            kwargs = self.__merge_init_kwargs(
                self.__process_pipeline_init_kwargs, logger=self.__logger
            )
            self.__process_pipeline = self.__process_pipeline_type(**kwargs)
        return self.__process_pipeline

    @property
    def spawn_worker(self) -> ISpawnWorker:
        """Worker spawner, instantiated from its type on first access."""
        if self.__spawn_worker is None:
            kwargs = self.__merge_init_kwargs(
                self.__spawn_worker_init_kwargs, logger=self.__logger
            )
            self.__spawn_worker = self.__spawn_worker_type(**kwargs)
        return self.__spawn_worker

    @property
    def producer(self) -> IProducer:
        """Producer, instantiated from its type on first access."""
        if self.__producer is None:
            kwargs = self.__merge_init_kwargs(
                self.__producer_init_kwargs, ecore=self, logger=self.__logger
            )
            self.__producer = self.__producer_type(**kwargs)
        return self.__producer

    @property
    def stream_factory(self) -> IStreamFactory:
        """Factory used to create :attr:`stream`."""
        return self.__stream_factory

    @property
    def stream(self) -> IStream:
        """Event stream, created via :attr:`stream_factory` on first access."""
        if self.__stream is None:
            self.__stream = self.stream_factory()
        return self.__stream

    def consumer(
        self,
        func: ConsumerFunc | None = None,
        *,
        event: EventType,
        group: ConsumerGroup,
        clones: int = 1,
    ) -> ConsumerFunc:
        """
        Consumer decorator bound to this ECore.

        Thin wrapper over ``eventscore.decorators.consumer`` with
        ``ecore=self``; see that decorator for the exact contract
        (presumably ``func`` is None when used as ``@ecore.consumer(...)``).

        :param func: Function to decorate, or None
        :param event: Event type the consumer handles
        :param group: Consumer group the consumer belongs to
        :param clones: Number of clones. Defaults to 1
        """
        return _consumer(func, ecore=self, event=event, group=group, clones=clones)

    def register_consumer(
        self,
        func: ConsumerFunc,
        event: EventType,
        group: ConsumerGroup,
        *,
        clones: int = 1,
        func_path: str | None = None,
    ) -> None:
        """
        Add ``func`` to the pipeline of ``group`` as a consumer of ``event``.

        Logged no-op in skip mode. Decorated functions are registered by
        their ``__wrapped__`` original.

        :param func: Consumer function
        :param event: Event type the consumer handles
        :param group: Consumer group (one pipeline per group)
        :param clones: Number of clones. Defaults to 1
        :param func_path: Identifier of the function. Defaults to
            ``"<source file>:<function name>"``
        :raises AlreadySpawnedError: If workers were already spawned.
        """
        if self.__skip:
            self.__logger.warning(
                "Skipping consumer registration due to skipping predicate."
            )
            return
        if self.__workers_spawned:
            self.__logger.error("Consumer registration attempt after spawning.")
            raise AlreadySpawnedError
        # NOTE: consumer discovering returns unwrapped consumers, but in the
        # process also imports wrapped consumers. They are the same consumer
        # function yet different Python objects, so to avoid duplicates only
        # the unwrapped original (without decorator) is registered. This is
        # fine as long as the consumer decorator only registers the consumer
        # and adds no extra runtime logic.
        func = getattr(func, "__wrapped__", func)
        self.__pipelines[group].items.add(
            PipelineItem(
                func=func,
                func_path=func_path
                or ((inspect.getsourcefile(func) or "") + ":" + func.__name__),
                event=event,
                group=group,
                clones=clones,
            )
        )
        self.__logger.info(
            "Consumer with "
            + f"func={func.__name__}, "
            + f"event={event}, "
            + f"group={group}, "
            + f"clones={clones} "
            + "is successfully registered."
        )

    @staticmethod
    def __is_consumer(obj: object) -> bool:
        """Whether ``obj`` is a plain function carrying consumer metadata."""
        return (
            inspect.isfunction(obj)
            and getattr(obj, "__is_consumer__", False)
            and hasattr(obj, "__consumer_event__")
            and hasattr(obj, "__consumer_group__")
            and hasattr(obj, "__consumer_clones__")
        )

    @staticmethod
    @contextmanager
    def __temporary_sys_path(entry: str) -> Iterator[None]:
        """
        Make ``entry`` importable for the duration of the ``with`` block.

        The entry is appended to ``sys.path`` only if it is not already there,
        and only an entry appended here is removed afterwards.
        """
        added = entry not in sys.path
        if added:
            sys.path.append(entry)
        try:
            yield
        finally:
            # Imported code may already have removed the entry itself.
            if added and entry in sys.path:
                sys.path.remove(entry)

    @staticmethod
    def __module_name(file_path: Path, pkg_root: Path, pkg_name: str) -> str:
        """Dotted import name of ``file_path`` inside package ``pkg_name``."""
        if file_path.name == "__init__.py":
            # A package's __init__ is imported via the package's dotted name
            # (just ``pkg_name`` for the root package itself).
            sub_parts = file_path.parent.relative_to(pkg_root).parts
            return ".".join((pkg_name, *sub_parts))
        return pkg.module_name_from_path(file_path, pkg_root, pkg_name)

    def discover_consumers(self, *, root: str = "") -> None:
        """
        Import every module of the Python package at ``root`` and register
        the consumer functions found in them.

        The package's parent directory is put on ``sys.path`` while the
        modules are imported (removed afterwards if it was added here), so
        e.g. ``/home/me/project/myapp`` is imported as ``myapp.<...>``.
        Modules that fail to import are logged and skipped. Logged no-op in
        skip mode.

        :param root: Package directory relative to the current working
            directory. Defaults to the current working directory itself.
        :type root: str
        :raises AlreadySpawnedError: If workers were already spawned.
        :raises PathError: If ``root`` does not exist.
        :raises NotADirectoryError: If ``root`` is not a directory.
        :raises NotAPackageError: If ``root`` is not a Python package.
        """
        if self.__skip:
            self.__logger.warning(
                "Skipping consumer discovering due to skipping predicate."
            )
            return
        if self.__workers_spawned:
            self.__logger.error("Consumer registration attempt after spawning.")
            raise AlreadySpawnedError
        abs_root = (Path(os.getcwd()) / root).resolve()
        if not abs_root.exists():
            raise PathError
        if not abs_root.is_dir():
            raise NotADirectoryError
        if not pkg.is_python_package(abs_root):
            raise NotAPackageError
        # The package is imported by its directory name from its parent dir.
        pkg_name = abs_root.name
        sys_path_entry = str(abs_root.parent)
        self.__logger.debug(
            f"Consumer discovering started for root={abs_root}, "
            + f"pkg_name={pkg_name}, "
            + f"sys_path_entry={sys_path_entry}"
        )
        found: FoundConsumerFunctions = []
        with self.__temporary_sys_path(sys_path_entry):
            for file_path in pkg.walk_files(abs_root):
                modname = self.__module_name(file_path, abs_root, pkg_name)
                self.__logger.debug(f"Discovering in module {modname} at {file_path}")
                try:
                    module = importlib.import_module(modname)
                except Exception as e:
                    # Best effort: one broken module must not abort discovery.
                    self.__logger.warning(f"Skipping {modname} ({file_path}): {e!r}")
                    continue
                for _, func in inspect.getmembers(module, self.__is_consumer):
                    found.append(
                        (
                            func,
                            func.__consumer_event__,
                            func.__consumer_group__,
                            func.__consumer_clones__,
                            f"{file_path}:{func.__name__}",
                        )
                    )
                    self.__logger.info(f"Discovered consumer: {func} in {modname}")
        for func, event, group, clones, func_path in found:
            self.register_consumer(
                func,
                event,
                group,
                clones=clones,
                func_path=func_path,
            )
        self.__logger.info(
            f"Consumer discovering ended. Found {len(found)} consumer functions."
        )

    def produce(
        self,
        event: Event,
        *,
        block: bool = True,
        timeout: int = 5,
    ) -> None:
        """
        Send ``event`` through :attr:`producer`.

        Logged no-op in skip mode. A warning is logged (but the event is still
        produced) when no workers have been spawned yet.

        :param event: Event to produce
        :param block: Forwarded to the producer. Defaults to True
        :param timeout: Forwarded to the producer. Defaults to 5
        """
        if self.__skip:
            self.__logger.warning("Skipping event producing due to skipping predicate.")
            return
        if not self.__workers_spawned:
            self.__logger.warning("There is no spawned consumers at the moment.")
        self.producer.produce(event, block=block, timeout=timeout)

    def spawn_workers(self) -> None:
        """
        Build one worker per consumer group and hand each to the worker
        spawner. Logged no-op in skip mode, when workers were already spawned,
        or when no consumer is registered.
        """
        if self.__skip:
            self.__logger.warning(
                "Skipping workers spawning due to skipping predicate."
            )
            return
        if self.__workers_spawned:
            self.__logger.warning("Spawn workers attempt when workers already spawned.")
            return
        if not self.__pipelines:
            self.__logger.warning("There is no registered consumers. Nothing to spawn.")
            return
        workers = self.__build_workers()
        for worker in workers:
            _ = self.spawn_worker(worker)
        self.__workers_spawned = True
        self.__logger.info("Workers successfully spawned.")

    def __build_workers(self) -> tuple[Worker, ...]:
        """Process each group's pipeline into a worker (cached once non-empty)."""
        if not self.__workers:
            self.__workers = tuple(
                self.process_pipeline(pipeline, self)
                for pipeline in self.__pipelines.values()
            )
        self.__logger.info(
            "Built workers:\n\n"
            + "\n".join(repr(worker) for worker in self.__workers)
            + "\n"
        )
        return self.__workers