Source code for utils.sweep_helpers

"""
Shared infrastructure for per-temperature Monte Carlo sweep workers.

This module provides the two-layer decomposition used by all three model
temperature sweeps (Ising, XY, Clock).

``simulate_at_temperature`` owns equilibration and raw measurement.
``compute_thermo_observables`` owns statistical summarization.
``simulate_thermo_point`` chains both layers and is the worker function passed
to ``parallel_sweep``.

Post-processing helpers ``build_uncertainty_bundle`` and ``build_quality_flags``
aggregate per-seed results over a full temperature axis and are also shared here,
eliminating identical copies that previously lived in each sweep script.
"""
from __future__ import annotations

import logging
from typing import Any, NamedTuple

import numpy as np

from utils.equilibration import convergence_equilibrate_with_status
from utils.statistics import (
    summarize_derived_observable,
    summarize_primary_observable,
    summarize_seed_ensemble,
)

# ---------------------------------------------------------------------------
# Input type
# ---------------------------------------------------------------------------

[docs] class ThermoPoint(NamedTuple): """Typed worker payload for one (temperature, seed) point in a sweep. Parameters ---------- temperature : float Simulation temperature T. size : int Linear lattice dimension L. meas_steps : int Number of Monte Carlo sweeps to record after equilibration. eq_probe_steps : int Chunk size passed to ``convergence_equilibrate_with_status``. eq_max_steps : int Hard cap on total equilibration steps. eq_qs_sigma_threshold : float Tail-std threshold for quasi-steady stuck-state detection. eq_qs_min_steps : int Minimum steps before stuck detection is allowed to fire. qs_allow_stuck : bool If True, a quasi-steady stuck state counts as convergence (Ising low-T). prefer_ordered_start : bool If True, switch to the ordered-start simulation when its magnetisation substantially exceeds the random-start value. Appropriate for Ising below T_c. temperature_index : int Index of this temperature in the sweep's temperature array. seed_index : int Replica index within the multi-seed ensemble for this temperature. seed : int RNG seed for both the random- and ordered-start simulations. model_cls : type Simulation class to instantiate (e.g. ``IsingSimulation``). Must be importable by its qualified name so it survives multiprocessing pickle. model_kwargs : dict[str, Any] Extra keyword arguments forwarded to the model constructor beyond ``size``, ``temp``, ``init_state``, and ``seed``. Empty dict for Ising and XY; carries ``{'q': q, 'A': aniso}`` for Clock. confidence : float Two-sided confidence level for uncertainty intervals (0 < c < 1). derived_method : str Uncertainty method for susceptibility and specific heat (``'blocking'`` or ``'bootstrap'``). bootstrap_resamples : int Bootstrap resamples used when ``derived_method='bootstrap'``. """ temperature: float size: int meas_steps: int eq_probe_steps: int eq_max_steps: int eq_qs_sigma_threshold: float eq_qs_min_steps: int qs_allow_stuck: bool prefer_ordered_start: bool temperature_index: int seed_index: int seed: int model_cls: type model_kwargs: dict[str, Any] confidence: float derived_method: str bootstrap_resamples: int
# --------------------------------------------------------------------------- # Intermediate result type # ---------------------------------------------------------------------------
[docs] class RawThermoData(NamedTuple): """Raw simulation output from one (temperature, seed) equilibration + measurement. Parameters ---------- temperature_index : int Index of this temperature in the sweep's temperature array. seed_index : int Replica index within the multi-seed ensemble for this temperature. equilibrated_flag : float 1.0 if equilibration converged, 0.0 otherwise. equilibration_steps : float Total equilibration steps taken (for diagnostics). temperature : float Temperature at which the simulation was run. size : int Linear lattice dimension L. mags_arr : np.ndarray or None Magnetisation time series of length ``meas_steps``. ``None`` when equilibration did not converge. engs_arr : np.ndarray or None Energy per site time series of length ``meas_steps``. ``None`` when equilibration did not converge. """ temperature_index: int seed_index: int equilibrated_flag: float equilibration_steps: float temperature: float size: int mags_arr: np.ndarray | None engs_arr: np.ndarray | None
# --------------------------------------------------------------------------- # Layer 1: simulation + equilibration # ---------------------------------------------------------------------------
[docs] def simulate_at_temperature(point: ThermoPoint) -> RawThermoData: """Run equilibration and measurement for one (temperature, seed) point. Creates a random-start and an ordered-start simulation with the same seed, runs two-start convergence equilibration, then records ``meas_steps`` sweeps on the active simulation. Returns raw magnetisation and energy time series for downstream statistical processing. Parameters ---------- point : ThermoPoint Fully configured worker payload. Returns ------- RawThermoData Raw time series; ``mags_arr`` and ``engs_arr`` are ``None`` when equilibration did not converge. """ T = point.temperature L = point.size sim_r = point.model_cls( size=L, temp=T, init_state='random', seed=point.seed, **point.model_kwargs, ) sim_o = point.model_cls( size=L, temp=T, init_state='ordered', seed=point.seed, **point.model_kwargs, ) total_steps, converged = convergence_equilibrate_with_status( sim_r, sim_o, chunk_size=point.eq_probe_steps, max_steps=point.eq_max_steps, qs_sigma_threshold=point.eq_qs_sigma_threshold, qs_min_steps=point.eq_qs_min_steps, qs_allow_stuck=point.qs_allow_stuck, ) if not converged: return RawThermoData( temperature_index=point.temperature_index, seed_index=point.seed_index, equilibrated_flag=0.0, equilibration_steps=float(total_steps), temperature=T, size=L, mags_arr=None, engs_arr=None, ) # For models where the ordered start is physically cleaner (Ising below T_c), # switch to it when its magnetisation is substantially higher. active_sim = sim_r if point.prefer_ordered_start: m_r = float(np.abs(sim_r._get_magnetization())) m_o = float(np.abs(sim_o._get_magnetization())) if m_o > m_r + 0.2: active_sim = sim_o mags, engs = active_sim.run(n_steps=point.meas_steps) return RawThermoData( temperature_index=point.temperature_index, seed_index=point.seed_index, equilibrated_flag=1.0, equilibration_steps=float(total_steps), temperature=T, size=L, mags_arr=np.asarray(mags, dtype=np.float64), engs_arr=np.asarray(engs, dtype=np.float64), )
# --------------------------------------------------------------------------- # Layer 2: statistical summarization # ---------------------------------------------------------------------------
[docs] def compute_thermo_observables( raw: RawThermoData, point: ThermoPoint, ) -> dict[str, float]: """Compute blocked/summarized thermodynamic observables from raw time series. Takes the raw output of ``simulate_at_temperature`` and applies autocorrelation-aware blocking (primary observables) and variance-based summarization (derived observables) to produce the standard per-point result dictionary. Parameters ---------- raw : RawThermoData Raw time series from ``simulate_at_temperature``. point : ThermoPoint Originating worker payload; provides temperature, size, and statistical configuration (confidence, derived_method, bootstrap_resamples). Returns ------- dict[str, float] Keys: ``temperature_index``, ``seed_index``, ``equilibrated_flag``, ``equilibration_steps``, and — when equilibrated — ``avg_m_*``, ``avg_e_*``, ``susc_*``, ``spec_h_*`` with suffixes ``_value``, ``_err``, ``_tau_int``, ``_n_eff``. """ base: dict[str, float] = { 'temperature_index': float(raw.temperature_index), 'seed_index': float(raw.seed_index), 'equilibrated_flag': raw.equilibrated_flag, 'equilibration_steps': raw.equilibration_steps, } if raw.equilibrated_flag == 0.0 or raw.mags_arr is None or raw.engs_arr is None: return base T = raw.temperature L = raw.size mags_arr = raw.mags_arr engs_arr = raw.engs_arr mag = summarize_primary_observable( time_series=mags_arr, confidence=point.confidence, ) eng = summarize_primary_observable( time_series=engs_arr, confidence=point.confidence, ) chi = summarize_derived_observable( magnetization_series=mags_arr, temperature=T, L=L, observable='chi', method=point.derived_method, confidence=point.confidence, bootstrap_resamples=point.bootstrap_resamples, ) cv = summarize_derived_observable( energy_series=engs_arr, temperature=T, L=L, observable='cv', method=point.derived_method, confidence=point.confidence, bootstrap_resamples=point.bootstrap_resamples, ) return { **base, 'avg_m_value': float(mag['value']), 'avg_m_err': float(mag['err']), 'avg_m_tau_int': float(mag['tau_int']), 'avg_m_n_eff': float(mag['n_eff']), 'avg_e_value': float(eng['value']), 'avg_e_err': float(eng['err']), 'avg_e_tau_int': float(eng['tau_int']), 'avg_e_n_eff': float(eng['n_eff']), 'susc_value': float(chi['value']), 'susc_err': float(chi['err']), 'susc_tau_int': float(chi['tau_int']), 'susc_n_eff': float(chi['n_eff']), 'spec_h_value': float(cv['value']), 'spec_h_err': float(cv['err']), 'spec_h_tau_int': float(cv['tau_int']), 'spec_h_n_eff': float(cv['n_eff']), }
# --------------------------------------------------------------------------- # Convenience wrapper (used by parallel_sweep workers) # ---------------------------------------------------------------------------
[docs] def simulate_thermo_point(point: ThermoPoint) -> dict[str, float]: """Run a full (equilibration + measurement + summarization) point. Thin wrapper over ``simulate_at_temperature`` followed by ``compute_thermo_observables``. This is the function passed as ``worker_func`` to ``parallel_sweep`` in temperature-sweep scripts. Parameters ---------- point : ThermoPoint Fully configured worker payload. Returns ------- dict[str, float] Standard per-point result dictionary; see ``compute_thermo_observables`` for the full key set. """ return compute_thermo_observables(simulate_at_temperature(point), point)
# --------------------------------------------------------------------------- # Post-processing helpers (shared by all temperature-sweep scripts) # ---------------------------------------------------------------------------
[docs] def build_uncertainty_bundle( *, values_by_seed: np.ndarray, errors_by_seed: np.ndarray, tau_by_seed: np.ndarray, n_eff_by_seed: np.ndarray, confidence: float, ) -> dict[str, np.ndarray]: """Aggregate per-seed results for one observable across a temperature axis. For multi-seed sweeps, applies hierarchical seed-ensemble aggregation via ``summarize_seed_ensemble``. Falls back to direct single-seed blocking results when only one seed is available. Parameters ---------- values_by_seed : np.ndarray Shape ``(n_temps, n_seeds)``. Per-seed point estimates. errors_by_seed : np.ndarray Shape ``(n_temps, n_seeds)``. Per-seed within-seed uncertainties. tau_by_seed : np.ndarray Shape ``(n_temps, n_seeds)``. Per-seed integrated autocorrelation times. n_eff_by_seed : np.ndarray Shape ``(n_temps, n_seeds)``. Per-seed effective sample sizes. confidence : float Two-sided confidence level for the returned CI bounds. Returns ------- dict[str, np.ndarray] Keys: ``'value'``, ``'err'``, ``'ci_low'``, ``'ci_high'``, ``'tau_int'``, ``'n_eff'``, ``'samples'``. All arrays have length ``n_temps`` except ``'samples'`` which has shape ``(n_temps, n_seeds)``. """ n_seeds = values_by_seed.shape[1] if n_seeds > 1: res_values = [] res_errors = [] res_low = [] res_high = [] for t in range(values_by_seed.shape[0]): mask = ~np.isnan(values_by_seed[t]) if not np.any(mask): res_values.append(np.nan) res_errors.append(np.nan) res_low.append(np.nan) res_high.append(np.nan) continue summary = summarize_seed_ensemble( values=values_by_seed[t, mask], within_seed_errors=errors_by_seed[t, mask], confidence=confidence, ) res_values.append(summary['value']) res_errors.append(summary['err']) res_low.append(summary['ci_low']) res_high.append(summary['ci_high']) res = { 'value': np.array(res_values), 'err': np.array(res_errors), 'ci_low': np.array(res_low), 'ci_high': np.array(res_high), } # Ensemble-averaged diagnostics; guard against all-NaN temperature rows. nan_mask = np.all(np.isnan(tau_by_seed), axis=1) tau_int = np.full(nan_mask.shape, np.nan) n_eff = np.full(nan_mask.shape, np.nan) if not np.all(nan_mask): with np.errstate(all='ignore'): tau_int[~nan_mask] = np.nanmean(tau_by_seed[~nan_mask], axis=1) n_eff[~nan_mask] = np.nanmean(n_eff_by_seed[~nan_mask], axis=1) if np.any(nan_mask): logger = logging.getLogger('vibespin') logger.warning( 'Autocorrelation diagnostics are undefined for %d temperature ' 'points (all seeds returned NaN). This is expected in the deep ' 'ordered/frozen phase.', int(np.sum(nan_mask)), ) else: res = { 'value': values_by_seed[:, 0], 'err': errors_by_seed[:, 0], 'ci_low': values_by_seed[:, 0] - errors_by_seed[:, 0], 'ci_high': values_by_seed[:, 0] + errors_by_seed[:, 0], } tau_int = tau_by_seed[:, 0] n_eff = n_eff_by_seed[:, 0] return { 'value': res['value'], 'err': res['err'], 'ci_low': res['ci_low'], 'ci_high': res['ci_high'], 'tau_int': tau_int, 'n_eff': n_eff, 'samples': values_by_seed.astype(np.float64), }
[docs] def build_quality_flags( *, tau_int: np.ndarray, ci_low: np.ndarray, ci_high: np.ndarray, n_eff: np.ndarray, min_effective_samples: float, max_tau_relative_width: float, ) -> dict[str, np.ndarray]: """Build per-temperature diagnostic flags for uncertainty quality. Parameters ---------- tau_int : np.ndarray Shape ``(n_temps,)``. Integrated autocorrelation time estimates. ci_low : np.ndarray Shape ``(n_temps,)``. Lower CI bound on ``tau_int``. ci_high : np.ndarray Shape ``(n_temps,)``. Upper CI bound on ``tau_int``. n_eff : np.ndarray Shape ``(n_temps,)``. Effective sample size estimates. min_effective_samples : float Threshold below which a point is flagged as low-N_eff. max_tau_relative_width : float CI-width-to-tau ratio above which the tau estimate is considered unstable. Returns ------- dict[str, np.ndarray] Keys: ``'undefined_autocorr_flag'``, ``'low_effective_sample_flag'``, ``'tau_interval_unstable_flag'``. All uint8 arrays of length ``n_temps``. """ undefined_autocorr = ~np.isfinite(tau_int) low_effective_sample = np.isfinite(n_eff) & (n_eff < min_effective_samples) tau_span = ci_high - ci_low denom = np.maximum(np.abs(tau_int), 1e-12) rel_width = tau_span / denom tau_interval_unstable = np.isfinite(rel_width) & (rel_width > max_tau_relative_width) return { 'undefined_autocorr_flag': undefined_autocorr.astype(np.uint8), 'low_effective_sample_flag': low_effective_sample.astype(np.uint8), 'tau_interval_unstable_flag': tau_interval_unstable.astype(np.uint8), }