Source code for boss.worker

from multiprocessing import Process, Pipe, cpu_count
import asyncio
import platform
import signal
from .helper import log
from .multiprocess import MultiprocessRPC


[docs] def get_max_workers() -> int: """ Get the maximum number of worker processes to use. For Windows systems, caps the worker count at 32. For other platforms, returns the total CPU count. Returns: The maximum number of worker processes to use """ if platform.system() == "Windows": return min([int(cpu_count()), 32]) else: return int(cpu_count())
[docs] def start_hero_in_worker(config, realm, boss_name, boss_loop): """ This launcher to start a new worker in it's own process """ parent_conn, child_conn = Pipe() p = Process(target=launch_worker, args=(config, realm, boss_name, child_conn)) p.start() return {"process": p, "worker": WorkerRemote(boss_loop, parent_conn)}
[docs] def launch_worker(config, realm, boss_name, pipe): """ This is the entry point in the worker. """ worker = Worker(config, realm, boss_name, pipe) def exit_gracefully(*args): log.info(f"Stopping worker for HERO {boss_name}") worker.stop() signal.signal(signal.SIGTERM, exit_gracefully) signal.signal(signal.SIGINT, exit_gracefully) # only register SIGHUP if it exists on this platform if hasattr(signal, "SIGHUP"): signal.signal(signal.SIGHUP, exit_gracefully) worker.run() log.info(f"Worker for HERO {config['_id']} finished")
[docs] class Worker(MultiprocessRPC): def __init__(self, config, realm, boss_name, pipe): log.info(f"Worker for HERO {config['_id']} starting") # generate asyncio loop to be used by the child HEROs self._loop = asyncio.new_event_loop() self._obj = config.build_hero_for_worker(boss_name, self, realm) MultiprocessRPC.__init__(self, self._loop, pipe)
[docs] def run(self): log.debug(f"starting worker main loop for {self._obj}") self._loop.run_forever()
[docs] def stop(self): if hasattr(self._obj, "_teardown") and callable(getattr(self._obj, "_teardown")): self._obj._teardown(self) self._obj._destroy_hero() del self._obj self._rpc_stop() self._loop.stop()
[docs] class WorkerRemote(MultiprocessRPC):
[docs] def stop(self): self._rpc("stop")