"""ADMMFlexActor — local ADMM participant for flexibility/resource allocation.
Solves a quadratic program (QP) at each ADMM iteration:
.. math::
\\min_x \\;\\frac{\\rho}{2}\\|x + v\\|^2 + S_i^\\top x
\\text{subject to} \\quad l \\le x \\le u, \\quad Cx \\le d
where *v* = ``-correction`` (the signal sent by the coordinator), *S_i* is
a per-sector priority/penalty vector, and the constraints represent box and
coupling feasibility.
"""
from __future__ import annotations
from typing import TYPE_CHECKING, Any
import cvxpy as cp
import numpy as np
from ..core import DistributedAlgorithm
from .core import ADMMAnswer, ADMMMessage
if TYPE_CHECKING:
from ...carrier.core import Carrier
[docs]
class ADMMFlexActor(DistributedAlgorithm):
"""Local ADMM actor that solves a box+coupling-constrained QP.
:param lb: Lower-bound vector.
:param u: Upper-bound vector.
:param C: Coupling constraint matrix (rows: constraints, cols: variables).
:param d: Coupling RHS vector.
:param S: Priority/penalty vector (negative values act as rewards).
"""
def __init__(
self,
lb: np.ndarray,
u: np.ndarray,
C: np.ndarray,
d: np.ndarray,
S: np.ndarray,
) -> None:
self.lb = lb
self.u = u
self.C = C
self.d = d
self.S = S
self.x: np.ndarray = np.array([])
[docs]
async def on_exchange_message(
self,
carrier: "Carrier",
message_data: ADMMMessage,
meta: Any,
) -> None:
try:
self.x = _local_update(self, message_data.v, message_data.rho)
except Exception:
# Fail-safe: a malformed local QP (NaN inputs, infeasible
# constraints, solver crash) must not detach the asyncio
# task — that would hang the coordinator's gather forever.
# Reply with the previous solution (zeros on first round)
# so the round terminates and the next message can recover.
if self.x.size != len(message_data.v):
self.x = np.zeros(len(message_data.v), dtype=float)
carrier.reply_to_other(ADMMAnswer(x=self.x), meta)
[docs]
def result(actor: ADMMFlexActor) -> np.ndarray:
"""Return the most recent local solution of *actor*."""
return actor.x
def _local_update(actor: ADMMFlexActor, v: np.ndarray, rho: float) -> np.ndarray:
m = len(v)
x_var = cp.Variable(m)
h = rho * np.asarray(v, dtype=float) + np.asarray(actor.S, dtype=float)
objective = cp.Minimize((rho / 2) * cp.sum_squares(x_var) + h @ x_var)
constraints = [
x_var >= actor.lb,
x_var <= actor.u,
]
# cvxpy rejects empty matrices in linear expressions, so skip the
# coupling constraint when the actor has no rows in C.
if actor.C.shape[0] > 0:
constraints.append(actor.C @ x_var <= actor.d)
prob = cp.Problem(objective, constraints)
prob.solve(solver=cp.OSQP, verbose=False)
if x_var.value is None:
raise RuntimeError(
f"ADMM local QP did not converge (status={prob.status}). "
"Check feasibility of box + coupling constraints."
)
return np.asarray(x_var.value, dtype=float)
def _create_C_and_d(tech_capacity: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
m = len(tech_capacity)
n_rows = 1 + 2 * (m - 1)
C = np.zeros((n_rows, m))
d = np.zeros(n_rows)
for i in range(m):
C[0, i] = -1.0 if tech_capacity[i] < 0 else 1.0
d[0] = float(np.sum(np.abs(tech_capacity)))
for j in range(m - 1):
r1 = 1 + 2 * j
r2 = r1 + 1
tj = tech_capacity[j]
tm = tech_capacity[m - 1]
if tj == 0 or tm == 0:
C[r1, j] = 0.0
C[r1, m - 1] = 0.0
C[r2, j] = 0.0
C[r2, m - 1] = 0.0
else:
C[r1, j] = 1.0 / tj
C[r1, m - 1] = -1.0 / tm
C[r2, j] = -1.0 / tj
C[r2, m - 1] = 1.0 / tm
return C, d
[docs]
def create_admm_flex_actor_one_to_many(
in_capacity: float,
eta: list[float] | np.ndarray,
P: list[float] | np.ndarray | None = None,
) -> ADMMFlexActor:
"""Create an :class:`ADMMFlexActor` for a one-to-many resource scenario.
A single input of capacity *in_capacity* is split to ``len(eta)`` outputs
according to efficiency factors *η*. The box constraints reflect the
feasible range of each output, and the coupling constraints preserve the
one-to-many conversion ratio.
:param in_capacity: Input resource capacity (e.g. rated power in kW).
:param eta: Efficiency factors for each output (may be negative for
bidirectional devices).
:param P: Per-output priority penalties. Positive = penalised,
negative = rewarded. Default: zeros (neutral).
:returns: Configured :class:`ADMMFlexActor`.
"""
eta_arr = np.asarray(eta, dtype=float)
tech_cap = in_capacity * eta_arr
p_arr = np.zeros(len(eta_arr)) if P is None else np.asarray(P, dtype=float)
lb = np.minimum(np.zeros(len(tech_cap)), tech_cap)
u = np.maximum(tech_cap, np.zeros(len(tech_cap)))
C, d = _create_C_and_d(tech_cap)
return ADMMFlexActor(lb=lb, u=u, C=C, d=d, S=-p_arr)