# Source code for distributed_resource_optimization.algorithm.heuristic.cohda.decider

"""LocalSearchDecider — continuous-valued COHDA local search.

This module provides a gradient-free local search strategy for participants
whose feasible set is a continuous corridor rather than a finite enumeration.
The decider samples random values in each corridor dimension, evaluates a
combined local+global performance, and narrows the corridor by pruning
undesirable regions.


"""

from __future__ import annotations

import random
from typing import TYPE_CHECKING, Callable

import numpy as np

from .core import (
    COHDAAlgorithmData,
    LocalDecider,
    ScheduleSelection,
    SolutionCandidate,
    SystemConfig,
    WorkingMemory,
    create_from_updated_sysconf,
)

if TYPE_CHECKING:
    pass


[docs] class LocalSearchDecider(LocalDecider): """Random local search within per-dimension corridors. For each schedule dimension *i*, ``find_new_value`` samples candidate values from ``corridors[i]``, evaluates a combined local+global performance, and iteratively prunes the search space toward the best region found. :param initial_schedule: Starting schedule vector. :param corridors: List of ``(low, high)`` bounds for each dimension. :param local_performance: ``(schedule) -> float`` local objective. :param convergence_force_factor: Weight of the global deviation term (pushes the agent toward the target). :param max_iterations: Pruning iterations per dimension. :param sample_size_per_value: Number of random samples drawn initially. :param distribution: Factory ``(low, high) -> Callable[[], float]`` that produces random values (default: uniform). """ def __init__( self, initial_schedule: np.ndarray, corridors: list[tuple[float, float]], local_performance: Callable[[np.ndarray], float], convergence_force_factor: float = 0.1, max_iterations: int = 10, sample_size_per_value: int = 10, distribution: Callable[[float, float], Callable[[], float]] | None = None, ) -> None: self._initial_schedule = np.asarray(initial_schedule, dtype=float) self.corridors = corridors self.local_performance = local_performance self.convergence_force_factor = convergence_force_factor self.max_iterations = max_iterations self.sample_size_per_value = sample_size_per_value self.distribution = ( distribution if distribution is not None else (lambda lo, hi: lambda: random.uniform(lo, hi)) )
[docs] def initial_schedule(self, memory: WorkingMemory) -> np.ndarray: return self._initial_schedule
# ---------------------------------------------------------------------------
# Algorithmic functions
# ---------------------------------------------------------------------------

# Number of (value, performance) evaluations collected before each pruning step.
_PRUNE_WINDOW = 3


def _local_performance_with_global_share(
    decider: "LocalSearchDecider",
    schedule: np.ndarray,
    new_value: float,
    current_value: float,
    delta_to_target: float,
) -> float:
    """Combined local + convergence-force performance.

    :param decider: Supplies ``local_performance`` and
        ``convergence_force_factor``.
    :param schedule: Schedule that already contains *new_value*.
    :param new_value: Candidate value of the dimension being searched.
    :param current_value: Value of that dimension before the search.
    :param delta_to_target: Residual passed in by the caller for that dimension.
    :returns: ``local_performance(schedule) + convergence_force_factor *
        ((new_value - current_value) + delta_to_target)``.
    """
    local_part = decider.local_performance(schedule)
    global_part = decider.convergence_force_factor * (
        (new_value - current_value) + delta_to_target
    )
    return local_part + global_part


def _prune_candidates(
    possible: list[float],
    window: list[tuple[float, float]],
) -> list[float]:
    """Narrow *possible* using three ``(value, performance)`` evaluations.

    The evaluations are ordered by value. Higher performance is treated as
    better: if performance falls with rising value only candidates below the
    middle value are kept, if it rises only candidates above it. For the two
    mixed orderings handled here the candidates strictly between the middle
    and the largest value are kept. Any other ordering (including ties)
    leaves the pool unchanged.

    :param possible: Current pool of candidate values.
    :param window: Exactly three ``(value, performance)`` pairs.
    :returns: The (possibly narrowed) pool; may be empty.
    """
    (_, p1), (v2, p2), (v3, p3) = sorted(window, key=lambda pair: pair[0])
    if p1 > p2 > p3:
        return [v for v in possible if v < v2]
    if p3 > p2 > p1:
        return [v for v in possible if v > v2]
    if (p2 > p1 > p3) or (p3 > p1 > p2):
        # After sorting v2 <= v3, so (v2, v3) is already the ordered interval.
        return [v for v in possible if v2 < v < v3]
    return possible


def _find_new_value(
    decider: "LocalSearchDecider",
    current_index: int,
    current_best_schedule: np.ndarray,
    delta_to_target: float,
) -> float:
    """Find an improved value for dimension *current_index* via random search.

    ``sample_size_per_value`` candidates are drawn from the corridor of the
    dimension. Up to ``max_iterations`` times one candidate is picked at
    random (with replacement) and rated with
    :func:`_local_performance_with_global_share`. After every
    ``_PRUNE_WINDOW`` evaluations the pool is narrowed and the window is
    reset, so pruning repeats for as long as the loop runs.

    :param decider: Search configuration.
    :param current_index: Dimension of the schedule to search.
    :param current_best_schedule: Schedule used as base; it is not modified.
    :param delta_to_target: Residual for this dimension, forwarded to the
        performance function.
    :returns: The evaluated candidate with the highest combined performance
        (the first one on ties), or the current value of the dimension when
        no candidate could be evaluated.
    """
    lo, hi = decider.corridors[current_index]
    sampler = decider.distribution(lo, hi)
    possible = [sampler() for _ in range(decider.sample_size_per_value)]
    current_value = float(current_best_schedule[current_index])

    window: list[tuple[float, float]] = []
    best_value = current_value
    best_perf: float | None = None

    iteration = 1
    while possible and iteration <= decider.max_iterations:
        candidate_value = possible[random.randrange(len(possible))]

        trial = current_best_schedule.copy()
        trial[current_index] = candidate_value
        perf = _local_performance_with_global_share(
            decider, trial, candidate_value, current_value, delta_to_target
        )

        # Remember the best evaluation instead of returning whichever
        # candidate happened to be sampled last.
        if best_perf is None or perf > best_perf:
            best_value, best_perf = candidate_value, perf

        window.append((candidate_value, perf))
        if len(window) == _PRUNE_WINDOW:
            possible = _prune_candidates(possible, window)
            # Start a fresh window; otherwise the length check above could
            # never match again and pruning would run only once.
            window.clear()

        iteration += 1

    return best_value


def _find_in_local_search_room(
    decider: "LocalSearchDecider",
    current_best_schedule: np.ndarray,
    open_schedule: np.ndarray,
) -> np.ndarray:
    """Search each dimension independently for an improved value.

    Every dimension is searched against the unmodified
    *current_best_schedule*, so a value found for one dimension does not
    influence the search of another.

    :param decider: Search configuration.
    :param current_best_schedule: Schedule to start from; it is not modified.
    :param open_schedule: Per-dimension residual forwarded to the search.
    :returns: A new array holding the value found for each dimension.
    """
    new_solution = current_best_schedule.copy()
    for index in range(len(current_best_schedule)):
        new_solution[index] = _find_new_value(
            decider, index, current_best_schedule, float(open_schedule[index])
        )
    return new_solution
def decide(
    cohda_data: COHDAAlgorithmData,
    decider: LocalSearchDecider,
    sysconfig: SystemConfig,
    candidate: SolutionCandidate,
) -> tuple[SystemConfig, SolutionCandidate]:
    """LocalSearchDecider decide step.

    Takes this participant's schedule from *candidate*, searches the local
    corridor for a new one and builds a new candidate from it. The
    *open_schedule* (weighted target minus the sum of all candidate
    schedules) is handed to the search as the convergence residual.

    Side effects: when *sysconfig* holds no selection for this participant,
    or a selection whose schedule differs from the pre-search schedule, the
    pre-search schedule is stored with ``counter + 1`` and
    ``cohda_data.counter`` is incremented.

    :param cohda_data: Participant state (id, counter, memory with target).
    :param decider: Local search configuration.
    :param sysconfig: System configuration; updated in place.
    :param candidate: Current solution candidate.
    :returns: Updated ``(system_config, candidate)`` pair.
    """
    participant = cohda_data.participant_id

    # Participant ids are used 1-based here; candidate rows are 0-based.
    current_best_schedule = candidate.schedules[participant - 1].copy()

    target = cohda_data.memory.target_params
    weighted_target = target.schedule * target.weights
    open_schedule = weighted_target - candidate.schedules.sum(axis=0)

    new_best_schedule = _find_in_local_search_room(
        decider, current_best_schedule, open_schedule
    )

    # Build the new candidate before sysconfig is touched below.
    new_candidate = create_from_updated_sysconf(
        participant, sysconfig, new_best_schedule
    )

    existing = sysconfig.schedule_choices.get(participant)
    already_recorded = existing is not None and np.array_equal(
        current_best_schedule, existing.schedule
    )
    if not already_recorded:
        # Note: the schedule stored is the pre-search one, not new_best_schedule.
        sysconfig.schedule_choices[participant] = ScheduleSelection(
            schedule=current_best_schedule,
            counter=cohda_data.counter + 1,
        )
        cohda_data.counter += 1

    return sysconfig, new_candidate