Source code for distributed_resource_optimization.algorithm.firstorder.consensus.economic_dispatch

"""Economic dispatch consensus actor.

A :class:`~.averaging.ConsensusActor` that computes a linearised inverted
quadratic cost response.  During each consensus iteration the actor updates its
local power output *P* to minimise cost given the current price signal λ, and
returns a gradient correction that pushes λ toward balancing supply and demand.

The gradient term is:

.. math::

    \\nabla_\\lambda = -\\rho \\left( P(\\lambda) - \\frac{P_{\\text{target}}}{N} \\right)

where

.. math::

    P(\\lambda) = \\text{clip}\\left(\\frac{\\lambda - c}{\\epsilon},\\; P_{\\min},\\; P_{\\max}\\right)


"""

from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any

import numpy as np

from .averaging import ConsensusActor


[docs] @dataclass class LinearCostEconomicDispatchConsensusActor(ConsensusActor): """Economic dispatch via linearised inverted quadratic cost function. :param cost: Marginal cost coefficient *c* in the cost function ``cP + εP²``. :param p_max: Maximum power output. :param rho: Gradient step size (consensus price sensitivity). :param epsilon: Sensitivity of power response to price (default 0.1). :param p_min: Minimum power output (default 0). :param n_guess: Estimated number of participants for target normalisation. """ cost: float p_max: float rho: float = 0.05 epsilon: float = 0.1 p_min: float = 0.0 n_guess: int = 10 # Updated each iteration P: np.ndarray = field(default_factory=lambda: np.array([0.0]))
[docs] def gradient_term( self, lam: np.ndarray, p_target: Any, ) -> np.ndarray: """Compute the gradient correction for the current price signal *lam*. :param lam: Current price/λ vector. :param p_target: Total target power (scalar or array); normalised by :attr:`n_guess` to get the per-participant share. :returns: Additive gradient correction (same shape as *lam*). """ self.P = np.clip( (lam - self.cost) / self.epsilon, self.p_min, self.p_max, ) p_target_arr = np.asarray(p_target if p_target is not None else 0.0) return -self.rho * (self.P - p_target_arr / self.n_guess)