Source code for boss.boss

from .helper import log
from .configuration import WorkerConfigurationDocument
import asyncio
from collections.abc import Callable
from concurrent.futures import ProcessPoolExecutor


class BOSS:
    """
    Registry and lifecycle manager for HEROs described by configuration documents.

    Every managed HERO is stored in :code:`self.heros` under its :code:`_id`. Each entry is a dict with the
    keys :code:`"config"` (the configuration document), :code:`"object"` (the built HERO, or None while it
    is stopped) and :code:`"status"` (either :code:`"running"` or :code:`"stopped"`).
    """

    hero_config_sources: list

    def __init__(
        self,
        name: str,
        configs: list,
        loop: asyncio.AbstractEventLoop,
        pool: ProcessPoolExecutor,
        realm: str,
        expose: bool = True,
    ):
        """
        A BOSS object.

        Args:
            name: name of the Boss object
            configs: List of WorkerConfigurationDocuments to specify the workers.
                NOTE(review): the constructor does not read this argument; HEROs are only registered through
                :py:meth:`add_hero` or :py:meth:`refresh_hero_sources`. Confirm whether that is intended.
            loop: asyncio event loop that can be used to schedule tasks
            pool: process pool executor, kept on the instance as :code:`_pool`
            realm: Name of the realm the BOSS starts their HEROs in.
            expose: Specify whether the BOSS object itself should be exposed as HERO.
                NOTE(review): not evaluated by this constructor.
        """
        self.name = name
        self.heros: dict = {}
        self.hero_config_sources: list = []
        self._loop = loop
        self._pool = pool
        self.realm = realm

        # The registry is still empty at this point, so this starts nothing yet.
        self.start_all()

    def _config_from_name(self, name):
        """
        Return the configuration document of the HERO registered under :code:`name`.

        Args:
            name: id of the HERO to look up

        Raises:
            NameError: if no HERO with that name is registered with this BOSS.
        """
        if name not in self.heros:
            raise NameError(f"HERO with name {name} not run by this BOSS. Available HEROs are {self.heros.keys()}")
        return self.heros[name]["config"]

    def add_hero_source(self, parser: Callable[[str], WorkerConfigurationDocument], target: str):
        """
        Adds a data source to the BOSS from which HERO configurations are loaded.

        The source is only recorded here; it is read when :py:meth:`refresh_hero_sources` is called.

        Args:
            parser: A function which takes :code:`target` as an argument and returns a
                :py:class:`boss.configuration.WorkerConfigurationDocument` dict with the HERO config information.
            target: Target for the :code:`parser`, for example an URL for
                :py:meth:`boss.configuration.WorkerConfigurationDocument.parse_url`
        """
        self.hero_config_sources.append([parser, target])

    def refresh_hero_sources(self, auto_start: bool = True):
        """
        Refresh the HERO configuration data from the registered sources, updating existing HEROs, adding new ones,
        and removing those that are no longer present in the sources.

        HEROs that were running before are only restarted if their configuration changed. HEROs without
        configuration changes stay untouched. A HERO id that is delivered more than once during a single
        refresh is handled like any other occurrence and does not abort the refresh.

        Args:
            auto_start: If True, automatically starts a *new* hero source if its configuration is loaded.
                Defaults to True. Does not influence the behavior of HEROs that are already registered.
        """
        # HEROs known before the refresh, in registration order. Every id seen in a source is dropped from
        # this mapping; what remains afterwards is no longer configured anywhere and gets removed.
        stale_heros = dict.fromkeys(self.heros)

        def handle_hero_dict(config: dict):
            if config is None:
                return

            if "rows" in config:
                # Container document: each row is either a wrapper holding the config under "doc"
                # or the config itself.
                for conf_row in config["rows"]:
                    handle_hero_dict(conf_row["doc"] if "doc" in conf_row else conf_row)
                return

            if "_id" not in config:
                return

            name = config["_id"]
            do_start = False
            if name in self.heros:
                entry = self.heros[name]
                if entry["config"] != config:
                    # config changed, we need to restart
                    # was running before, auto start with the reloaded config
                    do_start = entry["status"] == "running"
                    self.stop_hero(name)
                    entry["config"] = WorkerConfigurationDocument(config)
                # pop() with a default instead of list.remove(): the id may already have been seen
                # in this refresh (duplicate in the sources) or may have been added during it.
                stale_heros.pop(name, None)
            else:
                do_start = auto_start
            self.add_hero(config, auto_start=do_start)

        for parser, target in self.hero_config_sources:
            log.info("refreshing HERO source %s", target)
            handle_hero_dict(parser(target))

        for deleted_hero in stale_heros:
            # hero not found in reloaded sources, delete
            log.info("Removing HERO %s from BOSS", deleted_hero)
            self.remove_hero(deleted_hero)

    def add_hero(self, config: WorkerConfigurationDocument | dict, auto_start: bool = True):
        """
        Start a new HERO and keep it running. Note that the id of the HERO specified in the config must be unique.

        If a HERO with the same id is already registered, its stored configuration is left as it is; with
        :code:`auto_start` set, that HERO is started if it is not running.

        Args:
            config: configuration for the new HERO. If a dict is given, it is converted into a
                WorkerConfigurationDocument. Anything that is not a dict is ignored.
            auto_start: If true the new HERO is immediately started after adding
        """
        if not isinstance(config, dict):
            # Keep the best-effort behavior (no exception), but leave a trace instead of dropping it silently.
            log.error("ignoring HERO config of unsupported type %s", type(config).__name__)
            return None

        if not isinstance(config, WorkerConfigurationDocument):
            config = WorkerConfigurationDocument(config)

        name = config["_id"]
        if name not in self.heros:
            self.heros[name] = {"config": config, "object": None, "status": "stopped"}
        if auto_start:
            self.start_hero(name)

    def start_hero(self, name):
        """
        Start HERO with given name.

        Builds the HERO from its configuration, calls its :code:`_setup` hook if it has a callable one and
        marks it as running. Does nothing if the HERO is already running. Errors while building or setting
        up are logged and not re-raised; the status then stays unchanged.

        Args:
            name: id of the HERO to start

        Raises:
            NameError: if no HERO with that name is registered with this BOSS.
        """
        config = self._config_from_name(name)
        entry = self.heros[name]
        # Exact comparison: a substring test against "running" would also match "" or "run".
        if entry["status"] == "running":
            return

        try:
            hero = config.build_hero_for_boss(self, self.realm)
            entry["object"] = hero
            log.info(f"creating HERO with name {config['_id']} from class {config['classname']}")

            # calling setup hook if it exists
            # The hook lives on the built HERO object, not on the bookkeeping entry.
            if callable(getattr(hero, "_setup", None)):
                hero._setup()

            entry["status"] = "running"
        except KeyError as e:
            log.error(f"creating HERO with invalid dict: {config} failed: {e}")
        except Exception as e:
            log.error(f"creating HERO with name {config['_id']} from class {config['classname']} failed: {e}")

    def stop_hero(self, name: str):
        """
        Stop the HERO with the given name.

        Calls the HERO's :code:`_teardown` hook if it has a callable one, destroys the HERO and marks it as
        stopped. Does nothing if the HERO is not running.

        Args:
            name: id of the HERO to stop

        Raises:
            NameError: if no HERO with that name is registered with this BOSS.
        """
        config = self._config_from_name(name)
        entry = self.heros[name]
        if entry["status"] != "running":
            return

        log.info(f"destroying HERO with name {config['_id']} from class {config['classname']}")
        obj = entry["object"]

        # calling teardown hook if it exists
        if callable(getattr(obj, "_teardown", None)):
            obj._teardown()

        obj._destroy_hero()
        # Drop the registry's reference; the local one goes away when this method returns.
        entry["object"] = None
        entry["status"] = "stopped"

    def restart_hero(self, name: str):
        """
        Stop the HERO with the given name and start it again.

        Args:
            name: id of the HERO to restart

        Raises:
            NameError: if no HERO with that name is registered with this BOSS.
        """
        self.stop_hero(name)
        self.start_hero(name)

    def remove_hero(self, name: str):
        """
        Stop the HERO with the given name and remove it from this BOSS.

        Args:
            name: id of the HERO to remove

        Raises:
            NameError: if no HERO with that name is registered with this BOSS.
        """
        self.stop_hero(name)
        self.heros.pop(name, None)

    def start_all(self):
        """
        Start every registered HERO that is not running yet.
        """
        # Iterate over a snapshot of the names so the loop does not depend on the registry staying
        # unchanged while HEROs are being set up.
        for hero_name in list(self.heros):
            self.start_hero(hero_name)

    def stop_all(self):
        """
        Stop every registered HERO that is currently running.
        """
        # Snapshot for the same reason as in start_all.
        for hero_name in list(self.heros):
            self.stop_hero(hero_name)

    def status(self):
        """
        Return the registry of managed HEROs.

        Returns:
            The internal dict mapping each HERO id to its entry (:code:`"config"`, :code:`"object"`,
            :code:`"status"`). This is the live object, not a copy.
        """
        return self.heros