Source code for scripts.ising.diag_eq_traces

"""
Diagnostic tool for visualizing two-start equilibration traces.

This script runs simultaneous random-start and ordered-start simulations of the
2D Ising model at multiple representative temperatures (deep ordered, critical,
and paramagnetic). It identifies seeds that lead to metastable stuck states at
low temperatures and generates a multi-panel visualization of the magnetization
trajectories.

Results are saved to ``results/ising/diagnostic_eq_traces_L<size>.png``.
"""
from __future__ import annotations

import argparse
import logging
import os
from typing import Any

import matplotlib
import numpy as np

matplotlib.use("Agg")
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker

from models.ising_model import IsingSimulation as IsingModel
from utils.equilibration import estimate_relaxation_time_two_start
from utils.system import parse_args_compat, setup_logging

#: Exact Onsager critical temperature for the 2D nearest-neighbour Ising model.
TC_ISING: float = 2.0 / np.log(1.0 + np.sqrt(2.0))

#: Color palette for traces.
PALETTE: dict[str, str] = {
    'random': '#4878CF',
    'ordered': '#D65F5F',
    'convergence': '#888888',
}


[docs] def collect_trace( *, size: int, temp: float, seed: int, init_state: str, n_chunks: int, chunk_size: int, ) -> np.ndarray: """ Run a simulation and collect the magnetization trace. Parameters ---------- size : int Linear lattice size L. temp : float Temperature T. seed : int RNG seed. init_state : str Initial state ('random' or 'ordered'). n_chunks : int Number of measurement chunks. chunk_size : int Sweeps per chunk. Returns ------- np.ndarray Array of magnetization values at each sweep. """ sim = IsingModel( size=size, temp=temp, init_state=init_state, update='checkerboard', seed=seed, ) mags: list[float] = [] for _ in range(n_chunks): m, _ = sim.run(n_steps=chunk_size) mags.extend(m) return np.array(mags)
[docs] def main() -> None: """ Execute the equilibration diagnostic and generate plots. """ parser = argparse.ArgumentParser( description='Generate two-start equilibration traces for the 2D Ising model.', ) parser.add_argument('--size', type=int, default=64, help='Linear lattice size L') parser.add_argument('--max-steps', type=int, default=5000, help='Total MC sweeps to record') parser.add_argument('--chunk-size', type=int, default=50, help='Sweeps per measurement chunk') parser.add_argument( '--scan-seeds', type=int, default=25, help='Number of seeds to pre-screen for stuck states at low T', ) parser.add_argument( '--scan-steps', type=int, default=500, help='Pre-screen length for identifying stuck states', ) parser.add_argument('--output-dir', type=str, default='results/ising', help='Output directory') parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') args = parse_args_compat(parser) logger = setup_logging(level=logging.DEBUG if args.verbose else logging.INFO) size = args.size max_steps = args.max_steps chunk_size = args.chunk_size n_chunks = max_steps // chunk_size temperatures = [0.5, 2.0, TC_ISING, 2.5] t_labels = [ r"$T = 0.5$ (deep ordered)", r"$T = 2.00$ (below $T_c$)", rf"$T = T_c \approx {TC_ISING:.3f}$", r"$T = 2.50$ (above $T_c$)", ] # Pre-screen seeds at low T to find interesting trajectories st_temp = temperatures[0] logger.info( f"Scanning {args.scan_seeds} seeds at T={st_temp} L={size} for stuck vs. ordering..." ) stuck_seeds: list[int] = [] ordering_seeds: list[int] = [] scan_chunks = args.scan_steps // chunk_size for s in range(args.scan_seeds): tr = collect_trace( size=size, temp=st_temp, seed=s, init_state='random', n_chunks=scan_chunks, chunk_size=chunk_size, ) final_m = float(np.abs(tr[-50:]).mean()) if final_m < 0.5: stuck_seeds.append(s) else: ordering_seeds.append(s) logger.info(f" Stuck seeds (|M|<0.5 after {args.scan_steps} sweeps): {stuck_seeds[:6]}") logger.info(f" Ordering seeds: {ordering_seeds[:6]}") # Select seeds for display: 6 seeds across all temperatures seeds_stuck = stuck_seeds[:3] seeds_ordering = ordering_seeds[:3] seeds_all = sorted(set(seeds_stuck + seeds_ordering))[:6] logger.info(f" Using 6 seeds across all temperatures: {seeds_all}") fig, axes = plt.subplots(4, 6, figsize=(20, 13), sharey='row') sweeps = np.arange(1, max_steps + 1) smooth_window = 120 for t_idx, (temp, label) in enumerate(zip(temperatures, t_labels, strict=True)): for s_idx, seed in enumerate(seeds_all): ax = axes[t_idx, s_idx] tr = collect_trace( size=size, temp=temp, seed=seed, init_state='random', n_chunks=n_chunks, chunk_size=chunk_size, ) to = collect_trace( size=size, temp=temp, seed=seed, init_state='ordered', n_chunks=n_chunks, chunk_size=chunk_size, ) # Convert to numpy arrays for processing tr = np.asarray(tr, dtype=float) to = np.asarray(to, dtype=float) # Calculate smoothed traces tr_smooth = np.convolve(tr, np.ones(smooth_window) / smooth_window, mode='valid') to_smooth = np.convolve(to, np.ones(smooth_window) / smooth_window, mode='valid') # Estimate convergence time tau_est = estimate_relaxation_time_two_start( tr, to, smooth_window=smooth_window, dwell_window=smooth_window, ) # Plot raw data with low alpha as background ax.plot( sweeps, np.abs(tr), color=PALETTE['random'], alpha=0.15, lw=0.5, ) ax.plot( sweeps, np.abs(to), color=PALETTE['ordered'], alpha=0.15, lw=0.5, ) # Plot smoothed data smooth_start = smooth_window - 1 ax.plot( sweeps[smooth_start:], np.abs(tr_smooth), color=PALETTE['random'], lw=1.8, alpha=0.85, label="random start" if (t_idx == 0 and s_idx == 0) else None, ) ax.plot( sweeps[smooth_start:], np.abs(to_smooth), color=PALETTE['ordered'], lw=1.8, alpha=0.85, label="ordered start" if (t_idx == 0 and s_idx == 0) else None, ) # Mark convergence point if detected if tau_est < max_steps: ax.axvline( sweeps[tau_est], color=PALETTE['convergence'], linestyle='--', lw=1.2, alpha=0.6, ) ax.text( sweeps[tau_est], 0.97, f"{int(sweeps[tau_est])}", transform=ax.get_xaxis_transform(), ha='center', va='top', fontsize=6, color=PALETTE['convergence'], bbox=dict(boxstyle='round,pad=0.1', fc='white', ec='none', alpha=0.7), ) # Row labels (temperatures) on the left if s_idx == 0: ax.set_ylabel(label, fontsize=10, fontweight='bold') # Column labels (seeds) on top if t_idx == 0: stuck_marker = " ★" if seed in seeds_stuck else "" ax.set_title(f"seed {seed}{stuck_marker}", fontsize=9, fontweight='semibold') ax.set_xlim(0, max_steps) ax.set_ylim(-0.02, 1.05) def x_fmt(x: float, _: Any) -> str: return f"{x/1000:.0f}k" if x >= 1000 else f"{x:.0f}" ax.xaxis.set_major_formatter(mticker.FuncFormatter(x_fmt)) ax.yaxis.set_major_locator(mticker.MultipleLocator(0.2)) ax.grid(True, linewidth=0.4, alpha=0.25) ax.tick_params(labelsize=7) # Add legend only to the first subplot if t_idx == 0 and s_idx == 0: ax.legend(fontsize=7, loc='upper right', framealpha=0.9) fig.suptitle( rf"Two-start Equilibration Traces · Ising $L={size}$ · 6 Seeds × 4 Temperatures", fontsize=13, y=0.995, ) # Add common x-label for bottom row fig.text(0.5, 0.02, 'MC sweep', ha='center', fontsize=10) plt.tight_layout(rect=(0, 0.035, 1, 0.985)) os.makedirs(args.output_dir, exist_ok=True) outpath = os.path.join(args.output_dir, f"diagnostic_eq_traces_L{size}.png") fig.savefig(outpath, dpi=150, bbox_inches="tight") logger.info(f"Saved visualization → {outpath}") # Second panel: T = 2.00 detailed view in 2×3 grid. # Styled after the Two-Start Convergence Example in the notebook: # faint raw traces, bold smoothed traces, mutual cross-bands, and tau marker. t200_temp = temperatures[1] # T = 2.00 t200_label = t_labels[1] TOLERANCE_K: float = 1.0 SIGMA_FLOOR: float = 0.02 fig2, axes2 = plt.subplots(2, 3, figsize=(14, 8), sharey=True) axes2_flat = axes2.flatten() for s_idx, seed in enumerate(seeds_all): ax = axes2_flat[s_idx] tr = collect_trace( size=size, temp=t200_temp, seed=seed, init_state='random', n_chunks=n_chunks, chunk_size=chunk_size, ) to = collect_trace( size=size, temp=t200_temp, seed=seed, init_state='ordered', n_chunks=n_chunks, chunk_size=chunk_size, ) tr = np.asarray(np.abs(tr), dtype=float) to = np.asarray(np.abs(to), dtype=float) # Smoothed traces aligned to the right edge of the rolling window tr_smooth = np.convolve(tr, np.ones(smooth_window) / smooth_window, mode='valid') to_smooth = np.convolve(to, np.ones(smooth_window) / smooth_window, mode='valid') n_sm = min(len(tr_smooth), len(to_smooth)) tr_smooth = tr_smooth[:n_sm] to_smooth = to_smooth[:n_sm] steps_sm = sweeps[smooth_window - 1: smooth_window - 1 + n_sm] # Mutual cross-band criterion: tail statistics from the second half half = n_sm // 2 mu_r = float(tr_smooth[half:].mean()) mu_o = float(to_smooth[half:].mean()) sig_r = max(float(tr_smooth[half:].std()), SIGMA_FLOOR) sig_o = max(float(to_smooth[half:].std()), SIGMA_FLOOR) band_r = TOLERANCE_K * sig_r # ordered must enter random's band band_o = TOLERANCE_K * sig_o # random must enter ordered's band # Estimate convergence time (index into smoothed array) tau_est = estimate_relaxation_time_two_start( tr, to, smooth_window=smooth_window, dwell_window=smooth_window, ) # Raw traces (faint background) ax.plot( sweeps, tr, color=PALETTE['random'], alpha=0.2, lw=0.6, label='Random start (raw)', ) ax.plot( sweeps, to, color=PALETTE['ordered'], alpha=0.2, lw=0.6, label='Ordered start (raw)', ) # Smoothed traces (bold) ax.plot( steps_sm, tr_smooth, color=PALETTE['random'], lw=2.0, label='Random start (smoothed)', ) ax.plot( steps_sm, to_smooth, color=PALETTE['ordered'], lw=2.0, label='Ordered start (smoothed)', ) # Ordered-start band: random must enter (centred on ordered tail mean) ax.axhline(mu_o, color=PALETTE['ordered'], linestyle='--', alpha=0.7, label=f'Ordered tail mean ({mu_o:.3f})') ax.axhspan(mu_o - band_o, mu_o + band_o, color=PALETTE['ordered'], alpha=0.12, label=r'Ordered band ($\pm k\sigma_o$)') # Random-start band: ordered must enter (centred on random tail mean) ax.axhline(mu_r, color=PALETTE['random'], linestyle='--', alpha=0.7, label=f'Random tail mean ({mu_r:.3f})') ax.axhspan(mu_r - band_r, mu_r + band_r, color=PALETTE['random'], alpha=0.12, label=r'Random band ($\pm k\sigma_r$)') # Convergence marker mapped onto the original sweep axis if tau_est < max_steps: x_tau = int(sweeps[tau_est]) ax.axvline( x_tau, color='green', linewidth=2, linestyle=':', label=rf'$\hat{{\tau}}_\mathrm{{relax}} \approx {x_tau}$', ) ax.text( x_tau, 0.97, f"{x_tau}", transform=ax.get_xaxis_transform(), ha='center', va='top', fontsize=8, color='green', bbox=dict(boxstyle='round,pad=0.15', fc='white', ec='none', alpha=0.7), ) stuck_marker = " ★" if seed in seeds_stuck else "" ax.set_title(f"seed {seed}{stuck_marker}", fontsize=10, fontweight='semibold') ax.set_xlim(0, max_steps) ax.set_ylim(0, 1.05) def x_fmt(x: float, _: Any) -> str: return f"{x/1000:.0f}k" if x >= 1000 else f"{x:.0f}" ax.xaxis.set_major_formatter(mticker.FuncFormatter(x_fmt)) ax.yaxis.set_major_locator(mticker.MultipleLocator(0.2)) ax.grid(axis='y', alpha=0.3) ax.tick_params(labelsize=8) if s_idx >= 3: ax.set_xlabel('MC Steps', fontsize=9) if s_idx == 0: ax.set_ylabel(r'Absolute Magnetization $|M|$', fontsize=10) ax.legend(loc='lower right', fontsize=7, framealpha=0.9) fig2.suptitle( rf"Two-start Equilibration Traces · Ising $L={size}$ · {t200_label}", fontsize=13, y=0.995, ) plt.tight_layout(rect=(0, 0, 1, 0.985)) outpath2 = os.path.join(args.output_dir, f"diagnostic_eq_traces_T200_L{size}.png") fig2.savefig(outpath2, dpi=150, bbox_inches="tight") logger.info(f"Saved detailed T=2.00 visualization → {outpath2}")
if __name__ == '__main__': main()