Source code for distributed_resource_optimization.carrier.core
"""Abstract carrier interface and EventWithValue helper."""
from __future__ import annotations
import asyncio
from abc import ABC, abstractmethod
from typing import Any
[docs]
class EventWithValue:
"""Pairs an asyncio.Event with the value it will carry once set."""
def __init__(self) -> None:
self.event: asyncio.Event = asyncio.Event()
self.value: Any = None
[docs]
async def wait(self) -> Any:
await self.event.wait()
return self.value
[docs]
class Carrier(ABC):
"""Abstract communication carrier used by distributed algorithms.
A concrete carrier handles the transport of messages between algorithm
participants. Two built-in implementations are provided:
* :class:`~distributed_resource_optimization.carrier.simple.SimpleCarrier`
— lightweight in-process carrier backed by asyncio tasks.
* :class:`~distributed_resource_optimization.carrier.mango.MangoCarrier`
— integrates with the *mango-agents* framework for networked deployments.
"""
[docs]
@abstractmethod
def send_to_other(self, content: Any, receiver: Any, meta: dict | None = None) -> asyncio.Task:
"""Send *content* to *receiver* (fire-and-forget, returns the task).
:param content: Arbitrary message payload.
:param receiver: Carrier-specific address of the target participant.
:param meta: Optional extra metadata merged with transport defaults.
:returns: The asyncio Task that performs the dispatch.
"""
[docs]
@abstractmethod
def reply_to_other(self, content: Any, meta: dict) -> asyncio.Task:
"""Reply to the sender identified in *meta*.
:param content: Reply payload.
:param meta: Metadata from the incoming message (contains sender info).
:returns: The asyncio Task that performs the dispatch.
"""
[docs]
@abstractmethod
def send_awaitable(
self, content: Any, receiver: Any, meta: dict | None = None
) -> asyncio.Future:
"""Send *content* to *receiver* and return a Future for the reply.
The future resolves to the first reply message received in response to
this particular send (matched via a unique message ID).
:param content: Arbitrary message payload.
:param receiver: Carrier-specific address of the target participant.
:param meta: Optional extra metadata.
:returns: A :class:`asyncio.Future` that yields the reply content.
"""
[docs]
@abstractmethod
def others(self, participant_id: str) -> list[Any]:
"""Return all participant addresses except *participant_id*.
:param participant_id: The string identifier of the calling participant.
:returns: List of addresses for every other participant.
"""
[docs]
@abstractmethod
def get_address(self) -> Any:
"""Return the address of this carrier's participant."""
# ------------------------------------------------------------------
# Time domain — wall-clock by default; a simulation carrier overrides
# these so timeouts and background drivers run on the *simulation*
# clock instead of racing ahead on real wall-clock time.
# ------------------------------------------------------------------
[docs]
def now(self) -> float:
"""Current time in this carrier's clock domain (seconds)."""
return asyncio.get_event_loop().time()
[docs]
def sleep(self, seconds: float) -> Any:
"""Return an awaitable that resolves after *seconds* in this
carrier's clock domain."""
return asyncio.sleep(seconds)
[docs]
def spawn(self, coroutine: Any) -> asyncio.Task:
"""Launch a long-running driver coroutine (e.g. a coordinator's
request/reply loop or the gossip cascade round).
The base implementation runs it as a plain event-loop task. A
simulation-backed carrier overrides this to schedule it on the
agent scheduler so the simulation clock tracks it (the scheduler
marks a driver parked on a reply/peer future as idle, so discrete
stepping advances it instead of deadlocking on it).
"""
return asyncio.ensure_future(coroutine)
[docs]
async def wait_for(self, awaitable: asyncio.Future | EventWithValue) -> Any:
"""Await *awaitable*, unwrapping an :class:`EventWithValue` if needed."""
if isinstance(awaitable, EventWithValue):
return await awaitable.wait()
return await awaitable