import asyncio
import uuid
from heros.helper import log
POLLING_INTERVAL = 0.01
[docs]
class Message:
fields = ["id", "type"]
def __init__(self, *args, **kwargs):
super().__init__()
if hasattr(super().__thisclass__, "fields"):
self.fields += super().__thisclass__.fields
kwargs["type"] = self.TYPE
if "id" not in kwargs:
kwargs["id"] = int(uuid.uuid4())
for field in self.fields:
value = kwargs[field] if field in kwargs else None
setattr(self, field, value)
[docs]
def _decode(self, raw_message):
if raw_message["type"] != self.TYPE:
raise ValueError
for field in self.fields:
setattr(self, field, raw_message[field])
[docs]
def encode(self):
return {key: getattr(self, key) for key in self.fields}
[docs]
@staticmethod
def parse(raw_message):
for cls in [MethodCallMessage, ResultMessage, ErrorMessage, AttributeGetMessage, AttributeSetMessage]:
try:
message = cls()
message._decode(raw_message)
return message
except ValueError:
pass
[docs]
class MethodCallMessage(Message):
TYPE = "c"
fields = ["method", "args", "kwargs"]
[docs]
class AttributeGetMessage(Message):
TYPE = "g"
fields = ["name"]
[docs]
class AttributeSetMessage(Message):
TYPE = "s"
fields = ["name", "value"]
[docs]
class ResultMessage(Message):
TYPE = "r"
fields = ["payload"]
[docs]
class ErrorMessage(Message):
TYPE = "e"
fields = ["error"]
[docs]
class MultiprocessRPC:
"""
Use the communication between two processes via a pipe to implement RPC.
In BOSS this is used to communicate between the BOSS object in the main process and the BOSSRemote
running in a separate process. This becomes necessary since zenoh/rust/pyO3 does not allow to run
in the main process and in dynamically generated processes.
"""
def __init__(self, loop, pipe):
self._loop = loop
self._pipe = pipe
self._pending_results = {}
self._shutdown = False
self._communicatation_task = self._loop.create_task(self._communicate())
[docs]
async def _communicate(self):
while not self._shutdown:
if self._pipe.poll():
try:
self._handle_message(self._pipe.recv())
except EOFError:
pass
await asyncio.sleep(POLLING_INTERVAL)
[docs]
def _send_message(self, message):
try:
self._pipe.send(message.encode())
except BrokenPipeError:
pass
[docs]
def _handle_message(self, message):
msg = Message.parse(message)
if isinstance(msg, MethodCallMessage):
try:
result = getattr(self, msg.method)(*msg.args, **msg.kwargs)
self._send_message(ResultMessage(id=msg.id, payload=result))
except Exception as e:
log.error(f"Error {e}")
self._send_message(ErrorMessage(id=msg.id, error=e))
if isinstance(msg, ResultMessage):
if msg.id in self._pending_results:
self._pending_results[msg.id](msg.payload)
del self._pending_results[msg.id]
if isinstance(msg, AttributeGetMessage):
if hasattr(self, msg.name):
self._send_message(ResultMessage(id=msg.id, payload=getattr(self, msg.name)))
if isinstance(msg, AttributeSetMessage):
setattr(self, msg.name, msg.value)
if isinstance(msg, ErrorMessage):
raise msg.error
[docs]
def _rpc(self, method_name, *args, _cb=None, **kwargs):
msg = MethodCallMessage(method=method_name, args=args, kwargs=kwargs)
if _cb is None:
def _cb(payload):
pass
self._pending_results.update({msg.id: _cb})
self._send_message(msg)
[docs]
def _get_attribute(self, name: str, local_name: str | None = None):
if local_name is None:
local_name = name
def _update_attribute(payload):
setattr(self, local_name, payload)
msg = AttributeGetMessage(name=name)
self._pending_results.update({msg.id: _update_attribute})
self._send_message(msg)
[docs]
def _set_attribute(self, name: str, value: object):
self._send_message(AttributeSetMessage(name=name, value=value))
[docs]
def _rpc_stop(self):
self._communicatation_task.cancel()
self._shutdown = True