Source code for eventscore.core.workers

import logging
import multiprocessing as mp
from typing import cast

from eventscore.core.abstract import ISpawnWorker
from eventscore.core.logging import logger as _logger
from eventscore.core.types import Worker


[docs] class SpawnMPWorker(ISpawnWorker): def __init__(self, logger: logging.Logger = _logger) -> None: """ Construct spawn worker instance :param logger: Logger instance :type logger: logging.Logger """ self.__logger = logger def __call__(self, worker: Worker) -> tuple[int, ...]: processes: list[mp.Process] = [] for _ in range(worker.clones): process = mp.Process(target=worker.runner.run, daemon=True) processes.append(process) for process in processes: process.start() self.__logger.debug(f"Process {process.pid} has started.") return tuple(cast(int, process.pid) for process in processes)