Source code for boss.configuration

import json

from .helper import file_from_url, log
from .factory import HEROFactory, DatasourceHEROFactory, PolledDatasourceHEROFactory


[docs] class Document(dict):
[docs] @classmethod def parse_string(cls, s: str): """ Parse a document in JSON format from a string. Args: s: string containing the JSON representation of the config """ try: obj = cls() obj.update(json.loads(s)) return obj except json.JSONDecodeError as e: log.error(f"Error while encoding json: {e}")
[docs] @classmethod def parse_url(cls, url: str): """ Parse a document in JSON format from a URL. Args: url: any URL supported by urllib (e.g. file://local.json or https://user:pass@couch.db/database/my_doc) """ f_handle = file_from_url(url) return cls.parse_string(f_handle.read())
[docs] class WorkerConfigurationDocument(Document):
[docs] def datasource_config(self): if "datasource" in self and isinstance(self["datasource"], dict): cfg = {"async": False, "interval": 5.0, "observables": {}} cfg.update(self["datasource"]) return cfg else: return None
[docs] def build_hero_for_boss(self, boss_object, realm="heros"): # replace special string for asyncio loop and multiprocess pool if "tags" in self: self["tags"].append(f"BOSS: {boss_object.name}") else: self["tags"] = [f"BOSS: {boss_object.name}"] for key, val in self["arguments"].items(): if val == "@_boss_loop": self["arguments"][key] = boss_object._loop if val == "@_boss_pool": self["arguments"][key] = boss_object._pool if (datasource_config := self.datasource_config()) is not None: if datasource_config["async"]: return DatasourceHEROFactory.build( self["classname"], self["arguments"], self["_id"], tags=self["tags"], observables=datasource_config["observables"], realm=realm, ) else: return PolledDatasourceHEROFactory.build( self["classname"], self["arguments"], self["_id"], tags=self["tags"], loop=boss_object._loop, interval=datasource_config["interval"], realm=realm, observables=datasource_config["observables"], ) else: return HEROFactory.build(self["classname"], self["arguments"], self["_id"], realm=realm, tags=self["tags"])