SimpleCarrier

SimpleCarrier is the built-in lightweight carrier. It runs all participants as asyncio tasks within a single process, with no network or serialisation overhead. It is the recommended choice for prototyping, testing, and single-machine simulations.

Core Types

ActorContainer

An ActorContainer holds all SimpleCarrier instances and lets them find each other by numeric ID. Create one container per simulation:

from distributed_resource_optimization import ActorContainer

container = ActorContainer()

SimpleCarrier

Wraps an algorithm (or coordinator) and registers it with a container:

from distributed_resource_optimization import (
    SimpleCarrier, create_cohda_participant,
)

c1 = SimpleCarrier(container, create_cohda_participant(1, [[0.0, 1, 2], [1, 2, 3]]))
c2 = SimpleCarrier(container, create_cohda_participant(2, [[0.0, 1, 2], [1, 2, 3]]))

Each carrier is automatically assigned a 1-indexed integer ID when it registers. Retrieve it with cid():

from distributed_resource_optimization import cid

print(cid(c1))  # 1
print(cid(c2))  # 2

Sending Messages

Use send_to_other() to dispatch a message to another carrier in the same container. The message is delivered asynchronously as an asyncio Task:

import asyncio
from distributed_resource_optimization import create_cohda_start_message

async def main():
    start = create_cohda_start_message([1.2, 2.0, 3.0])
    c1.send_to_other(start, cid(c2))
    await container.done_event.wait()   # block until all tasks finish

asyncio.run(main())

Express API

For quick experiments, skip creating the container and carriers yourself. start_distributed_optimization() wraps everything in a single call:

import asyncio
from distributed_resource_optimization import (
    create_cohda_participant,
    create_cohda_start_message,
    start_distributed_optimization,
)

async def main():
    actor1 = create_cohda_participant(1, [[0.0, 1, 2], [1, 2, 3]])
    actor2 = create_cohda_participant(2, [[0.0, 1, 2], [1, 2, 3]])
    start = create_cohda_start_message([1.2, 2.0, 3.0])
    await start_distributed_optimization([actor1, actor2], start)

asyncio.run(main())

For coordinated algorithms (e.g. ADMM), use start_coordinated_optimization():

import asyncio
from distributed_resource_optimization import (
    create_admm_flex_actor_one_to_many,
    create_sharing_target_distance_admm_coordinator,
    create_admm_sharing_data, create_admm_start,
    start_coordinated_optimization,
)

async def main():
    flex1 = create_admm_flex_actor_one_to_many(10.0, [0.1,  0.5, -1.0])
    flex2 = create_admm_flex_actor_one_to_many(15.0, [0.1,  0.5, -1.0])
    flex3 = create_admm_flex_actor_one_to_many(10.0, [-1.0, 0.0,  1.0])
    coordinator = create_sharing_target_distance_admm_coordinator()
    start = create_admm_start(create_admm_sharing_data([-4.0, 0.0, 6.0], [5, 1, 1]))
    await start_coordinated_optimization([flex1, flex2, flex3], coordinator, start)

asyncio.run(main())

Awaitable Messages

When a participant needs a response before continuing (e.g. the ADMM x-update), send_awaitable() returns an asyncio.Future that resolves to the reply:

future = c1.send_awaitable(my_request, cid(c2))
response = await future

The ADMM coordinator uses this internally to collect all x-updates in parallel.

Note

All message dispatches are asyncio Tasks. The done_event on the container is set when the active-task counter reaches zero, signalling that the distributed run has finished.

See Also

  • SimpleCarrier, ActorContainer, cid()

  • start_distributed_optimization(), start_coordinated_optimization()

  • MangoCarrier — mango-agents carrier for TCP networking