Source code for utils.physics_helpers

"""
Physics-related utility functions for calculating thermodynamic observables and correlations.
"""
from __future__ import annotations

import numpy as np
from scipy.integrate import cumulative_trapezoid

from models.simulation_base import MonteCarloSimulation
from utils.exceptions import ZeroVarianceAutocorrelationError


def calculate_thermodynamics(
    *, mags: np.ndarray, engs: np.ndarray, T: float, L: int
) -> tuple[float, float, float, float]:
    """
    Calculate average magnetization, energy, susceptibility, and specific heat.

    Parameters
    ----------
    mags:
        Array of magnetization measurements.
    engs:
        Array of energy measurements.
    T:
        Temperature.
    L:
        Linear lattice size.

    Returns
    -------
    A tuple of (avg_mag, avg_eng, susceptibility, specific_heat).

    Raises
    ------
    ValueError:
        If ``T`` is not positive or ``L`` is not a positive integer.
    """
    if T <= 0.0:
        raise ValueError(f'T must be positive (T > 0), got {T}')
    if not isinstance(L, (int, np.integer)) or L < 1:
        raise ValueError(f'L must be a positive integer, got {L!r}')

    mean_mag = float(np.mean(mags))
    mean_eng = float(np.mean(engs))
    n_sites = L * L

    # Fluctuation-dissipation estimators:
    #   chi = N * Var(M) / T        (susceptibility)
    #   Cv  = N * Var(E) / T^2      (specific heat)
    chi = float(n_sites * np.var(mags) / T)
    c_v = float(n_sites * np.var(engs) / (T**2))

    return mean_mag, mean_eng, chi, c_v
def calculate_entropy(
    *,
    temperatures: np.ndarray,
    specific_heat: np.ndarray,
    s_ref: float = 0.0,
) -> np.ndarray:
    """
    Compute entropy by integrating specific heat over a temperature sweep.

    Integrates C(T)/T from high temperature downward so that
    S(T_max) = ``s_ref`` and S(T) = s_ref - integral from T to T_max of
    C/T dT'. This avoids the C/T divergence near T = 0 and anchors the
    result at the high-temperature limit where the entropy is
    analytically known.

    Parameters
    ----------
    temperatures:
        1-D array of temperature values. Need not be sorted.
    specific_heat:
        Specific heat C_v at each temperature (same length).
    s_ref:
        Reference entropy at the highest temperature. Pass ``0.0`` for
        relative entropy or ``np.log(q)`` (per site, in units of k_B) for
        absolute entropy of a q-state model.

    Returns
    -------
    1-D array of entropy values, one per temperature, in the original
    unsorted order of ``temperatures``.

    Raises
    ------
    ValueError:
        If arrays differ in length, contain fewer than 2 points, or
        include non-positive temperatures.
    """
    temperatures = np.asarray(temperatures, dtype=np.float64)
    specific_heat = np.asarray(specific_heat, dtype=np.float64)

    if temperatures.shape != specific_heat.shape:
        raise ValueError(
            f'temperatures and specific_heat must have the same shape, '
            f'got {temperatures.shape} and {specific_heat.shape}'
        )
    if temperatures.size < 2:
        raise ValueError('Need at least 2 temperature points for integration')
    if np.any(temperatures <= 0.0):
        raise ValueError('All temperatures must be positive')

    # Work on an ascending copy; keep the permutation to undo it at the end.
    order = np.argsort(temperatures)
    t_asc = temperatures[order]
    cv_asc = specific_heat[order]

    # Running integral of C(T)/T from the lowest temperature upward;
    # initial=0.0 anchors the first (lowest-T) point at zero.
    running = cumulative_trapezoid(cv_asc / t_asc, t_asc, initial=0.0)

    # S(T_i) = s_ref - (integral from T_i up to T_max of C/T dT').
    s_asc = s_ref - (running[-1] - running)

    # Scatter back to the caller's original ordering.
    entropy: np.ndarray = np.empty_like(s_asc)
    entropy[order] = s_asc
    return entropy
def calculate_autocorr(
    *,
    time_series: np.ndarray,
    max_lag: int | None = None,
    window_constant: float = 6.0,
) -> tuple[np.ndarray, float]:
    """
    Compute the normalized autocorrelation function and integrated
    autocorrelation time.

    Uses an FFT-based estimator for C(t): zero-pads to 2N, computes the
    power spectrum, IFFTs, and normalizes by variance and the number of
    pairs at each lag. Applies the Madras-Sokal automatic windowing rule
    to stop integration at the lag W where W = ``window_constant`` *
    tau_int, preventing the estimator from accumulating noise at large
    lags.

    Parameters
    ----------
    time_series:
        1-D array of sequential measurements (e.g. magnetization).
    max_lag:
        Maximum lag to compute. Defaults to ``len(time_series) // 2``.
        Values larger than ``len(time_series) - 1`` are clamped to that
        limit (the largest lag with at least one sample pair).
    window_constant:
        Multiplier for the Madras-Sokal window (default 6.0).

    Returns
    -------
    A tuple ``(C_t, tau_int)`` where ``C_t`` is the normalized
    autocorrelation array truncated at the window endpoint, and
    ``tau_int`` is the integrated autocorrelation time (scalar).

    Raises
    ------
    ValueError:
        If ``time_series`` has fewer than 3 elements.
    ZeroVarianceAutocorrelationError:
        If ``time_series`` has zero variance.
    """
    x = np.asarray(time_series, dtype=np.float64)
    N = len(x)
    if N < 3:
        raise ValueError(f'time_series must have at least 3 elements, got {N}')

    variance = float(np.var(x))
    if variance == 0.0:
        raise ZeroVarianceAutocorrelationError(
            'time_series has zero variance; autocorrelation is undefined'
        )

    if max_lag is None:
        max_lag = N // 2
    # Clamp to the largest lag that has at least one sample pair. Without
    # this, a caller-supplied max_lag >= N indexes past the end of
    # C_t_full (length N) and raises IndexError.
    max_lag = min(int(max_lag), N - 1)

    # FFT-based unnormalized autocorrelation via the power spectrum
    # (Wiener-Khinchin). Zero-padding to 2N avoids circular wrap-around.
    x_centered = x - np.mean(x)
    n_fft = 2 * N
    fx = np.fft.rfft(x_centered, n=n_fft)
    power = fx * np.conj(fx)
    acf_raw: np.ndarray = np.fft.irfft(power, n=n_fft)[:N].real

    # Normalize: divide by variance and by the (N - t) pairs contributing
    # at each lag t, so that C_t_full[0] == 1 exactly.
    lags = np.arange(N, dtype=np.float64)
    norm = variance * (N - lags)
    C_t_full: np.ndarray = acf_raw / norm

    # Madras-Sokal automatic window: stop summing at the first lag t with
    # t >= window_constant * tau_int, trading a small bias for much less
    # variance from noisy large-lag estimates.
    tau_int = 0.5
    window_end = max_lag
    for t in range(1, max_lag + 1):
        tau_int += float(C_t_full[t])
        if t >= window_constant * tau_int:
            window_end = t
            break

    C_t: np.ndarray = C_t_full[:window_end + 1]
    return C_t, tau_int
def get_averaged_correlation(
    *, sim: MonteCarloSimulation, total_steps: int, sample_interval: int
) -> tuple[np.ndarray, np.ndarray]:
    """
    Run simulation and average the correlation function over multiple
    configurations.

    Parameters
    ----------
    sim:
        An instance of MonteCarloSimulation.
    total_steps:
        Total number of MC steps to run.
    sample_interval:
        Interval between correlation samples. Must be ≥ 1.

    Returns
    -------
    r:
        Radial distances.
    G_r_avg:
        Averaged correlation values (float64).

    Raises
    ------
    ValueError:
        If ``sample_interval`` is less than 1 or ``total_steps`` is
        negative.
    """
    if sample_interval < 1:
        raise ValueError(f'sample_interval must be >= 1, got {sample_interval}')
    if total_steps < 0:
        raise ValueError(f'total_steps must be non-negative, got {total_steps}')

    G_r_avg: np.ndarray | None = None
    count = 0
    r: np.ndarray = np.array([])

    for i in range(total_steps):
        sim.step()
        # Samples are taken at steps 0, sample_interval, 2*sample_interval, ...
        if i % sample_interval == 0:
            r, G_r = sim._calculate_correlation_function()
            if G_r_avg is None:
                # Accumulate in float64 so an integer-valued G_r cannot
                # break the in-place true division below.
                G_r_avg = np.zeros_like(G_r, dtype=np.float64)
            G_r_avg += G_r
            count += 1

    if G_r_avg is not None and count > 0:
        G_r_avg /= count
    else:
        # No samples taken (total_steps == 0): return a zero correlation
        # on the simulation's radial grid.
        r, G_r_dummy = sim._calculate_correlation_function()
        G_r_avg = np.zeros_like(G_r_dummy, dtype=np.float64)

    return r, G_r_avg
def radial_average_sk(*, spins: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Compute the circularly averaged structure factor :math:`S(|k|)`.

    Bins :math:`S(k)` by integer pixel radius from the DC centre of the
    shifted FFT, then averages within each annular bin.

    Parameters
    ----------
    spins:
        (N, N) or (N, N, 2) spin array.

    Returns
    -------
    k_vals:
        Wavevector magnitudes in units of 2π/N (reciprocal lattice).
    S_radial:
        Mean S(k) value for each annular bin.
    """
    N = spins.shape[0]

    if spins.ndim == 3:
        # Two-component (XY/Clock) spins: add the power of each component.
        comp_x = spins[..., 0].astype(float)
        comp_y = spins[..., 1].astype(float)
        raw_power = np.abs(np.fft.fft2(comp_x)) ** 2 + np.abs(np.fft.fft2(comp_y)) ** 2
    else:
        # Scalar (Ising) spins.
        raw_power = np.abs(np.fft.fft2(spins.astype(float))) ** 2

    Sk = np.fft.fftshift(raw_power) / (N * N)

    # Integer pixel radius from the shifted DC centre.
    centre = N // 2
    yy, xx = np.indices((N, N))
    radius = np.sqrt((xx - centre) ** 2 + (yy - centre) ** 2).astype(int)

    # Annular averages out to the Nyquist radius.
    keep = radius <= centre
    bin_sums = np.bincount(radius[keep].ravel(), Sk[keep].ravel())
    bin_counts = np.bincount(radius[keep].ravel())
    S_radial = np.where(bin_counts > 0, bin_sums / bin_counts, 0.0)

    # Bin index → |k| in reciprocal-lattice units (2π/N per bin).
    k_vals = np.arange(len(S_radial)) * (2.0 * np.pi / N)
    return k_vals, S_radial
def pair_correlation_x(*, spins: np.ndarray) -> tuple[np.ndarray, np.ndarray]:
    """Compute the real-space spin-spin pair correlation G(r) along x.

    Uses the Wiener-Khinchin theorem: the autocorrelation of each row is
    the inverse FFT of the row's power spectrum. Results are averaged
    over all rows (y positions) and normalised so G(0) = 1.

    Parameters
    ----------
    spins:
        (N, N) or (N, N, 2) spin array.

    Returns
    -------
    r_vals:
        Lag distances r = 0 … N//2 in lattice units.
    G:
        Normalised pair correlation G(r) / G(0).
    """
    N = spins.shape[0]

    if spins.ndim == 3:
        # Vector (XY/Clock) spins: row-wise power spectra of both components add.
        fa = np.fft.rfft(spins[..., 0].astype(float), axis=1)
        fb = np.fft.rfft(spins[..., 1].astype(float), axis=1)
        row_autocorr = np.fft.irfft(np.abs(fa) ** 2 + np.abs(fb) ** 2, n=N, axis=1)
    else:
        # Scalar (Ising) spins.
        fs = np.fft.rfft(spins.astype(float), axis=1)
        row_autocorr = np.fft.irfft(np.abs(fs) ** 2, n=N, axis=1)

    # Average over rows (y positions); normalise by the N sites per row.
    G_all = row_autocorr.mean(axis=0) / N

    # Keep lags 0 … N//2 only (circular autocorrelation is symmetric).
    n_lags = N // 2 + 1
    r_vals = np.arange(n_lags)
    G = G_all[:n_lags]
    if G[0] != 0.0:
        G = G / G[0]
    return r_vals, G
def compute_kinetics_metrics(*, sim: MonteCarloSimulation) -> dict[str, float]:
    """
    Calculate common kinetics metrics (R_sk, xi) for a simulation state.

    Parameters
    ----------
    sim:
        MonteCarloSimulation instance.

    Returns
    -------
    Dictionary containing 'R_sk' and 'xi'.
    """
    if sim.spins is None:
        return {'R_sk': 0.0, 'xi': 0.0}

    # Characteristic domain size from the first moment of the structure
    # factor: R_sk = 2*pi * sum(S) / sum(k*S), skipping the k=0 bin.
    kvals, S_radial = radial_average_sk(spins=sim.spins)
    S_nonzero = S_radial[1:]
    k_nonzero = kvals[1:]
    first_moment = float(np.sum(k_nonzero * S_nonzero))
    if first_moment != 0:
        R_sk = 2.0 * np.pi * float(np.sum(S_nonzero) / first_moment)
    else:
        R_sk = 0.0

    # Correlation length xi from the first 1/e crossing of G(r), with
    # linear interpolation between the bracketing lags.
    r_vals, G = pair_correlation_x(spins=sim.spins)
    threshold = 1.0 / np.e
    crossings = np.where(G < threshold)[0]
    if len(crossings) == 0:
        # G never decays below 1/e within the window: report the window edge.
        xi = float(r_vals[-1])
    elif crossings[0] == 0:
        # Already below threshold at r = 0.
        xi = float(r_vals[0])
    else:
        j = crossings[0]
        r_lo, r_hi = float(r_vals[j - 1]), float(r_vals[j])
        g_lo, g_hi = float(G[j - 1]), float(G[j])
        xi = r_lo + (threshold - g_lo) * (r_hi - r_lo) / (g_hi - g_lo)

    return {'R_sk': R_sk, 'xi': xi}
def power_fit(
    *, t_arr: np.ndarray, y_arr: np.ndarray, mask: np.ndarray
) -> tuple[float, float] | tuple[None, None]:
    """
    Fit a power law y = prefactor * t^exponent via log-log linear
    regression.

    Parameters
    ----------
    t_arr:
        Independent variable array (e.g., time or length scale).
    y_arr:
        Dependent variable array; only positive values are used in the
        fit.
    mask:
        Boolean mask selecting the subset of points to include.

    Returns
    -------
    exponent:
        Fitted power-law exponent, or None if fewer than 3 valid points.
    prefactor:
        Fitted prefactor, or None if fewer than 3 valid points.
    """
    # Only masked-in, strictly positive points survive the log transform.
    usable = mask & (y_arr > 0)
    if usable.sum() < 3:
        # Too few points for a meaningful two-parameter fit.
        return None, None

    # In log-log space the power law is a straight line:
    #   log y = exponent * log t + log prefactor.
    slope, intercept = np.polyfit(np.log(t_arr[usable]), np.log(y_arr[usable]), 1)
    return float(slope), float(np.exp(intercept))
def _valid_prefix(x: np.ndarray) -> np.ndarray: """Return the non-NaN prefix from a padded trajectory.""" valid = np.isfinite(x) if not np.any(valid): return np.empty(0, dtype=float) end = int(np.where(valid)[0][-1]) + 1 return x[:end] def _moving_average(x: np.ndarray, window: int) -> np.ndarray: window = max(3, min(window, len(x))) return np.convolve(x, np.ones(window) / window, mode='valid')
def estimate_relaxation_time_two_start(
    trace_random: np.ndarray,
    trace_ordered: np.ndarray,
    *,
    k: float = 1.0,
    smooth_window: int = 30,
    dwell_window: int = 30,
    min_fraction_inside: float = 0.85,
) -> int:
    """Estimate thermalization time from convergence of random- and
    ordered-start traces.

    Compares a trajectory started from a random spin configuration
    against one started from a fully ordered configuration. Returns the
    first step at which both smoothed traces enter and sustain a common
    equilibrium band.

    Parameters
    ----------
    trace_random:
        1-D magnetization trace from a random initial state.
    trace_ordered:
        1-D magnetization trace from an ordered initial state.
    k:
        Half-width of the convergence band in units of the tail standard
        deviation. Larger values are more permissive.
    smooth_window:
        Window length (steps) for the moving-average smoother.
    dwell_window:
        Window length (steps) for the sustained-convergence test.
    min_fraction_inside:
        Fraction of steps within ``dwell_window`` that must lie inside
        the band to declare convergence.

    Returns
    -------
    Estimated relaxation time in MC steps. Returns the full trace length
    if no convergence is detected, or 0 for very short traces.
    """
    rand = np.abs(_valid_prefix(np.asarray(trace_random, dtype=float)))
    ordr = np.abs(_valid_prefix(np.asarray(trace_ordered, dtype=float)))

    common = min(len(rand), len(ordr))
    if common < 8:
        # Too few steps to say anything meaningful.
        return 0

    # Smooth both traces over their common length, then align lengths.
    rand_sm = _moving_average(rand[:common], smooth_window)
    ordr_sm = _moving_average(ordr[:common], smooth_window)
    m = min(len(rand_sm), len(ordr_sm))
    rand_sm = rand_sm[:m]
    ordr_sm = ordr_sm[:m]

    # Equilibrium band estimated from the second half of both traces.
    tail = np.concatenate([rand_sm[m // 2:], ordr_sm[m // 2:]])
    centre = tail.mean()
    spread = max(tail.std(), 1e-12)  # guard against a perfectly flat tail
    band = k * spread

    # A step counts as converged when both traces sit inside the band
    # and agree with each other to within the band width.
    inside = (
        (np.abs(rand_sm - centre) <= band)
        & (np.abs(ordr_sm - centre) <= band)
        & (np.abs(rand_sm - ordr_sm) <= band)
    ).astype(float)

    # Sliding fraction of converged steps over the dwell window.
    dwell = max(3, min(dwell_window, len(inside)))
    fraction = np.convolve(inside, np.ones(dwell), mode='valid') / dwell

    hits = np.where(fraction >= min_fraction_inside)[0]
    return int(hits[0]) if hits.size else common