API Reference

Carrier

Abstract carrier interface and EventWithValue helper.

class distributed_resource_optimization.carrier.core.EventWithValue[source]

Bases: object

Pairs an asyncio.Event with the value it will carry once set.

async wait() Any[source]
class distributed_resource_optimization.carrier.core.Carrier[source]

Bases: ABC

Abstract communication carrier used by distributed algorithms.

A concrete carrier handles the transport of messages between algorithm participants. Two built-in implementations are provided:

  • SimpleCarrier — lightweight in-process carrier backed by asyncio tasks.

  • MangoCarrier — integrates with the mango-agents framework for networked deployments.

abstractmethod send_to_other(content: Any, receiver: Any, meta: dict | None = None) Task[source]

Send content to receiver (fire-and-forget, returns the task).

Parameters:
  • content – Arbitrary message payload.

  • receiver – Carrier-specific address of the target participant.

  • meta – Optional extra metadata merged with transport defaults.

Returns:

The asyncio Task that performs the dispatch.

abstractmethod reply_to_other(content: Any, meta: dict) Task[source]

Reply to the sender identified in meta.

Parameters:
  • content – Reply payload.

  • meta – Metadata from the incoming message (contains sender info).

Returns:

The asyncio Task that performs the dispatch.

abstractmethod send_awaitable(content: Any, receiver: Any, meta: dict | None = None) Future[source]

Send content to receiver and return a Future for the reply.

The future resolves to the first reply message received in response to this particular send (matched via a unique message ID).

Parameters:
  • content – Arbitrary message payload.

  • receiver – Carrier-specific address of the target participant.

  • meta – Optional extra metadata.

Returns:

A asyncio.Future that yields the reply content.

abstractmethod others(participant_id: str) list[Any][source]

Return all participant addresses except participant_id.

Parameters:

participant_id – The string identifier of the calling participant.

Returns:

List of addresses for every other participant.

abstractmethod get_address() Any[source]

Return the address of this carrier’s participant.

now() float[source]

Current time in this carrier’s clock domain (seconds).

sleep(seconds: float) Any[source]

Return an awaitable that resolves after seconds in this carrier’s clock domain.

spawn(coroutine: Any) Task[source]

Launch a long-running driver coroutine (e.g. a coordinator’s request/reply loop or the gossip cascade round).

The base implementation runs it as a plain event-loop task. A simulation-backed carrier overrides this to schedule it on the agent scheduler so the simulation clock tracks it (the scheduler marks a driver parked on a reply/peer future as idle, so discrete stepping advances it instead of deadlocking on it).

async wait_for(awaitable: Future | EventWithValue) Any[source]

Await awaitable, unwrapping an EventWithValue if needed.

In-process SimpleCarrier backed by asyncio tasks.

Provides two convenience entry-points for running distributed or coordinated optimizations without any network stack.

class distributed_resource_optimization.carrier.simple.ActorContainer[source]

Bases: object

Registry of SimpleCarrier instances that share a lifecycle.

The container tracks how many asyncio dispatch tasks are currently in flight via active_tasks. When that counter drops to zero the done_event is set, signalling that the distributed run has finished.

class distributed_resource_optimization.carrier.simple.SimpleCarrier(container: ActorContainer, actor: DistributedAlgorithm | Coordinator)[source]

Bases: Carrier

Lightweight in-process carrier for a single algorithm participant.

Messages are dispatched as asyncio Tasks so that multiple participants can run concurrently on the same event loop.

The carrier uses a 1-indexed addressing scheme (aid 1 … N); cid(carrier) returns the integer aid.

send_to_other(content: Any, receiver: int, meta: dict | None = None) Task[source]

Dispatch content to the carrier identified by 1-indexed receiver.

The dispatch runs in a fresh asyncio Task so that the caller is not blocked. The container’s active_tasks counter is incremented before the task starts and decremented (with possible done-event notification) when the task finishes.

reply_to_other(content: Any, meta: dict) Task[source]

Reply to the sender recorded in meta.

The original message_id is preserved so that the coordinator’s awaitable handler can match the response.

send_awaitable(content: Any, receiver: int, meta: dict | None = None) Future[source]

Send content and return a Future that resolves to the reply.

The reply is matched by the message_id stored in the outgoing meta. The sender registers a one-shot handler keyed on that ID; when the target calls reply_to_other() the same ID travels back and triggers the handler, resolving the future.

others(participant_id: str) list[int][source]

Return all 1-indexed carrier IDs except this carrier’s ID.

get_address() int[source]

Return the address of this carrier’s participant.

schedule_using(fn: Any, delay_s: float) Task[source]

Schedule fn to run after delay_s seconds on the event loop.

distributed_resource_optimization.carrier.simple.cid(carrier: SimpleCarrier) int[source]

Return the 1-indexed ID of carrier.

async distributed_resource_optimization.carrier.simple.start_distributed_optimization(actors: list['DistributedAlgorithm'], start_message: Any) None[source]

Run a fully distributed optimization (e.g. COHDA) and wait until done.

Creates a fresh ActorContainer, wraps each algorithm in a SimpleCarrier, sends start_message from the first carrier to the second, then awaits completion.

Parameters:
  • actors – List of algorithm participants.

  • start_message – The initial message to kick-off the algorithm (e.g. a WorkingMemory).

async distributed_resource_optimization.carrier.simple.start_coordinated_optimization(actors: list['DistributedAlgorithm'], coordinator: Coordinator, start_message: Any) list[Any][source]

Run a coordinator-driven optimization (e.g. ADMM) and return results.

Creates a shared ActorContainer, registers all actor carriers and a coordinator carrier, then delegates to start_optimization().

Parameters:
  • actors – List of algorithm participants.

  • coordinator – The coordinator (e.g. an ADMMGenericCoordinator).

  • start_message – The start payload (e.g. ADMMStart).

Returns:

Whatever start_optimization() returns (coordinator-specific).

COHDA

COHDA — Combinatorial Optimization Heuristic for Distributed Agents.

Each agent maintains a WorkingMemory consisting of:

  • A TargetParams describing the global target schedule and weights.

  • A SystemConfig — the agent’s view of every participant’s current schedule choice (with a monotonic counter for version control).

  • A SolutionCandidate — the best complete solution known so far.

Agents exchange WorkingMemory objects. Upon receipt each agent runs perceive → decide → act and forwards its updated memory to all neighbours.

References

Hinrichs et al. (2014) “COHDA: A Combinatorial Optimization Heuristic for Distributed Agents”.

class distributed_resource_optimization.algorithm.heuristic.cohda.core.ScheduleSelection(schedule: ndarray, counter: int)[source]

Bases: object

A participant’s chosen schedule together with its version counter.

schedule: ndarray
counter: int
class distributed_resource_optimization.algorithm.heuristic.cohda.core.SystemConfig(schedule_choices: dict[int, ~distributed_resource_optimization.algorithm.heuristic.cohda.core.ScheduleSelection]=<factory>)[source]

Bases: object

Each participant’s schedule choice, keyed by 1-indexed participant ID.

schedule_choices: dict[int, ScheduleSelection]
class distributed_resource_optimization.algorithm.heuristic.cohda.core.SolutionCandidate(participant_id: int, schedules: ndarray, perf: float | None, present: frozenset[int])[source]

Bases: object

The best complete solution known to a participant.

Parameters:
  • participant_id – ID of the agent that last updated this candidate.

  • schedules – 2-D array of shape (max_id, n_intervals) where row participant_id - 1 holds that participant’s schedule (1-indexed IDs → 0-indexed rows).

  • perf – Cached performance value (None if not yet evaluated).

  • present – Frozen set of participant IDs whose schedule is included.

participant_id: int
schedules: ndarray
perf: float | None
present: frozenset[int]
class distributed_resource_optimization.algorithm.heuristic.cohda.core.TargetParams(schedule: ndarray, weights: ndarray)[source]

Bases: object

Global target schedule and per-interval weights.

schedule: ndarray
weights: ndarray
class distributed_resource_optimization.algorithm.heuristic.cohda.core.WorkingMemory(target_params: TargetParams | None, system_config: SystemConfig, solution_candidate: SolutionCandidate | None, additional_parameters: dict[str, ~typing.Any]=<factory>)[source]

Bases: OptimizationMessage

State shared between COHDA participants.

Parameters:
  • target_params – Global optimisation target (set once, on first recv).

  • system_config – Current view of all participants’ schedule choices.

  • solution_candidate – Best complete solution known to this agent.

  • additional_parameters – Arbitrary extra payload (unused by core).

target_params: TargetParams | None
system_config: SystemConfig
solution_candidate: SolutionCandidate | None
additional_parameters: dict[str, Any]
distributed_resource_optimization.algorithm.heuristic.cohda.core.cohda_default_performance(cluster_schedule: ndarray, target_params: TargetParams) float[source]

Score a candidate by weighted distance from the target (higher = better).

Parameters:
  • cluster_schedule – Array of shape (n_participants, n_intervals).

  • target_params – Target schedule and weights.

Returns:

-sum(weights * abs(target - column_sums)).

class distributed_resource_optimization.algorithm.heuristic.cohda.core.LocalDecider[source]

Bases: object

Abstract strategy for selecting a local schedule in the decide step.

initial_schedule(memory: WorkingMemory) ndarray[source]
class distributed_resource_optimization.algorithm.heuristic.cohda.core.DefaultLocalDecider(schedule_provider: ~typing.Callable[[~distributed_resource_optimization.algorithm.heuristic.cohda.core.WorkingMemory], list], is_local_acceptable: ~typing.Callable[[~numpy.ndarray], bool] = <function DefaultLocalDecider.<lambda>>)[source]

Bases: LocalDecider

Enumerate all feasible schedules and pick the globally best one.

Parameters:
  • schedule_provider(WorkingMemory) -> list[array-like] — returns all feasible schedules for this participant.

  • is_local_acceptable – Predicate filtering individual schedules.

initial_schedule(memory: WorkingMemory) ndarray[source]
class distributed_resource_optimization.algorithm.heuristic.cohda.core.COHDAAlgorithmData(participant_id: int, decider: LocalDecider, performance_function: Callable = <function cohda_default_performance>)[source]

Bases: DistributedAlgorithm

Per-participant COHDA state machine.

Parameters:
  • participant_id – 1-indexed unique participant ID.

  • decider – Local schedule selection strategy.

  • performance_function – Scores a full solution matrix.

counter: int
memory: WorkingMemory
async on_exchange_message(carrier: Carrier, message_data: WorkingMemory, meta: Any) None[source]

Handle an incoming message from another participant.

Parameters:
  • carrier – The carrier that delivered the message.

  • message_data – The message payload.

  • meta – Transport-level metadata (sender address, IDs, …).

distributed_resource_optimization.algorithm.heuristic.cohda.core.merge_sysconfigs(sysconfig_i: SystemConfig, sysconfig_j: SystemConfig) SystemConfig[source]

Merge two system configs, keeping the higher-counter entry per agent.

Returns sysconfig_i unchanged if it already dominates sysconfig_j.

distributed_resource_optimization.algorithm.heuristic.cohda.core.merge_candidates(candidate_i: SolutionCandidate, candidate_j: SolutionCandidate | None, participant_id: int, perf_func: Callable, target_params: TargetParams | None) SolutionCandidate[source]

Merge two solution candidates using the COHDA dominance rules.

  1. If K_i K_j (proper subset) → use j (more complete).

  2. If K_i == K_j → compare performance; break ties by lower agent ID.

  3. If K_j has IDs not in K_i → build a merged candidate.

  4. Otherwise keep i.

distributed_resource_optimization.algorithm.heuristic.cohda.core.perceive(cohda_data: COHDAAlgorithmData, working_memories: list[WorkingMemory]) tuple[SystemConfig, SolutionCandidate][source]

Incorporate received working memories into local state.

Initialises the local schedule and candidate the first time they are needed, then merges each incoming memory.

Returns:

Updated (system_config, solution_candidate) pair.

distributed_resource_optimization.algorithm.heuristic.cohda.core.create_from_updated_sysconf(participant_id: int, sysconfig: SystemConfig, new_schedule: ndarray) SolutionCandidate[source]

Build a fresh SolutionCandidate from sysconfig + new_schedule.

distributed_resource_optimization.algorithm.heuristic.cohda.core.decide(cohda_data: COHDAAlgorithmData, decider: LocalDecider, sysconfig: SystemConfig, candidate: SolutionCandidate) tuple[SystemConfig, SolutionCandidate][source]

Dispatch to the right decide implementation for decider.

distributed_resource_optimization.algorithm.heuristic.cohda.core.act(cohda_data: COHDAAlgorithmData, new_sysconfig: SystemConfig, new_candidate: SolutionCandidate) WorkingMemory[source]

Commit new_sysconfig and new_candidate to memory; return memory.

async distributed_resource_optimization.algorithm.heuristic.cohda.core.process_exchange_message(algorithm_data: COHDAAlgorithmData, messages: list[WorkingMemory], carrier: Carrier) None[source]

Run the perceive → decide → act cycle and forward updates to neighbours.

Skipped entirely if nothing changed during perception.

distributed_resource_optimization.algorithm.heuristic.cohda.core.create_cohda_start_message(target_schedule: list[float] | ndarray, weights: list[float] | ndarray | None = None) WorkingMemory[source]

Create the initial WorkingMemory that kicks off a COHDA run.

Parameters:
  • target_schedule – Global target vector.

  • weights – Per-interval weights (default: ones).

distributed_resource_optimization.algorithm.heuristic.cohda.core.create_cohda_participant(participant_id: int, schedule_set: list | Callable, performance_function: Callable = <function cohda_default_performance>) COHDAAlgorithmData[source]

Create a COHDA participant with a DefaultLocalDecider.

Parameters:
  • participant_id – 1-indexed unique ID.

  • schedule_set – A list of feasible schedules or a callable (WorkingMemory) -> list[array-like].

  • performance_function – Scoring function (default performance).

distributed_resource_optimization.algorithm.heuristic.cohda.core.create_cohda_participant_with_decider(participant_id: int, decider: LocalDecider, performance_function: Callable = <function cohda_default_performance>) COHDAAlgorithmData[source]

Create a COHDA participant with an explicit local decider.

distributed_resource_optimization.algorithm.heuristic.cohda.core.result(actor: COHDAAlgorithmData) ndarray[source]

Return the aggregate schedule (column-wise sum of all participants).

LocalSearchDecider — continuous-valued COHDA local search.

This module provides a gradient-free local search strategy for participants whose feasible set is a continuous corridor rather than a finite enumeration. The decider samples random values in each corridor dimension, evaluates a combined local+global performance, and narrows the corridor by pruning undesirable regions.

class distributed_resource_optimization.algorithm.heuristic.cohda.decider.LocalSearchDecider(initial_schedule: ndarray, corridors: list[tuple[float, float]], local_performance: Callable[[ndarray], float], convergence_force_factor: float = 0.1, max_iterations: int = 10, sample_size_per_value: int = 10, distribution: Callable[[float, float], Callable[[], float]] | None = None)[source]

Bases: LocalDecider

Random local search within per-dimension corridors.

For each schedule dimension i, find_new_value samples candidate values from corridors[i], evaluates a combined local+global performance, and iteratively prunes the search space toward the best region found.

Parameters:
  • initial_schedule – Starting schedule vector.

  • corridors – List of (low, high) bounds for each dimension.

  • local_performance(schedule) -> float local objective.

  • convergence_force_factor – Weight of the global deviation term (pushes the agent toward the target).

  • max_iterations – Pruning iterations per dimension.

  • sample_size_per_value – Number of random samples drawn initially.

  • distribution – Factory (low, high) -> Callable[[], float] that produces random values (default: uniform).

initial_schedule(memory: WorkingMemory) ndarray[source]
distributed_resource_optimization.algorithm.heuristic.cohda.decider.decide(cohda_data: COHDAAlgorithmData, decider: LocalSearchDecider, sysconfig: SystemConfig, candidate: SolutionCandidate) tuple[SystemConfig, SolutionCandidate][source]

LocalSearchDecider decide step.

Searches for a better schedule in the local search corridor. The open_schedule (residual distance from the current candidate sum to the weighted target) guides the convergence-force term.

Returns:

Updated (system_config, candidate) pair.

ADMM

ADMM core — generic coordinator and message types.

Provides the ADMMGenericCoordinator which drives the standard Alternating Direction Method of Multipliers iteration loop. Concrete global-actor implementations live in consensus_admm and sharing_admm; the local actor lives in flex_actor.

class distributed_resource_optimization.algorithm.admm.core.ADMMStart(data: Any, solution_length: int)[source]

Bases: object

Sent to the coordinator to begin a new ADMM run.

Parameters:
  • data – Algorithm-specific input (e.g. ADMMSharingData or a target vector).

  • solution_length – Number of decision variables per participant.

data: Any
solution_length: int
class distributed_resource_optimization.algorithm.admm.core.ADMMMessage(v: ndarray, rho: float)[source]

Bases: object

Sent by the coordinator to each participant to request an x-update.

Parameters:
  • v – Scaled consensus/sharing vector (the local QP reference point).

  • rho – ADMM penalty parameter.

v: ndarray
rho: float
class distributed_resource_optimization.algorithm.admm.core.ADMMAnswer(x: ndarray, aux: Any = None)[source]

Bases: object

Reply from a participant after solving its local update.

Parameters:
  • x – Local solution vector.

  • aux – Optional follower-side scalar/data (e.g. a per-step move magnitude) surfaced to the global actor’s convergence hook. None for variants that converge on the primal/dual residuals alone.

x: ndarray
aux: Any = None
class distributed_resource_optimization.algorithm.admm.core.ADMMGlobalActor[source]

Bases: ABC

Interface for the coordinator-side global update in ADMM variants.

abstractmethod z_update(input_data: Any, x: list[ndarray], u: Any, z: Any, rho: float, n: int) Any[source]

Compute the new global z from the current x and u.

abstractmethod u_update(x: list[ndarray], u: Any, z: Any, rho: float, n: int) Any[source]

Update the dual variable u.

abstractmethod init_z(n: int, m: int) Any[source]

Initialise z (called once before the iteration loop).

abstractmethod init_u(n: int, m: int) Any[source]

Initialise u (called once before the iteration loop).

abstractmethod actor_correction(x: list[ndarray], z: Any, u: Any, i: int) ndarray[source]

Compute the correction vector sent to participant i (0-indexed).

abstractmethod primal_residual(x: list[ndarray], z: Any) float[source]

Compute the primal residual used for convergence checking.

dual_residual(z: Any, z_old: Any, rho: float) float[source]

Dual residual for convergence. Default: rho * max||z - z_old||.

should_stop(primal_res: float, dual_res: float, aux: list[Any], abs_tol: float) bool | None[source]

Convergence override. Return True/False to decide directly, or None to fall back to the coordinator’s eps_pri/eps_dual test.

adapt_rho(primal_res: float, dual_res: float, rho: float, u: Any) tuple[float, Any][source]

Optionally rebalance rho (rescaling the scaled dual u inversely). Default: leave both unchanged.

class distributed_resource_optimization.algorithm.admm.core.ADMMGlobalObjective[source]

Bases: ABC

Optional global objective (currently informational only).

abstractmethod objective(x: list[ndarray], u: Any, z: Any, n: int) float[source]

Evaluate the global objective.

class distributed_resource_optimization.algorithm.admm.core.ADMMGenericCoordinator(global_actor: ADMMGlobalActor, rho: float = 1.0, max_iters: int = 1000, abs_tol: float = 0.0001, rel_tol: float = 0.001)[source]

Bases: Coordinator

Standard ADMM iteration loop.

Each round:

  1. Send ADMMMessage (correction + ρ) to all participants in parallel and await ADMMAnswer from each.

  2. Global z-update via z_update().

  3. Dual u-update via u_update().

  4. Check primal and dual residuals against tolerances; stop if met.

Parameters:
  • global_actor – Variant-specific global update logic.

  • rho – ADMM penalty parameter (default: 1.0).

  • max_iters – Maximum number of iterations (default: 1000).

  • abs_tol – Absolute convergence tolerance (default: 1e-4).

  • rel_tol – Relative convergence tolerance (default: 1e-3).

async start_optimization(carrier: Carrier, message_data: ADMMStart, meta: Any) list[np.ndarray][source]

Initiate and run a complete coordinated optimization round.

Parameters:
  • carrier – The carrier the coordinator uses to reach participants.

  • message_data – Start payload (algorithm-specific).

  • meta – Transport metadata from the triggering message.

Returns:

The final result (algorithm-specific).

distributed_resource_optimization.algorithm.admm.core.create_admm_start(data: Any, length: int | None = None) ADMMStart[source]

Create an ADMMStart message.

When length is omitted the length is inferred from data.solution_length or from len(data.target) (for sharing_admm.ADMMSharingData).

ADMMFlexActor — local ADMM participant for flexibility/resource allocation.

Solves a quadratic program (QP) at each ADMM iteration:

\[ \begin{align}\begin{aligned}\min_x \;\frac{\rho}{2}\|x + v\|^2 + S_i^\top x\\\text{subject to} \quad l \le x \le u, \quad Cx \le d\end{aligned}\end{align} \]

where v = -correction (the signal sent by the coordinator), S_i is a per-sector priority/penalty vector, and the constraints represent box and coupling feasibility.

class distributed_resource_optimization.algorithm.admm.flex_actor.ADMMFlexActor(lb: ndarray, u: ndarray, C: ndarray, d: ndarray, S: ndarray)[source]

Bases: DistributedAlgorithm

Local ADMM actor that solves a box+coupling-constrained QP.

Parameters:
  • lb – Lower-bound vector.

  • u – Upper-bound vector.

  • C – Coupling constraint matrix (rows: constraints, cols: variables).

  • d – Coupling RHS vector.

  • S – Priority/penalty vector (negative values act as rewards).

async on_exchange_message(carrier: Carrier, message_data: ADMMMessage, meta: Any) None[source]

Handle an incoming message from another participant.

Parameters:
  • carrier – The carrier that delivered the message.

  • message_data – The message payload.

  • meta – Transport-level metadata (sender address, IDs, …).

distributed_resource_optimization.algorithm.admm.flex_actor.result(actor: ADMMFlexActor) ndarray[source]

Return the most recent local solution of actor.

distributed_resource_optimization.algorithm.admm.flex_actor.create_admm_flex_actor_one_to_many(in_capacity: float, eta: list[float] | ndarray, P: list[float] | ndarray | None = None) ADMMFlexActor[source]

Create an ADMMFlexActor for a one-to-many resource scenario.

A single input of capacity in_capacity is split to len(eta) outputs according to efficiency factors η. The box constraints reflect the feasible range of each output, and the coupling constraints preserve the one-to-many conversion ratio.

Parameters:
  • in_capacity – Input resource capacity (e.g. rated power in kW).

  • eta – Efficiency factors for each output (may be negative for bidirectional devices).

  • P – Per-output priority penalties. Positive = penalised, negative = rewarded. Default: zeros (neutral).

Returns:

Configured ADMMFlexActor.

Consensus ADMM — all participants reach the same value summing to target.

The global actor implements the consensus variant where z and u are lists of per-participant vectors (one entry per agent).

The z-update is:

\[ \begin{align}\begin{aligned}\delta = \frac{\text{target} - \sum_i (x_i + u_i)}{N + \alpha / \rho}\\z_i \leftarrow x_i + u_i + \delta\end{aligned}\end{align} \]
class distributed_resource_optimization.algorithm.admm.consensus_admm.ADMMConsensusGlobalActor(alpha: int = 100)[source]

Bases: ADMMGlobalActor

Global actor for the consensus ADMM variant.

Parameters:

alpha – Regularisation weight that penalises deviation from the consensus (default 100).

alpha: int = 100
z_update(input_data: ndarray, x: list[ndarray], u: list[ndarray], z: list[ndarray], rho: float, n: int) list[ndarray][source]

Compute the new global z from the current x and u.

u_update(x: list[ndarray], u: list[ndarray], z: list[ndarray], rho: float, n: int) list[ndarray][source]

Update the dual variable u.

init_z(n: int, m: int) list[ndarray][source]

Initialise z (called once before the iteration loop).

init_u(n: int, m: int) list[ndarray][source]

Initialise u (called once before the iteration loop).

actor_correction(x: list[ndarray], z: list[ndarray], u: list[ndarray], i: int) ndarray[source]

Compute the correction vector sent to participant i (0-indexed).

primal_residual(x: list[ndarray], z: list[ndarray]) float[source]

Compute the primal residual used for convergence checking.

distributed_resource_optimization.algorithm.admm.consensus_admm.create_consensus_target_reach_admm_coordinator() ADMMGenericCoordinator[source]

Create an ADMMGenericCoordinator for the consensus variant.

distributed_resource_optimization.algorithm.admm.consensus_admm.create_admm_start_consensus(target: list | ndarray) ADMMStart[source]

Create an ADMMStart for a consensus run.

Parameters:

target – The target vector that the sum of all x values must reach.

Sharing ADMM — distributed resource sharing with target-distance objective.

Here z and u are global (single arrays shared across all participants) rather than per-participant lists.

The z-update minimises a weighted L1 distance to the target:

\[ \begin{align}\begin{aligned}\min_{z,d} \;\frac{N\rho}{2}\|z - \bar{x} - u\|^2 + \mathbf{1}^\top d\\\text{s.t.} \quad d_i \ge p_i(N z_i - t_i), \; d_i \ge -p_i(N z_i - t_i), \; d \ge 0\end{aligned}\end{align} \]

where \(\bar{x}\) is the participant average, p the priorities, and t the target vector.

class distributed_resource_optimization.algorithm.admm.sharing_admm.ADMMTargetDistanceObjective[source]

Bases: ADMMGlobalObjective

Quadratic target-distance objective (informational).

objective(x: list[ndarray], u: ndarray, z: ndarray, n: int) float[source]

Evaluate the global objective.

class distributed_resource_optimization.algorithm.admm.sharing_admm.ADMMSharingData(target: ndarray, priorities: ndarray)[source]

Bases: object

Input data for the sharing ADMM variant.

Parameters:
  • target – Desired sum vector (length m).

  • priorities – Per-element priority weights (negated so that positive input values become penalties).

target: ndarray
priorities: ndarray
distributed_resource_optimization.algorithm.admm.sharing_admm.create_admm_sharing_data(target: list | ndarray, priorities: list | ndarray | None = None) ADMMSharingData[source]

Build ADMMSharingData from user-friendly inputs.

Parameters:
  • target – Target sum vector.

  • priorities – Per-element priority weights (positive = higher priority for fulfilling that element). Default: all ones.

Returns:

ADMMSharingData with negated priorities (penalty form).

distributed_resource_optimization.algorithm.admm.sharing_admm.create_admm_start(data: ADMMSharingData) ADMMStart[source]

Wrap ADMMSharingData in an ADMMStart message.

class distributed_resource_optimization.algorithm.admm.sharing_admm.ADMMSharingGlobalActor(global_objective: ADMMGlobalObjective)[source]

Bases: ADMMGlobalActor

Global actor for the sharing ADMM variant.

Parameters:

global_objective – Global objective (currently unused in updates).

z_update(input_data: ADMMSharingData, x: list[ndarray], u: ndarray, z: ndarray, rho: float, n: int) ndarray[source]

Solve QP to find the optimal global z.

u_update(x: list[ndarray], u: ndarray, z: ndarray, rho: float, n: int) ndarray[source]

Update the dual variable u.

init_z(n: int, m: int) ndarray[source]

Initialise z (called once before the iteration loop).

init_u(n: int, m: int) ndarray[source]

Initialise u (called once before the iteration loop).

actor_correction(x: list[ndarray], z: ndarray, u: ndarray, i: int) ndarray[source]

Compute the correction vector sent to participant i (0-indexed).

primal_residual(x: list[ndarray], z: ndarray) float[source]

Compute the primal residual used for convergence checking.

distributed_resource_optimization.algorithm.admm.sharing_admm.create_sharing_target_distance_admm_coordinator() ADMMGenericCoordinator[source]

Create an ADMMGenericCoordinator for target-distance sharing.

distributed_resource_optimization.algorithm.admm.sharing_admm.create_sharing_admm_coordinator(objective: ADMMGlobalObjective) ADMMGenericCoordinator[source]

Create an ADMMGenericCoordinator with a custom objective.

Consensus

Averaging consensus algorithm.

Each participant maintains a local estimate λ and iteratively averages it with its neighbours’ estimates. An optional ConsensusActor can add a gradient term to bias the consensus toward a local optimum.

The update rule is:

\[\lambda^{k+1} = \lambda^k + \alpha (\bar{\lambda}^k - \lambda^k) + \nabla f(\lambda^k, \text{data})\]

where \(\bar{\lambda}^k\) is the average of all neighbours’ estimates at iteration k.

class distributed_resource_optimization.algorithm.firstorder.consensus.averaging.ConsensusActor[source]

Bases: object

Optional plug-in that adds a gradient term to the averaging update.

Subclass this to bias the consensus toward a local optimum (e.g. economic dispatch or price signals).

gradient_term(lam: ndarray, data: Any) ndarray | float[source]

Return the gradient correction for the current iterate lam.

Parameters:
  • lam – Current local estimate.

  • data – Auxiliary data forwarded from the start message.

Returns:

Additive correction (default: 0).

class distributed_resource_optimization.algorithm.firstorder.consensus.averaging.NoConsensusActor[source]

Bases: ConsensusActor

Neutral consensus actor — no gradient term (pure averaging).

class distributed_resource_optimization.algorithm.firstorder.consensus.averaging.AveragingConsensusMessage(lam: ndarray, k: int, data: Any, initial: bool = False)[source]

Bases: OptimizationMessage

Message exchanged between averaging-consensus participants.

Parameters:
  • lam – Current λ estimate of the sender.

  • k – Current iteration counter.

  • data – Auxiliary payload forwarded to ConsensusActor.gradient_term().

  • initial – If True this is the kick-off message; recipients (re-)initialise their state.

lam: ndarray
k: int
data: Any
initial: bool = False
class distributed_resource_optimization.algorithm.firstorder.consensus.averaging.ConsensusFinishedMessage(lam: ndarray, k: int, actor: ConsensusActor)[source]

Bases: object

Emitted (internally) when a participant finishes the consensus run.

Parameters:
  • lam – Final λ estimate.

  • k – Iteration at which convergence / max_iter was reached.

  • actor – The ConsensusActor instance of this participant.

lam: ndarray
k: int
actor: ConsensusActor
class distributed_resource_optimization.algorithm.firstorder.consensus.averaging.AveragingConsensusAlgorithm(finish_callback: Callable, consensus_actor: ConsensusActor | None = None, initial_lam: float = 10.0, alpha: float = 0.3, max_iter: int = 50)[source]

Bases: DistributedAlgorithm

Distributed averaging consensus with an optional gradient correction.

Parameters:
  • finish_callback – Called with (algorithm, carrier) when the run ends (either max_iter reached or all neighbours signal convergence).

  • consensus_actor – Optional ConsensusActor for gradient terms.

  • initial_lam – Starting scalar (broadcast to all λ dimensions).

  • alpha – Averaging step size (0 < α ≤ 1).

  • max_iter – Maximum number of consensus iterations.

actor: ConsensusActor
async on_exchange_message(carrier: Carrier, message_data: AveragingConsensusMessage, meta: Any) None[source]

Process one incoming averaging consensus message.

distributed_resource_optimization.algorithm.firstorder.consensus.averaging.create_averaging_consensus_participant(finish_callback: Callable, consensus_actor: ConsensusActor | None = None, initial_lam: float = 10.0, alpha: float = 0.3, max_iter: int = 50) AveragingConsensusAlgorithm[source]

Create an AveragingConsensusAlgorithm participant.

Parameters:
  • finish_callback(algorithm, carrier) -> None — called when done.

  • consensus_actor – Optional gradient actor. None → pure averaging.

  • initial_lam – Initial λ scalar.

  • alpha – Step size.

  • max_iter – Maximum iterations.

distributed_resource_optimization.algorithm.firstorder.consensus.averaging.create_averaging_consensus_start(initial_lam: float, data: Any = None) AveragingConsensusMessage[source]

Create the initial kick-off message for an averaging consensus run.

Parameters:
  • initial_lam – Starting scalar broadcast to all λ dimensions.

  • data – Auxiliary payload forwarded to gradient actors.

Returns:

An AveragingConsensusMessage with initial=True.

Economic dispatch consensus actor.

A ConsensusActor that computes a linearised inverted quadratic cost response. During each consensus iteration the actor updates its local power output P to minimise cost given the current price signal λ, and returns a gradient correction that pushes λ toward balancing supply and demand.

The gradient term is:

\[\nabla_\lambda = -\rho \left( P(\lambda) - \frac{P_{\text{target}}}{N} \right)\]

where

\[P(\lambda) = \text{clip}\left(\frac{\lambda - c}{\epsilon},\; P_{\min},\; P_{\max}\right)\]
class distributed_resource_optimization.algorithm.firstorder.consensus.economic_dispatch.LinearCostEconomicDispatchConsensusActor(cost: float, p_max: float, rho: float = 0.05, epsilon: float = 0.1, p_min: float = 0.0, n_guess: int = 10, P: ndarray = <factory>)[source]

Bases: ConsensusActor

Economic dispatch via linearised inverted quadratic cost function.

Parameters:
  • cost – Marginal cost coefficient c in the cost function cP + εP².

  • p_max – Maximum power output.

  • rho – Gradient step size (consensus price sensitivity).

  • epsilon – Sensitivity of power response to price (default 0.1).

  • p_min – Minimum power output (default 0).

  • n_guess – Estimated number of participants for target normalisation.

cost: float
p_max: float
rho: float = 0.05
epsilon: float = 0.1
p_min: float = 0.0
n_guess: int = 10
P: ndarray
gradient_term(lam: ndarray, p_target: Any) ndarray[source]

Compute the gradient correction for the current price signal lam.

Parameters:
  • lam – Current price/λ vector.

  • p_target – Total target power (scalar or array); normalised by n_guess to get the per-participant share.

Returns:

Additive gradient correction (same shape as lam).

Algorithm Core

Abstract base types for distributed algorithms and coordinators.

class distributed_resource_optimization.algorithm.core.OptimizationMessage[source]

Bases: ABC

Marker supertype for all optimization-protocol messages.

class distributed_resource_optimization.algorithm.core.DistributedAlgorithm[source]

Bases: ABC

Base class for all distributed optimization algorithms.

Concrete subclasses must implement on_exchange_message().

abstractmethod async on_exchange_message(carrier: Carrier, message_data: Any, meta: Any) Any[source]

Handle an incoming message from another participant.

Parameters:
  • carrier – The carrier that delivered the message.

  • message_data – The message payload.

  • meta – Transport-level metadata (sender address, IDs, …).

class distributed_resource_optimization.algorithm.core.Coordinator[source]

Bases: ABC

Base class for optimization coordinators.

abstractmethod async start_optimization(carrier: Carrier, message_data: Any, meta: Any) Any[source]

Initiate and run a complete coordinated optimization round.

Parameters:
  • carrier – The carrier the coordinator uses to reach participants.

  • message_data – Start payload (algorithm-specific).

  • meta – Transport metadata from the triggering message.

Returns:

The final result (algorithm-specific).

async distributed_resource_optimization.algorithm.core.on_exchange_message(algorithm: DistributedAlgorithm, carrier: Carrier, message_data: Any, meta: Any) Any[source]

Delegate to algorithm.on_exchange_message(carrier, message_data, meta).

async distributed_resource_optimization.algorithm.core.start_optimization(coordinator: Coordinator, carrier: Carrier, message_data: Any, meta: Any) Any[source]

Delegate to coordinator.start_optimization(carrier, message_data, meta).

class distributed_resource_optimization.algorithm.core.CoordinatedDistributedAlgorithm(distributed_algo: list[DistributedAlgorithm], coordinator: Coordinator)[source]

Bases: object

Bundle of a coordinator and its worker algorithms (informational only).