# Source code for distributed_resource_optimization.algorithm.admm.consensus_admm

"""Consensus ADMM — all participants reach the same value summing to *target*.

The global actor implements the consensus variant where z and u are lists of
per-participant vectors (one entry per agent).

The z-update is:

.. math::

    \\delta = \\frac{\\text{target} - \\sum_i (x_i + u_i)}{N + \\alpha / \\rho}

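    z_i \\leftarrow x_i + u_i + \\delta

Here :math:`N` is the number of participants (``n``), :math:`\\alpha` is the
actor's ``alpha`` weight and :math:`\\rho` is the ``rho`` argument of
``z_update``.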
"""

from __future__ import annotations

from dataclasses import dataclass

import numpy as np

from .core import ADMMGenericCoordinator, ADMMGlobalActor, ADMMStart


@dataclass
class ADMMConsensusGlobalActor(ADMMGlobalActor):
    """Global actor for the consensus ADMM variant.

    ``z`` and ``u`` are handled as lists with one vector per participant.

    :param alpha: Regularisation weight that penalises deviation from the
        consensus (default 100).
    """

    alpha: int = 100

    def z_update(
        self,
        input_data: np.ndarray,
        x: list[np.ndarray],
        u: list[np.ndarray],
        z: list[np.ndarray],
        rho: float,
        n: int,
    ) -> list[np.ndarray]:
        """Shift every ``x_i + u_i`` by one shared correction term.

        The correction is
        ``(input_data - sum_i(x_i + u_i)) / (n + alpha / rho)``.

        :param input_data: Target the summed vectors are compared against;
            converted to a float array.
        :param x: Per-participant ``x`` vectors.
        :param u: Per-participant ``u`` vectors.
        :param z: Previous ``z`` vectors; only the length of the first entry
            is read, to size the accumulator.
        :param rho: Divisor applied to ``alpha`` in the denominator.
        :param n: Count added to ``alpha / rho`` in the denominator.
        :return: One updated ``z`` vector per ``(x_i, u_i)`` pair.
        """
        running_sum = np.zeros(len(z[0]))
        shifted = []
        for x_vec, u_vec in zip(x, u):
            term = x_vec + u_vec
            running_sum += term
            # Kept so the return below does not recompute x_i + u_i.
            shifted.append(term)

        target = np.asarray(input_data, dtype=float)
        denominator = n + self.alpha / rho
        correction = (target - running_sum) / denominator
        return [term + correction for term in shifted]

    def u_update(
        self,
        x: list[np.ndarray],
        u: list[np.ndarray],
        z: list[np.ndarray],
        rho: float,
        n: int,
    ) -> list[np.ndarray]:
        """Return ``u_i + x_i - z_i`` for every participant.

        ``rho`` and ``n`` are accepted but not used by this variant.
        """
        return [u_vec + x_vec - z_vec for u_vec, x_vec, z_vec in zip(u, x, z)]

    def init_z(self, n: int, m: int) -> list[np.ndarray]:
        """Return *n* independent all-ones vectors of length *m*."""
        return [np.ones(m) for _participant in range(n)]

    def init_u(self, n: int, m: int) -> list[np.ndarray]:
        """Return *n* independent all-zeros vectors of length *m*."""
        return [np.zeros(m) for _participant in range(n)]

    def actor_correction(
        self,
        x: list[np.ndarray],
        z: list[np.ndarray],
        u: list[np.ndarray],
        i: int,
    ) -> np.ndarray:
        """Return ``-z[i] + u[i]`` for participant *i*; ``x`` is not read."""
        negated_z = -z[i]
        return negated_z + u[i]

    def primal_residual(self, x: list[np.ndarray], z: list[np.ndarray]) -> float:
        """Return the largest ``np.linalg.norm(x_i - z_i)`` over all pairs."""
        largest = max(np.linalg.norm(x_vec - z_vec) for x_vec, z_vec in zip(x, z))
        return float(largest)
# ---------------------------------------------------------------------------
# Factories
# ---------------------------------------------------------------------------
def create_consensus_target_reach_admm_coordinator() -> ADMMGenericCoordinator:
    """Build an :class:`ADMMGenericCoordinator` for the consensus variant.

    The coordinator is given a default-constructed
    :class:`ADMMConsensusGlobalActor` as its ``global_actor``.
    """
    consensus_actor = ADMMConsensusGlobalActor()
    return ADMMGenericCoordinator(global_actor=consensus_actor)
def create_admm_start_consensus(target: list | np.ndarray) -> ADMMStart:
    """Build an :class:`~.core.ADMMStart` for a consensus run.

    :param target: The target vector that the sum of all *x* values must
        reach. It is converted to a float array before being passed on.
    :return: An ``ADMMStart`` whose ``data`` is the converted target and whose
        ``solution_length`` is the length of that array.
    """
    target_vector = np.asarray(target, dtype=float)
    length = len(target_vector)
    return ADMMStart(data=target_vector, solution_length=length)