Source code for models.xy_model

"""
2D XY Model simulation using Metropolis-Hastings and Wolff cluster algorithms.
"""
from __future__ import annotations

import os

import matplotlib.pyplot as plt
import numpy as np
from numba import njit, prange

from .simulation_base import (
    MonteCarloSimulation,
    calculate_vortex_density_numba,
    calculate_vorticity_numba,
    get_helicity_data_numba,
)


[docs] @njit(cache=True, fastmath=True) def xy_step_numba( *, spins: np.ndarray, beta: float, J: float, idx_next: np.ndarray, idx_prev: np.ndarray ) -> np.ndarray: """ Perform one full Monte Carlo sweep of the XY lattice. Uses a checkerboard update pattern for better Numba optimization. Parameters ---------- spins: (N, N, 2) array of unit vectors. beta: Inverse temperature 1/kT. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices. idx_prev: Pre-calculated previous-neighbor indices. Returns ------- Updated spins array. """ N = spins.shape[0] # Batch generate random updates for the whole lattice deltas = np.random.uniform(-0.5, 0.5, size=(N, N)) cos_vals = np.cos(deltas) sin_vals = np.sin(deltas) random_vals = np.random.random(size=(N, N)) for parity in range(2): for i in range(N): # Use striding to avoid 'if' condition in the inner loop start_j = (parity + i) % 2 inxt = idx_next[i] iprv = idx_prev[i] for j in range(start_j, N, 2): jnxt = idx_next[j] jprv = idx_prev[j] # Neighbor sum vector nx = spins[iprv, j, 0] + spins[inxt, j, 0] + spins[i, jprv, 0] + spins[i, jnxt, 0] ny = spins[iprv, j, 1] + spins[inxt, j, 1] + spins[i, jprv, 1] + spins[i, jnxt, 1] # Current spin sx = spins[i, j, 0] sy = spins[i, j, 1] # Proposed update from pre-calculated values c, s = cos_vals[i, j], sin_vals[i, j] sx_new = sx * c - sy * s sy_new = sx * s + sy * c # Re-normalize to prevent drift norm = np.sqrt(sx_new**2 + sy_new**2) sx_new /= norm sy_new /= norm # Energy change: -J * (s_new - s_old) . neighbors dE = -J * ((sx_new * nx + sy_new * ny) - (sx * nx + ny * sy)) if dE <= 0 or random_vals[i, j] < np.exp(-dE * beta): spins[i, j, 0] = sx_new spins[i, j, 1] = sy_new return spins
[docs] @njit(cache=True, fastmath=True, parallel=True) def xy_step_parallel_numba( *, spins: np.ndarray, beta: float, J: float, idx_next: np.ndarray, idx_prev: np.ndarray ) -> np.ndarray: """ Parallel version of xy_step_numba. Parameters ---------- spins: (N, N, 2) array of unit vectors. beta: Inverse temperature 1/kT. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices. idx_prev: Pre-calculated previous-neighbor indices. Returns ------- Updated spins array. """ N = spins.shape[0] deltas = np.random.uniform(-0.5, 0.5, size=(N, N)) cos_vals = np.cos(deltas) sin_vals = np.sin(deltas) random_vals = np.random.random(size=(N, N)) for parity in range(2): for i in prange(N): start_j = (parity + i) % 2 inxt = idx_next[i] iprv = idx_prev[i] for j in range(start_j, N, 2): jnxt = idx_next[j] jprv = idx_prev[j] nx = spins[iprv, j, 0] + spins[inxt, j, 0] + spins[i, jprv, 0] + spins[i, jnxt, 0] ny = spins[iprv, j, 1] + spins[inxt, j, 1] + spins[i, jprv, 1] + spins[i, jnxt, 1] sx, sy = spins[i, j, 0], spins[i, j, 1] c, s = cos_vals[i, j], sin_vals[i, j] sx_new = sx * c - sy * s sy_new = sx * s + sy * c norm = np.sqrt(sx_new**2 + sy_new**2) sx_new /= norm sy_new /= norm dE = -J * ((sx_new * nx + sy_new * ny) - (sx * nx + sy * ny)) if dE <= 0 or random_vals[i, j] < np.exp(-dE * beta): spins[i, j, 0] = sx_new spins[i, j, 1] = sy_new return spins
[docs] @njit(cache=True, fastmath=True) def xy_step_random_numba( *, spins: np.ndarray, beta: float, J: float, idx_next: np.ndarray, idx_prev: np.ndarray ) -> np.ndarray: """ Perform one full Monte Carlo sweep of the XY lattice using random sequential updates. Parameters ---------- spins: (N, N, 2) array of unit vectors. beta: Inverse temperature 1/kT. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices. idx_prev: Pre-calculated previous-neighbor indices. Returns ------- Updated spins array. """ N = spins.shape[0] num_attempts = N * N # Pre-calculate random indices and updates indices = np.random.randint(0, num_attempts, size=num_attempts) deltas = np.random.uniform(-0.5, 0.5, size=num_attempts) cos_vals = np.cos(deltas) sin_vals = np.sin(deltas) random_vals = np.random.random(size=num_attempts) for k in range(num_attempts): idx = indices[k] i = idx // N j = idx % N inxt = idx_next[i] iprv = idx_prev[i] jnxt = idx_next[j] jprv = idx_prev[j] # Neighbor sum vector nx = spins[iprv, j, 0] + spins[inxt, j, 0] + spins[i, jprv, 0] + spins[i, jnxt, 0] ny = spins[iprv, j, 1] + spins[inxt, j, 1] + spins[i, jprv, 1] + spins[i, jnxt, 1] # Current spin sx = spins[i, j, 0] sy = spins[i, j, 1] # Proposed update c, s = cos_vals[k], sin_vals[k] sx_new = sx * c - sy * s sy_new = sx * s + sy * c # Re-normalize norm = np.sqrt(sx_new**2 + sy_new**2) sx_new /= norm sy_new /= norm # Energy change dE = -J * ((sx_new * nx + sy_new * ny) - (sx * nx + sy * ny)) if dE <= 0 or random_vals[k] < np.exp(-dE * beta): spins[i, j, 0] = sx_new spins[i, j, 1] = sy_new return spins
[docs] @njit(cache=True, fastmath=True) def xy_energy_numba(*, spins: np.ndarray, J: float, idx_next: np.ndarray) -> float: """ Calculate the total energy of the XY lattice. Parameters ---------- spins: (N, N, 2) array of unit vectors. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices. Returns ------- energy: Total energy per site. """ N = spins.shape[0] energy = 0.0 for i in range(N): inxt = idx_next[i] for j in range(N): jnxt = idx_next[j] # Sum unique pairs (right and down) dot_right = spins[i, j, 0] * spins[i, jnxt, 0] + spins[i, j, 1] * spins[i, jnxt, 1] dot_down = spins[i, j, 0] * spins[inxt, j, 0] + spins[i, j, 1] * spins[inxt, j, 1] energy -= J * (dot_right + dot_down) return energy / (N * N)
[docs] @njit(cache=True, fastmath=True) def xy_wolff_step_numba( *, spins: np.ndarray, beta: float, J: float, idx_next: np.ndarray, idx_prev: np.ndarray ) -> np.ndarray: """ Perform one Wolff cluster flip on the XY lattice (Wolff-Evertz reflection). A random mirror-plane axis r\u0302 is sampled uniformly from S^1. Each spin projects onto r\u0302 as sigma_i = s_i \u00b7 r\u0302. Bonds between neighbouring sites with aligned projections (sigma_i * sigma_j > 0) are activated with probability ``P_add = 1 - exp(-2 beta J sigma_i sigma_j)``. All cluster spins are then reflected through the plane perpendicular to r\u0302: ``s -> s - 2 (s \u00b7 r\u0302) r\u0302``, which preserves unit length exactly and satisfies detailed balance for the pure exchange term. One call constitutes one cluster sweep. ``parallel=True`` is silently ignored. Parameters ---------- spins: (N, N, 2) array of unit vectors. beta: Inverse temperature 1/kT. J: Coupling constant. idx_next: Pre-calculated next-neighbor indices (PBC). idx_prev: Pre-calculated previous-neighbor indices (PBC). Returns ------- Updated spins array. """ N = spins.shape[0] # Random mirror-plane axis r\u0302 = (cos phi, sin phi) phi = np.random.uniform(0.0, 2.0 * np.pi) rx = np.cos(phi) ry = np.sin(phi) # Random seed site si = np.random.randint(0, N) sj = np.random.randint(0, N) # Pre-allocate cluster membership mask and DFS stack in_cluster = np.zeros((N, N), dtype=np.bool_) stack = np.empty(N * N, dtype=np.int64) in_cluster[si, sj] = True stack[0] = si * N + sj stack_top = 1 while stack_top > 0: stack_top -= 1 flat = stack[stack_top] ci = flat // N cj = flat % N proj_c = spins[ci, cj, 0] * rx + spins[ci, cj, 1] * ry inxt = idx_next[ci] iprv = idx_prev[ci] jnxt = idx_next[cj] jprv = idx_prev[cj] # North if not in_cluster[iprv, cj]: proj_n = spins[iprv, cj, 0] * rx + spins[iprv, cj, 1] * ry prod = proj_c * proj_n if prod > 0.0: p_add = 1.0 - np.exp(-2.0 * beta * J * prod) if np.random.random() < p_add: in_cluster[iprv, cj] = True stack[stack_top] = iprv * N + cj stack_top += 1 # South if not in_cluster[inxt, cj]: proj_n = spins[inxt, cj, 0] * rx + spins[inxt, cj, 1] * ry prod = proj_c * proj_n if prod > 0.0: p_add = 1.0 - np.exp(-2.0 * beta * J * prod) if np.random.random() < p_add: in_cluster[inxt, cj] = True stack[stack_top] = inxt * N + cj stack_top += 1 # West if not in_cluster[ci, jprv]: proj_n = spins[ci, jprv, 0] * rx + spins[ci, jprv, 1] * ry prod = proj_c * proj_n if prod > 0.0: p_add = 1.0 - np.exp(-2.0 * beta * J * prod) if np.random.random() < p_add: in_cluster[ci, jprv] = True stack[stack_top] = ci * N + jprv stack_top += 1 # East if not in_cluster[ci, jnxt]: proj_n = spins[ci, jnxt, 0] * rx + spins[ci, jnxt, 1] * ry prod = proj_c * proj_n if prod > 0.0: p_add = 1.0 - np.exp(-2.0 * beta * J * prod) if np.random.random() < p_add: in_cluster[ci, jnxt] = True stack[stack_top] = ci * N + jnxt stack_top += 1 # Reflect all cluster spins through the plane perpendicular to r\u0302 for i in range(N): for j in range(N): if in_cluster[i, j]: proj = spins[i, j, 0] * rx + spins[i, j, 1] * ry spins[i, j, 0] -= 2.0 * proj * rx spins[i, j, 1] -= 2.0 * proj * ry return spins
[docs] class XYSimulation(MonteCarloSimulation): """ Simulation of the 2D XY model on a square lattice. """ _VALID_UPDATES: frozenset = frozenset({'checkerboard', 'random', 'wolff'}) def __init__( self, *, size: int, temp: float, J: float = 1.0, update: str = 'checkerboard', init_state: str = 'random', parallel: bool = False, seed: int | None = None, ): """ Initialize the XY simulation. Parameters ---------- size: Linear dimension L of the L x L lattice. temp: Temperature T. J: Coupling constant (default 1.0). update: Update scheme - ``'checkerboard'`` (default, faster), ``'random'`` (random sequential Metropolis, physical stochastic dynamics for kinetics studies), or ``'wolff'`` (Wolff-Evertz cluster algorithm, efficient near the BKT transition). init_state: Initial spin configuration: ``'random'`` (default) or ``'ordered'``. parallel: Whether to use parallelized Numba kernels (only for checkerboard). seed: Optional random seed for reproducibility. Raises ------ ValueError: If ``update`` is not one of the recognised schemes. """ super().__init__(size=size, temp=temp, init_state=init_state, seed=seed) if update not in self._VALID_UPDATES: valid_opts = sorted(self._VALID_UPDATES) raise ValueError(f'Unknown update scheme {update!r}. Valid options: {valid_opts}') self.J = J self.update = update self.parallel = parallel if self.init_state == 'ordered': # Ordered state: all spins at angle 0 -> (1, 0) self.spins = np.zeros((size, size, 2), dtype=float) self.spins[..., 0] = 1.0 else: # Initialize random spins as 2D unit vectors angles = self.rng.uniform(0, 2 * np.pi, size=(size, size)) self.spins = np.stack([np.cos(angles), np.sin(angles)], axis=-1)
[docs] def step(self) -> None: """Perform one Monte Carlo step using Numba.""" if self.spins is not None: if self.seed is not None: from .simulation_base import _seed_numba _seed_numba(seed=self.seed + self.steps) if self.update == 'random': self.spins = xy_step_random_numba( spins=self.spins, beta=self.beta, J=self.J, idx_next=self.idx_next, idx_prev=self.idx_prev, ) elif self.update == 'wolff': self.spins = xy_wolff_step_numba( spins=self.spins, beta=self.beta, J=self.J, idx_next=self.idx_next, idx_prev=self.idx_prev, ) elif self.parallel: self.spins = xy_step_parallel_numba( spins=self.spins, beta=self.beta, J=self.J, idx_next=self.idx_next, idx_prev=self.idx_prev, ) else: self.spins = xy_step_numba( spins=self.spins, beta=self.beta, J=self.J, idx_next=self.idx_next, idx_prev=self.idx_prev, ) self.steps += 1
def _get_magnetization(self) -> float: """Calculate magnetization magnitude per spin.""" if self.spins is not None: total_spin = np.sum(self.spins, axis=(0, 1)) return float(np.linalg.norm(total_spin) / (self.size**2)) return 0.0 def _get_energy(self) -> float: """Calculate energy per spin.""" if self.spins is not None: return float(xy_energy_numba(spins=self.spins, J=self.J, idx_next=self.idx_next)) return 0.0 def _calculate_vorticity(self) -> np.ndarray: """Calculate the vorticity (winding number) of each plaquette.""" if self.spins is not None: return np.asarray(calculate_vorticity_numba(spins=self.spins, idx_next=self.idx_next)) return np.array([]) def _get_vortex_density(self) -> float: """Calculate vortex density n_v, the fraction of plaquettes with non-zero winding.""" if self.spins is not None: return float(calculate_vortex_density_numba(spins=self.spins, idx_next=self.idx_next)) return 0.0 def _get_helicity_data(self) -> tuple[float, float]: """Calculate sum of cos and sin of angle differences in x-direction.""" if self.spins is not None: cos_sum, sin_sum = get_helicity_data_numba(spins=self.spins, idx_next=self.idx_next) return float(cos_sum), float(sin_sum) return 0.0, 0.0 def _get_structure_factor_squared_unshifted(self) -> np.ndarray: """Calculate the unshifted squared magnitude of the Fourier transform.""" if self.spins is not None: sx = self.spins[..., 0] sy = self.spins[..., 1] Sk_x = np.fft.fft2(sx) Sk_y = np.fft.fft2(sy) return np.asarray(np.abs(Sk_x) ** 2 + np.abs(Sk_y) ** 2) return np.array([])
[docs] def main() -> None: """CLI entry point for XY simulation example.""" import argparse import logging from utils.system_helpers import setup_logging parser = argparse.ArgumentParser(description='XY Model Quick Example') parser.add_argument('--size', type=int, default=128, help='Lattice size L') parser.add_argument('--temp', type=float, default=0.5, help='Temperature T') parser.add_argument('--steps', type=int, default=500, help='MC steps') parser.add_argument('--seed', type=int, default=None, help='Random seed') parser.add_argument('--parallel', action='store_true', help='Use parallel kernels') parser.add_argument( '--update', type=str, default='checkerboard', choices=['checkerboard', 'random', 'wolff'], help='Update scheme (default: checkerboard)', ) parser.add_argument('--verbose', action='store_true', help='Enable verbose logging') args = parser.parse_args() log_level = logging.DEBUG if args.verbose else logging.INFO logger = setup_logging(level=log_level) logger.info( f'Initializing XY Model (L={args.size}, T={args.temp}, ' f'update={args.update}, parallel={args.parallel})...' ) sim = XYSimulation( size=args.size, temp=args.temp, seed=args.seed, parallel=args.parallel, update=args.update ) logger.info(f'Running for {args.steps} steps...') sim.run(n_steps=args.steps) # Plotting fig, axes = plt.subplots(2, 2, figsize=(12, 11)) fig.suptitle(f'2D XY Model - $L={args.size}, T={args.temp}$', fontsize=14) ax1, ax2, ax3, ax4 = axes.flatten() # Final Configuration (Phase angle) if sim.spins is not None: angles = np.arctan2(sim.spins[..., 1], sim.spins[..., 0]) im1 = ax1.imshow(angles, cmap='hsv', interpolation='none', vmin=-np.pi, vmax=np.pi) ax1.set_title('Final Spin Phase') ax1.axis('off') fig.colorbar(im1, ax=ax1, label='Phase (rad)', shrink=0.8) # Vorticity map vort = sim._calculate_vorticity() im2 = ax2.imshow(vort, cmap='bwr', interpolation='none', vmin=-1, vmax=1) ax2.set_title(f'Vorticity (Total: {int(np.sum(np.abs(vort)))})') ax2.axis('off') fig.colorbar(im2, ax=ax2, ticks=[-1, 0, 1], label='Winding No.', shrink=0.8) # Spin-Spin Correlation Function r, G_r = sim._calculate_correlation_function() ax3.plot(r[1:], G_r[1:], 'o-', markersize=3) ax3.set_title('Spin-Spin Correlation G(r)') ax3.set_xlabel('Distance r') ax3.set_ylabel('G(r)') ax3.grid(True, alpha=0.3) ax3.set_xscale('log') # Structure Factor (Radial) from utils.physics_helpers import radial_average_sk if sim.spins is not None: k, sk = radial_average_sk(spins=sim.spins) ax4.loglog(k[1:], sk[1:], 'o-', markersize=3, color='tab:green') ax4.set_title('Structure Factor S(k)') ax4.set_xlabel('|k|') ax4.set_ylabel('S(k)') ax4.grid(True, alpha=0.3) plt.tight_layout() output_dir = 'results' os.makedirs(output_dir, exist_ok=True) output_file = os.path.join(output_dir, 'xy_example.png') plt.savefig(output_file) logger.info(f'Simulation finished. Plot saved to {output_file}')
if __name__ == '__main__': main()