Source code for boss.starter
import argparse
import sys
import os
import asyncio
import platform
import uuid
import signal
from multiprocessing import cpu_count
from concurrent.futures import ProcessPoolExecutor
from .helper import log
from .factory import HEROFactory
from .boss import BOSS
from .configuration import WorkerConfigurationDocument
[docs]
def create_unique_instance_name() -> str:
"""
Creates a unique instance identifier consisting of the hostname and an UUID.
Returns:
A unique identifier.
"""
short_uuid = str(uuid.uuid4()).split("-")[0]
hostname = platform.node()
return f"{hostname}_{short_uuid}"
[docs]
def run(args=None):
parser = argparse.ArgumentParser()
parser.add_argument("--log", default="info", help="loglevel: spam < debug < info")
parser.add_argument("--expose", action="store_true", help="whether the BOSS object should expose itself as HERO")
parser.add_argument("--realm", default="heros", help="Realm under which the HEROs should be exposed")
parser.add_argument("--max-workers", default=int(cpu_count()), help="Max number of worker for the ProcessPool")
parser.add_argument("--no-autostart", action="store_true", help="turn off autostart")
parser.add_argument(
"--name",
default=create_unique_instance_name(),
help="name of the BOSS instance. This needs to be unique of the BOSS object is exposed",
)
parser.add_argument(
"-u", "--url", action="append", default=[], help="Path to configuration file or url of database"
)
parser.add_argument(
"-e",
"--env",
action="append",
default=["BOSS"],
help="name of the environment variable storing the configuration",
)
args = parser.parse_args(args)
log.setLevel(args.log)
if not (args.url or args.env):
parser.error("Either --url or --env have to be specified")
# generate asyncio loop and process pool executor
# both can be passed to the child HEROs
loop = asyncio.new_event_loop()
pool = ProcessPoolExecutor(max_workers=int(args.max_workers))
# create BOSS object
if args.expose:
boss = HEROFactory.build(
"boss.BOSS", {"name": args.name, "configs": [], "loop": loop, "pool": pool, "realm": args.realm}, args.name, realm=args.realm
)
else:
boss = BOSS(name=args.name, configs=[], loop=loop, pool=pool, realm=args.realm)
if len(args.url) > 0:
log.info("Reading device(s) from %s ", args.url)
for url in args.url:
boss.add_hero_source(WorkerConfigurationDocument.parse_url, url)
if len(args.env) > 0:
for var in args.env:
if var in os.environ:
boss.add_hero_source(WorkerConfigurationDocument.parse_string, os.environ[var])
boss.refresh_hero_sources(auto_start=not args.no_autostart)
# to set the loggers of the started objects we have to set them globally
log.setLevel(args.log, globally=True)
log.info("Starting BOSS")
def exit_gracefully(*args):
log.info("Stopping BOSS...")
loop.stop()
boss.stop_all()
if hasattr(boss, "_destroy_hero"):
boss._destroy_hero()
boss._session_manager.force_close()
log.info("Exited BOSS")
sys.exit()
signal.signal(signal.SIGTERM, exit_gracefully)
signal.signal(signal.SIGINT, exit_gracefully)
# only register SIGHUP if it exists on this platform
if hasattr(signal, "SIGHUP"):
signal.signal(signal.SIGHUP, exit_gracefully)
# start asyncio mainloop
loop.run_forever()
if __name__ == "__main__":
run()