Source code for distributed_resource_optimization.algorithm.core

"""Abstract base types for distributed algorithms and coordinators."""

from __future__ import annotations

from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from ..carrier.core import Carrier


[docs] class OptimizationMessage(ABC): """Marker supertype for all optimization-protocol messages."""
[docs] class DistributedAlgorithm(ABC): """Base class for all distributed optimization algorithms. Concrete subclasses must implement :meth:`on_exchange_message`. """
[docs] @abstractmethod async def on_exchange_message( self, carrier: "Carrier", message_data: Any, meta: Any, ) -> Any: """Handle an incoming message from another participant. :param carrier: The carrier that delivered the message. :param message_data: The message payload. :param meta: Transport-level metadata (sender address, IDs, …). """
[docs] class Coordinator(ABC): """Base class for optimization coordinators."""
[docs] @abstractmethod async def start_optimization( self, carrier: "Carrier", message_data: Any, meta: Any, ) -> Any: """Initiate and run a complete coordinated optimization round. :param carrier: The carrier the coordinator uses to reach participants. :param message_data: Start payload (algorithm-specific). :param meta: Transport metadata from the triggering message. :returns: The final result (algorithm-specific). """
# --------------------------------------------------------------------------- # Module-level shim functions — preserved for compatibility with carrier code # ---------------------------------------------------------------------------
[docs] async def on_exchange_message( algorithm: DistributedAlgorithm, carrier: "Carrier", message_data: Any, meta: Any, ) -> Any: """Delegate to ``algorithm.on_exchange_message(carrier, message_data, meta)``.""" return await algorithm.on_exchange_message(carrier, message_data, meta)
[docs] async def start_optimization( coordinator: Coordinator, carrier: "Carrier", message_data: Any, meta: Any, ) -> Any: """Delegate to ``coordinator.start_optimization(carrier, message_data, meta)``.""" return await coordinator.start_optimization(carrier, message_data, meta)
# --------------------------------------------------------------------------- # CoordinatedDistributedAlgorithm # ---------------------------------------------------------------------------
[docs] class CoordinatedDistributedAlgorithm: """Bundle of a coordinator and its worker algorithms (informational only).""" def __init__( self, distributed_algo: list[DistributedAlgorithm], coordinator: Coordinator, ) -> None: self.distributed_algo = distributed_algo self.coordinator = coordinator