How To: Implement a Custom Algorithm¶
The algorithm abstraction has a single required method. Implementing it lets your
algorithm participate in any carrier — SimpleCarrier, MangoCarrier, or one you
write yourself.
The Interface¶
All distributed algorithms extend
DistributedAlgorithm and implement
on_exchange_message():
from distributed_resource_optimization import DistributedAlgorithm
class MyAlgorithm(DistributedAlgorithm):
async def on_exchange_message(self, carrier, message_data, meta):
# Your logic here
...
Argument |
Type |
Description |
|---|---|---|
|
|
Use this to send replies or inspect neighbours |
|
Any |
The deserialized message payload |
|
|
Transport metadata — e.g. |
Step-by-Step Example — Echo Algorithm¶
As a minimal example: an algorithm that counts messages received and echoes each one back.
1. Define the algorithm class
from distributed_resource_optimization import (
DistributedAlgorithm,
OptimizationMessage,
)
class EchoMessage(OptimizationMessage):
def __init__(self, payload: str):
self.payload = payload
class EchoAlgorithm(DistributedAlgorithm):
def __init__(self):
self.count = 0
async def on_exchange_message(self, carrier, message_data, meta):
self.count += 1
print(f"Received #{self.count}: {message_data.payload}")
carrier.reply_to_other(EchoMessage(f"ACK: {message_data.payload}"), meta)
2. Run it with SimpleCarrier
import asyncio
from distributed_resource_optimization import (
ActorContainer, SimpleCarrier, cid,
)
async def main():
container = ActorContainer()
c1 = SimpleCarrier(container, EchoAlgorithm())
c2 = SimpleCarrier(container, EchoAlgorithm())
c1.send_to_other(EchoMessage("hello"), cid(c2))
await container.done_event.wait()
print(f"c2 received {c2.actor.count} message(s)")
asyncio.run(main())
Coordinated Algorithms¶
For algorithms that require a central coordinator (like ADMM), implement a
Coordinator alongside your algorithm:
from distributed_resource_optimization import Coordinator
class MyCoordinator(Coordinator):
async def start_optimization(self, carrier, message_data, meta):
participants = carrier.others("coordinator")
# ... send messages, collect replies, return results
return []
The coordinator is automatically given its own SimpleCarrier (or mango agent) and
becomes the entry point for
start_coordinated_optimization().
Practical Tips¶
Keep algorithm state in the class.
Since each SimpleCarrier dispatches messages via asyncio tasks, mutable attributes of
your algorithm class are effectively actor state — concurrent access within a single task
is naturally safe.
Use send_awaitable for request/reply patterns.
If your algorithm needs a response before proceeding:
future = carrier.send_awaitable(MyRequest(data), target_id)
response = await future
The ADMM coordinator uses this to collect all x-updates in parallel via
asyncio.gather(*futures).
Termination.
There is no built-in termination protocol. Implement convergence detection inside
on_exchange_message and simply stop sending messages when done. The done_event
on the ActorContainer fires automatically once all in-flight tasks have finished.
See Also¶
DistributedAlgorithm,CoordinatorCarrier,OptimizationMessage