Source code for distributed_resource_optimization.carrier.simple

"""In-process SimpleCarrier backed by asyncio tasks.

Provides two convenience entry-points for running distributed or coordinated
optimizations without any network stack.
"""

from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, Any
from uuid import UUID, uuid4

from .core import Carrier

if TYPE_CHECKING:
    from ..algorithm.core import Coordinator, DistributedAlgorithm


# ---------------------------------------------------------------------------
# ActorContainer
# ---------------------------------------------------------------------------


[docs] class ActorContainer: """Registry of :class:`SimpleCarrier` instances that share a lifecycle. The container tracks how many asyncio dispatch tasks are currently in flight via :attr:`active_tasks`. When that counter drops to zero the :attr:`done_event` is set, signalling that the distributed run has finished. """ def __init__(self) -> None: self.actors: list[SimpleCarrier] = [] self.active_tasks: int = 0 self.done_event: asyncio.Event = asyncio.Event() def _register(self, carrier: SimpleCarrier) -> int: self.actors.append(carrier) return len(self.actors) # 1-indexed aid
# --------------------------------------------------------------------------- # SimpleCarrier # ---------------------------------------------------------------------------
[docs] class SimpleCarrier(Carrier): """Lightweight in-process carrier for a single algorithm participant. Messages are dispatched as asyncio Tasks so that multiple participants can run concurrently on the same event loop. The carrier uses a 1-indexed addressing scheme (``aid`` 1 … N); ``cid(carrier)`` returns the integer aid. """ def __init__( self, container: ActorContainer, actor: "DistributedAlgorithm | Coordinator", ) -> None: self.container = container self.actor = actor self.aid: int = container._register(self) # Maps message-ID → async handler for request-response pairs self._uuid_to_handler: dict[UUID, Any] = {} # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ async def _dispatch_to( self, target: "SimpleCarrier", content: Any, meta: dict, ) -> None: """Route an incoming message to the right handler.""" from ..algorithm.core import on_exchange_message msg_id = meta.get("message_id") if msg_id is not None and msg_id in target._uuid_to_handler: await target._uuid_to_handler[msg_id](target, content, meta) else: await on_exchange_message(target.actor, target, content, meta) def _task_done(self, task: asyncio.Task) -> None: """Callback invoked when a dispatch task finishes.""" self.container.active_tasks -= 1 if self.container.active_tasks == 0: self.container.done_event.set() # ------------------------------------------------------------------ # Carrier interface # ------------------------------------------------------------------
[docs] def send_to_other( self, content: Any, receiver: int, meta: dict | None = None, ) -> asyncio.Task: """Dispatch *content* to the carrier identified by 1-indexed *receiver*. The dispatch runs in a fresh asyncio Task so that the caller is not blocked. The container's :attr:`~ActorContainer.active_tasks` counter is incremented before the task starts and decremented (with possible done-event notification) when the task finishes. """ other = self.container.actors[receiver - 1] extra = meta or {} full_meta: dict = {"sender": self.aid, "message_id": uuid4()} full_meta.update(extra) self.container.active_tasks += 1 # Reset done_event if it was previously set if self.container.done_event.is_set(): self.container.done_event.clear() async def _run() -> None: try: await self._dispatch_to(other, content, full_meta) finally: self.container.active_tasks -= 1 if self.container.active_tasks == 0: self.container.done_event.set() task = asyncio.create_task(_run()) return task
[docs] def reply_to_other(self, content: Any, meta: dict) -> asyncio.Task: """Reply to the sender recorded in *meta*. The original ``message_id`` is preserved so that the coordinator's awaitable handler can match the response. """ sender = meta["sender"] reply_meta = {**meta, "reply": True} return self.send_to_other(content, sender, meta=reply_meta)
[docs] def send_awaitable( self, content: Any, receiver: int, meta: dict | None = None, ) -> asyncio.Future: """Send *content* and return a Future that resolves to the reply. The reply is matched by the ``message_id`` stored in the outgoing meta. The sender registers a one-shot handler keyed on that ID; when the target calls :meth:`reply_to_other` the same ID travels back and triggers the handler, resolving the future. """ other = self.container.actors[receiver - 1] extra = meta or {} msg_id = uuid4() full_meta: dict = {"sender": self.aid, "message_id": msg_id} full_meta.update(extra) loop = asyncio.get_event_loop() future: asyncio.Future = loop.create_future() async def _handler(carrier: SimpleCarrier, reply_content: Any, _meta: dict) -> None: if not future.done(): future.set_result(reply_content) self._uuid_to_handler[msg_id] = _handler async def _run() -> None: await self._dispatch_to(other, content, full_meta) asyncio.create_task(_run()) return future
[docs] def others(self, participant_id: str) -> list[int]: """Return all 1-indexed carrier IDs except *this* carrier's ID.""" return [i + 1 for i in range(len(self.container.actors)) if i + 1 != self.aid]
[docs] def get_address(self) -> int: return self.aid
[docs] def schedule_using(self, fn: Any, delay_s: float) -> asyncio.Task: """Schedule *fn* to run after *delay_s* seconds on the event loop.""" async def _run() -> None: if delay_s > 0: await asyncio.sleep(delay_s) fn() return asyncio.create_task(_run())
[docs] def cid(carrier: SimpleCarrier) -> int: """Return the 1-indexed ID of *carrier*.""" return carrier.aid
# --------------------------------------------------------------------------- # Express helpers # ---------------------------------------------------------------------------
[docs] async def start_distributed_optimization( actors: list["DistributedAlgorithm"], start_message: Any, ) -> None: """Run a fully distributed optimization (e.g. COHDA) and wait until done. Creates a fresh :class:`ActorContainer`, wraps each algorithm in a :class:`SimpleCarrier`, sends *start_message* from the first carrier to the second, then awaits completion. :param actors: List of algorithm participants. :param start_message: The initial message to kick-off the algorithm (e.g. a :class:`~...cohda.core.WorkingMemory`). """ container = ActorContainer() carriers = [SimpleCarrier(container, actor) for actor in actors] carriers[0].send_to_other(start_message, cid(carriers[1])) await container.done_event.wait()
[docs] async def start_coordinated_optimization( actors: list["DistributedAlgorithm"], coordinator: "Coordinator", start_message: Any, ) -> list[Any]: """Run a coordinator-driven optimization (e.g. ADMM) and return results. Creates a shared :class:`ActorContainer`, registers all actor carriers and a coordinator carrier, then delegates to :func:`~...algorithm.core.start_optimization`. :param actors: List of algorithm participants. :param coordinator: The coordinator (e.g. an :class:`~...admm.core.ADMMGenericCoordinator`). :param start_message: The start payload (e.g. :class:`~...admm.core.ADMMStart`). :returns: Whatever :func:`start_optimization` returns (coordinator-specific). """ from ..algorithm.core import start_optimization container = ActorContainer() _carriers = [SimpleCarrier(container, actor) for actor in actors] coordinator_carrier = SimpleCarrier(container, coordinator) return await start_optimization(coordinator, coordinator_carrier, start_message, {})