Source code for models.ising_model

"""
2D Ising Model simulation using Metropolis-Hastings and Wolff cluster algorithms.
"""
from __future__ import annotations

import os

import matplotlib.pyplot as plt
import numpy as np
from numba import njit, prange

from .simulation_base import MonteCarloSimulation


[docs] @njit(cache=True, fastmath=True) def ising_step_numba( *, spins: np.ndarray, beta: float, J: float, idx_next: np.ndarray, idx_prev: np.ndarray ) -> np.ndarray: """ Perform one full Monte Carlo sweep of the Ising lattice. Uses a checkerboard update pattern for better Numba optimization. Parameters ---------- spins: (N, N) array of spins (+1 or -1). beta: Inverse temperature 1/kT. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices. idx_prev: Pre-calculated previous-neighbor indices. Returns ------- Updated spins array. """ N = spins.shape[0] # Pre-calculate transition probabilities for dE > 0 # dE can be 4J or 8J prob4 = np.exp(-4.0 * J * beta) prob8 = np.exp(-8.0 * J * beta) for parity in range(2): for i in range(N): # Use striding to avoid 'if' condition in the inner loop start_j = (parity + i) % 2 for j in range(start_j, N, 2): # Using pre-calculated indices for PBCs inxt = idx_next[i] iprv = idx_prev[i] jnxt = idx_next[j] jprv = idx_prev[j] neighbor_sum = spins[iprv, j] + spins[inxt, j] + spins[i, jprv] + spins[i, jnxt] dE = 2 * J * spins[i, j] * neighbor_sum if dE <= 0: spins[i, j] *= -1 else: # Optimized probability check p = prob4 if dE == 4.0 * J else prob8 if np.random.random() < p: spins[i, j] *= -1 return spins
[docs] @njit(cache=True, fastmath=True, parallel=True) def ising_step_parallel_numba( *, spins: np.ndarray, beta: float, J: float, idx_next: np.ndarray, idx_prev: np.ndarray ) -> np.ndarray: """ Parallel version of ising_step_numba using numba.prange. Parameters ---------- spins: (N, N) array of spins (+1 or -1). beta: Inverse temperature 1/kT. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices. idx_prev: Pre-calculated previous-neighbor indices. Returns ------- Updated spins array. """ N = spins.shape[0] prob4 = np.exp(-4.0 * J * beta) prob8 = np.exp(-8.0 * J * beta) for parity in range(2): for i in prange(N): start_j = (parity + i) % 2 for j in range(start_j, N, 2): inxt = idx_next[i] iprv = idx_prev[i] jnxt = idx_next[j] jprv = idx_prev[j] neighbor_sum = spins[iprv, j] + spins[inxt, j] + spins[i, jprv] + spins[i, jnxt] dE = 2 * J * spins[i, j] * neighbor_sum if dE <= 0: spins[i, j] *= -1 else: p = prob4 if dE == 4.0 * J else prob8 if np.random.random() < p: spins[i, j] *= -1 return spins
[docs] @njit(cache=True, fastmath=True) def ising_energy_numba(*, spins: np.ndarray, J: float, idx_next: np.ndarray) -> float: """ Calculate the total energy of the Ising lattice. Parameters ---------- spins: (N, N) array of spins. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices. Returns ------- energy: Total energy per site. """ N = spins.shape[0] energy = 0.0 for i in range(N): inxt = idx_next[i] for j in range(N): jnxt = idx_next[j] # Sum unique pairs (right and down) to avoid double counting energy -= J * spins[i, j] * (spins[inxt, j] + spins[i, jnxt]) return energy / (N * N)
[docs] @njit(cache=True, fastmath=True) def ising_step_random_numba( *, spins: np.ndarray, beta: float, J: float, idx_next: np.ndarray, idx_prev: np.ndarray ) -> np.ndarray: """ Perform one full Monte Carlo sweep of the Ising lattice using random sequential updates (N^2 randomly chosen single-spin flip attempts). Unlike the checkerboard sweep, sites are chosen uniformly at random with replacement, giving a more physical stochastic dynamics with no spatial bias. This is the standard Metropolis single-spin flip algorithm. Boltzmann factors for the two possible positive energy changes (4J, 8J) are pre-calculated outside the inner loop to avoid repeated calls to ``np.exp``, matching the optimisation used in ``ising_step_numba``. Parameters ---------- spins: (N, N) array of spins (+1 or -1). beta: Inverse temperature 1/kT. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices. idx_prev: Pre-calculated previous-neighbor indices. Returns ------- Updated spins array. """ N = spins.shape[0] # Pre-calculate the two possible acceptance probabilities for dE > 0. prob4 = np.exp(-4.0 * J * beta) prob8 = np.exp(-8.0 * J * beta) for _ in range(N * N): idx = np.random.randint(0, N * N) i = idx // N j = idx % N inxt = idx_next[i] iprv = idx_prev[i] jnxt = idx_next[j] jprv = idx_prev[j] neighbor_sum = spins[iprv, j] + spins[inxt, j] + spins[i, jprv] + spins[i, jnxt] dE = 2 * J * spins[i, j] * neighbor_sum if dE <= 0: spins[i, j] *= -1 else: p = prob4 if dE == 4.0 * J else prob8 if np.random.random() < p: spins[i, j] *= -1 return spins
[docs] @njit(cache=True, fastmath=True) def ising_wolff_step_numba( *, spins: np.ndarray, beta: float, J: float, idx_next: np.ndarray, idx_prev: np.ndarray ) -> tuple: """ Perform one Wolff cluster flip on the Ising lattice. A single cluster is grown by probabilistic DFS from a random seed site and then flipped in a single collective move. The bond acceptance probability ``P_add = 1 - exp(-2 beta J)`` is derived from the Fortuin-Kasteleyn representation and guarantees detailed balance without rejections. One call constitutes one *cluster sweep*. The effective number of site updates per call scales as the mean cluster size ``<C>`` and diverges at T_c, which is precisely where Metropolis suffers maximum critical slowing down. ``parallel=True`` is silently ignored: cluster growth is inherently sequential. Parameters ---------- spins: (N, N) array of spins (+1 or -1). beta: Inverse temperature 1/kT. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices (PBC). idx_prev: Pre-calculated previous-neighbor indices (PBC). Returns ------- spins: Updated spins array. cluster_size: Number of spins flipped in this step. """ N = spins.shape[0] p_add = 1.0 - np.exp(-2.0 * J * beta) # Choose a random seed site si = np.random.randint(0, N) sj = np.random.randint(0, N) seed_spin = spins[si, sj] # Pre-allocate cluster membership mask and DFS stack in_cluster = np.zeros((N, N), dtype=np.bool_) stack = np.empty(N * N, dtype=np.int64) in_cluster[si, sj] = True stack[0] = si * N + sj stack_top = 1 cluster_size = 1 # seed site already in cluster while stack_top > 0: stack_top -= 1 flat = stack[stack_top] ci = flat // N cj = flat % N inxt = idx_next[ci] iprv = idx_prev[ci] jnxt = idx_next[cj] jprv = idx_prev[cj] # North if not in_cluster[iprv, cj] and spins[iprv, cj] == seed_spin: if np.random.random() < p_add: in_cluster[iprv, cj] = True stack[stack_top] = iprv * N + cj stack_top += 1 cluster_size += 1 # South if not in_cluster[inxt, cj] and spins[inxt, cj] == seed_spin: if np.random.random() < p_add: in_cluster[inxt, cj] = True stack[stack_top] = inxt * N + cj stack_top += 1 cluster_size += 1 # West if not in_cluster[ci, jprv] and spins[ci, jprv] == seed_spin: if np.random.random() < p_add: in_cluster[ci, jprv] = True stack[stack_top] = ci * N + jprv stack_top += 1 cluster_size += 1 # East if not in_cluster[ci, jnxt] and spins[ci, jnxt] == seed_spin: if np.random.random() < p_add: in_cluster[ci, jnxt] = True stack[stack_top] = ci * N + jnxt stack_top += 1 cluster_size += 1 # Flip all cluster spins in-place for i in range(N): for j in range(N): if in_cluster[i, j]: spins[i, j] *= -1 return spins, cluster_size
[docs] class IsingSimulation(MonteCarloSimulation): """ Simulation of the 2D Ising model on a square lattice. """ _VALID_UPDATES: frozenset = frozenset({'checkerboard', 'random', 'wolff'}) def __init__( self, *, size: int, temp: float, J: float = 1.0, update: str = 'checkerboard', init_state: str = 'random', parallel: bool = False, seed: int | None = None, ): """ Initialize the Ising simulation. Parameters ---------- size: Linear dimension L of the L x L lattice. temp: Temperature T. J: Coupling constant (default 1.0). update: Update scheme - ``'checkerboard'`` (default, faster), ``'random'`` (random sequential Metropolis, physical stochastic dynamics for coarsening studies), or ``'wolff'`` (Wolff cluster algorithm, highly efficient near T_c due to vanishing critical slowing down). init_state: Initial spin configuration: ``'random'`` (default) or ``'ordered'``. parallel: Whether to use parallelized Numba kernels (only for checkerboard). seed: Optional random seed for reproducibility. Raises ------ ValueError: If ``update`` is not one of the recognised schemes. """ super().__init__(size=size, temp=temp, init_state=init_state, seed=seed) if update not in self._VALID_UPDATES: valid_opts = sorted(self._VALID_UPDATES) raise ValueError(f'Unknown update scheme {update!r}. Valid options: {valid_opts}') self.J = J self.update = update self.parallel = parallel if self.init_state == 'ordered': self.spins = np.ones((size, size), dtype=np.int8) else: self.spins = self.rng.choice(np.array([-1, 1], dtype=np.int8), size=(size, size)) # Last Wolff cluster size (0 for non-Wolff updates or before any step) self.last_cluster_size: int = 0
[docs] def step(self) -> None: """Perform one Monte Carlo sweep using the configured update scheme.""" if self.spins is not None: # For Numba compatibility with reproducibility, we seed numba's # random generator if a seed was provided. if self.seed is not None: from .simulation_base import _seed_numba _seed_numba(seed=self.seed + self.steps) if self.update == 'random': self.spins = ising_step_random_numba( spins=self.spins, beta=self.beta, J=self.J, idx_next=self.idx_next, idx_prev=self.idx_prev, ) elif self.update == 'wolff': self.spins, self.last_cluster_size = ising_wolff_step_numba( spins=self.spins, beta=self.beta, J=self.J, idx_next=self.idx_next, idx_prev=self.idx_prev, ) elif self.parallel: self.spins = ising_step_parallel_numba( spins=self.spins, beta=self.beta, J=self.J, idx_next=self.idx_next, idx_prev=self.idx_prev, ) else: self.spins = ising_step_numba( spins=self.spins, beta=self.beta, J=self.J, idx_next=self.idx_next, idx_prev=self.idx_prev, ) self.steps += 1
[docs] def run_with_cluster_sizes(self, *, n_steps: int) -> tuple[np.ndarray, np.ndarray, np.ndarray]: """Run the simulation and additionally record the Wolff cluster size at every step. For non-Wolff update schemes the cluster-size array is filled with zeros because local Metropolis flips exactly one spin at a time (cluster size = 1 by convention would also be valid, but zero makes it easy to detect misuse). Parameters ---------- n_steps: Number of MC steps to perform and record. Returns ------- magnetization: Array of `|M|` per spin at each step. energies: Array of energy per spin at each step. cluster_sizes: Array of cluster sizes (number of spins flipped) at each step. Always zero for non-Wolff algorithms. """ magnetization = np.empty(n_steps, dtype=float) energies = np.empty(n_steps, dtype=float) cluster_sizes = np.zeros(n_steps, dtype=int) for i in range(n_steps): self.step() magnetization[i] = self._get_magnetization() energies[i] = self._get_energy() cluster_sizes[i] = self.last_cluster_size return magnetization, energies, cluster_sizes
def _get_magnetization(self) -> float: """Calculate magnetization per spin.""" if self.spins is not None: return float(np.abs(np.sum(self.spins)) / (self.size**2)) return 0.0 def _get_energy(self) -> float: """Calculate energy per spin of the lattice.""" if self.spins is not None: return float( ising_energy_numba(spins=self.spins, J=self.J, idx_next=self.idx_next) ) return 0.0 def _get_structure_factor_squared_unshifted(self) -> np.ndarray: """Calculate the unshifted squared magnitude of the Fourier transform.""" if self.spins is not None: Sk = np.fft.fft2(self.spins) return np.abs(Sk) ** 2 return np.array([])
[docs] def main() -> None: """CLI entry point for Ising simulation example.""" import argparse import logging from utils.system_helpers import setup_logging parser = argparse.ArgumentParser(description='Ising Model Quick Example') parser.add_argument('--size', type=int, default=128, help='Lattice size L') parser.add_argument('--temp', type=float, default=2.269, help='Temperature T') parser.add_argument('--steps', type=int, default=500, help='MC steps') parser.add_argument('--seed', type=int, default=None, help='Random seed') parser.add_argument('--parallel', action='store_true', help='Use parallel kernels') parser.add_argument( '--update', type=str, default='checkerboard', choices=['checkerboard', 'random', 'wolff'], help='Update scheme (default: checkerboard)', ) parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') args = parser.parse_args() log_level = logging.DEBUG if args.verbose else logging.INFO logger = setup_logging(level=log_level) logger.info( f'Initializing Ising Model (L={args.size}, T={args.temp}, ' f'update={args.update}, parallel={args.parallel})...' ) sim = IsingSimulation( size=args.size, temp=args.temp, seed=args.seed, parallel=args.parallel, update=args.update ) logger.info(f'Running for {args.steps} steps...') mag_history, energy_history = sim.run(n_steps=args.steps) # Plotting fig, (ax1, ax2, ax3) = plt.subplots(1, 3, figsize=(18, 5)) fig.suptitle(f'2D Ising Model - $L={args.size}, T={args.temp}$', fontsize=14) # Final Configuration if sim.spins is not None: ax1.imshow(sim.spins, cmap='gray', interpolation='none') ax1.set_title('Final Spin Configuration') ax1.axis('off') # Magnetization and Energy ax2.plot(mag_history, label='|M|') ax2.plot(energy_history, label='Energy') ax2.set_title('Thermodynamics vs Time') ax2.set_xlabel('Monte Carlo Steps') ax2.set_ylabel('Value') ax2.grid(True, alpha=0.3) ax2.legend() # Correlation r, G_r = sim._calculate_correlation_function() ax3.plot(r, G_r, 'o-', markersize=3) ax3.set_title('Spin-Spin Correlation G(r)') ax3.set_xlabel('Distance r') ax3.set_ylabel('G(r)') ax3.grid(True, alpha=0.3) ax3.set_yscale('log') plt.tight_layout() output_dir = 'results' os.makedirs(output_dir, exist_ok=True) output_file = os.path.join(output_dir, 'ising_example.png') plt.savefig(output_file) logger.info(f'Simulation finished. Plot saved to {output_file}')
if __name__ == '__main__': main()