"""
Physics-related utility functions for calculating thermodynamic observables and correlations.
"""
from __future__ import annotations
import numpy as np
from scipy.stats import norm
from utils.exceptions import ZeroVarianceAutocorrelationError
from utils.observables import calculate_entropy
DEFAULT_CONFIDENCE_LEVEL = 0.68
UNCERTAINTY_METHOD_BLOCKING = 'blocking'
UNCERTAINTY_METHOD_BOOTSTRAP = 'bootstrap'
UNCERTAINTY_FIELDS = (
'value',
'err',
'ci_low',
'ci_high',
'tau_int',
'n_eff',
'samples',
)
UNCERTAINTY_METADATA_FIELDS = (
'uncertainty_method',
'confidence_level',
'n_seeds',
'bootstrap_resamples',
'nan_or_undefined_count',
)
def _validate_confidence(*, confidence: float) -> None:
if not (0.0 < confidence < 1.0):
raise ValueError(f'confidence must satisfy 0 < confidence < 1, got {confidence}')
def _as_1d_float_array(*, time_series: np.ndarray, name: str) -> np.ndarray:
arr = np.asarray(time_series, dtype=np.float64)
if arr.ndim != 1:
raise ValueError(f'{name} must be 1-D, got shape {arr.shape}')
if arr.size < 2:
raise ValueError(f'{name} must contain at least 2 elements, got {arr.size}')
return arr
def _z_multiplier(*, confidence: float) -> float:
"""Return Gaussian z-score for two-sided confidence interval."""
return float(norm.ppf(0.5 + 0.5 * confidence))
def _blocking_block_sizes(*, n: int, min_block_size: int, max_block_size: int) -> list[int]:
"""Generate increasing block sizes for blocking analysis."""
sizes: list[int] = []
block = max(2, min_block_size)
upper = min(max_block_size, n // 2)
while block <= upper:
n_blocks = n // block
if n_blocks >= 2:
sizes.append(block)
block *= 2
return sizes
def _select_blocking_plateau(*, standard_errors: np.ndarray) -> int:
"""Pick the first stable blocking plateau index."""
if standard_errors.size == 1:
return 0
rel_tol = 0.05
for idx in range(1, standard_errors.size):
prev = float(standard_errors[idx - 1])
cur = float(standard_errors[idx])
if prev == 0.0 and cur == 0.0:
return idx
denom = max(abs(prev), 1e-16)
if abs(cur - prev) / denom <= rel_tol:
return idx
return int(standard_errors.size - 1)
def _safe_n_eff_from_tau(*, n_samples: int, tau_int: float) -> float:
"""Convert integrated autocorrelation time to effective sample size."""
if not np.isfinite(tau_int) or tau_int <= 0.0:
return float('nan')
return float(min(n_samples, max(1.0, n_samples / (2.0 * tau_int))))
def _propagate_entropy_uncertainty_from_cv_errors(
*,
temperatures: np.ndarray,
specific_heat_err: np.ndarray,
) -> np.ndarray:
"""Propagate pointwise specific-heat errors into entropy uncertainty.
Uses independent-error propagation through the trapezoidal integration used
in :func:`calculate_entropy` for S(T).
"""
t = np.asarray(temperatures, dtype=np.float64)
cv_err = np.asarray(specific_heat_err, dtype=np.float64)
if t.ndim != 1:
raise ValueError(f'temperatures must be 1-D, got shape {t.shape}')
if cv_err.shape != t.shape:
raise ValueError(
'specific_heat_err must match temperatures shape, '
f'got {cv_err.shape} and {t.shape}'
)
sort_idx = np.argsort(t)
t_sorted = t[sort_idx]
cv_err_sorted = cv_err[sort_idx]
sigma_f = cv_err_sorted / t_sorted
dt = np.diff(t_sorted)
seg_var = 0.25 * (dt * dt) * (sigma_f[:-1] * sigma_f[:-1] + sigma_f[1:] * sigma_f[1:])
n_t = t_sorted.size
var_sorted = np.zeros(n_t, dtype=np.float64)
running = 0.0
running_finite = True
for i in range(n_t - 2, -1, -1):
if running_finite and np.isfinite(seg_var[i]):
running += float(seg_var[i])
var_sorted[i] = running
else:
running_finite = False
var_sorted[i] = np.nan
err_sorted = np.sqrt(var_sorted)
err = np.empty_like(err_sorted)
err[sort_idx] = err_sorted
return err
[docs]
def summarize_entropy_observable(
*,
temperatures: np.ndarray,
specific_heat_samples: np.ndarray,
confidence: float = DEFAULT_CONFIDENCE_LEVEL,
s_ref: float = 0.0,
specific_heat_err: np.ndarray | None = None,
method: str = UNCERTAINTY_METHOD_BOOTSTRAP,
bootstrap_resamples: int = 400,
rng_seed: int = 0,
) -> dict[str, np.ndarray]:
"""Summarize entropy from replicate specific-heat curves.
Each replicate column is converted to an entropy curve via
:func:`calculate_entropy`, then uncertainty is computed either via
replicate spread (blocking-like) or bootstrap over replicate curves.
For a single finite replicate, uncertainty is reported as NaN unless
``specific_heat_err`` is provided, in which case entropy uncertainty is
propagated from pointwise specific-heat errors.
"""
_validate_confidence(confidence=confidence)
if method not in {UNCERTAINTY_METHOD_BLOCKING, UNCERTAINTY_METHOD_BOOTSTRAP}:
raise ValueError(f'unsupported method {method!r}')
if method == UNCERTAINTY_METHOD_BOOTSTRAP and bootstrap_resamples <= 0:
raise ValueError('bootstrap_resamples must be > 0 when method is bootstrap')
temperatures_arr = np.asarray(temperatures, dtype=np.float64)
samples_arr = np.asarray(specific_heat_samples, dtype=np.float64)
if temperatures_arr.ndim != 1:
raise ValueError(f'temperatures must be 1-D, got shape {temperatures_arr.shape}')
if samples_arr.ndim != 2:
raise ValueError(
f'specific_heat_samples must be 2-D, got shape {samples_arr.shape}'
)
if samples_arr.shape[0] != temperatures_arr.size:
raise ValueError(
'specific_heat_samples first dimension must match temperatures size, '
f'got {samples_arr.shape[0]} and {temperatures_arr.size}'
)
if samples_arr.shape[1] < 1:
raise ValueError('specific_heat_samples must contain at least one replicate column')
n_t, n_rep = samples_arr.shape
entropy_samples = np.empty((n_t, n_rep), dtype=np.float64)
for j in range(n_rep):
entropy_samples[:, j] = calculate_entropy(
temperatures=temperatures_arr,
specific_heat=samples_arr[:, j],
s_ref=s_ref,
)
value = np.empty(n_t, dtype=np.float64)
err = np.empty(n_t, dtype=np.float64)
ci_low = np.empty(n_t, dtype=np.float64)
ci_high = np.empty(n_t, dtype=np.float64)
alpha = 0.5 * (1.0 - confidence)
z = _z_multiplier(confidence=confidence)
if n_rep < 2 and specific_heat_err is not None:
value[:] = np.nanmean(entropy_samples, axis=1)
err[:] = _propagate_entropy_uncertainty_from_cv_errors(
temperatures=temperatures_arr,
specific_heat_err=np.asarray(specific_heat_err, dtype=np.float64),
)
ci_low[:] = value - z * err
ci_high[:] = value + z * err
return {
'value': value,
'err': err,
'ci_low': ci_low,
'ci_high': ci_high,
'samples': entropy_samples,
}
if method == UNCERTAINTY_METHOD_BOOTSTRAP:
if n_rep < 2:
value[:] = np.nanmean(entropy_samples, axis=1)
err[:] = np.nan
ci_low[:] = np.nan
ci_high[:] = np.nan
return {
'value': value,
'err': err,
'ci_low': ci_low,
'ci_high': ci_high,
'samples': entropy_samples,
}
rng = np.random.default_rng(rng_seed)
boot_curves = np.empty((n_t, bootstrap_resamples), dtype=np.float64)
for b in range(bootstrap_resamples):
idx = rng.choice(n_rep, size=n_rep, replace=True)
boot_curves[:, b] = np.nanmean(entropy_samples[:, idx], axis=1)
value[:] = np.nanmean(entropy_samples, axis=1)
err[:] = np.nanstd(boot_curves, axis=1, ddof=1)
ci_low[:] = np.nanquantile(boot_curves, alpha, axis=1)
ci_high[:] = np.nanquantile(boot_curves, 1.0 - alpha, axis=1)
return {
'value': value,
'err': err,
'ci_low': ci_low,
'ci_high': ci_high,
'samples': entropy_samples,
}
for i in range(n_t):
row = entropy_samples[i]
finite = row[np.isfinite(row)]
if finite.size == 0:
value[i] = np.nan
err[i] = np.nan
ci_low[i] = np.nan
ci_high[i] = np.nan
continue
mean_i = float(np.mean(finite))
value[i] = mean_i
if finite.size == 1:
err[i] = np.nan
ci_low[i] = np.nan
ci_high[i] = np.nan
continue
stderr_i = float(np.std(finite, ddof=1) / np.sqrt(finite.size))
err[i] = stderr_i
ci_low[i] = float(mean_i - z * stderr_i)
ci_high[i] = float(mean_i + z * stderr_i)
return {
'value': value,
'err': err,
'ci_low': ci_low,
'ci_high': ci_high,
'samples': entropy_samples,
}
[docs]
def summarize_asymmetric_replicate_uncertainty(
*,
samples: np.ndarray,
confidence: float = DEFAULT_CONFIDENCE_LEVEL,
) -> dict[str, float]:
"""Summarize 1-D replicate samples with asymmetric percentile intervals."""
_validate_confidence(confidence=confidence)
arr = np.asarray(samples, dtype=np.float64)
if arr.ndim != 1:
raise ValueError(f'samples must be 1-D, got shape {arr.shape}')
if arr.size == 0:
raise ValueError('samples must be non-empty')
finite = arr[np.isfinite(arr)]
if finite.size == 0:
return {
'value': float('nan'),
'ci_low': float('nan'),
'ci_high': float('nan'),
'err': float('nan'),
'err_low': float('nan'),
'err_high': float('nan'),
'samples': 0.0,
'nan_or_undefined_count': float(arr.size),
}
alpha = 0.5 * (1.0 - confidence)
value = float(np.nanmedian(finite))
if finite.size == 1:
return {
'value': value,
'ci_low': float('nan'),
'ci_high': float('nan'),
'err': float('nan'),
'err_low': float('nan'),
'err_high': float('nan'),
'samples': 1.0,
'nan_or_undefined_count': float(arr.size - 1),
}
ci_low = float(np.nanquantile(finite, alpha))
ci_high = float(np.nanquantile(finite, 1.0 - alpha))
err_low = float(max(0.0, value - ci_low))
err_high = float(max(0.0, ci_high - value))
return {
'value': value,
'ci_low': ci_low,
'ci_high': ci_high,
'err': float(max(err_low, err_high)),
'err_low': err_low,
'err_high': err_high,
'samples': float(finite.size),
'nan_or_undefined_count': float(arr.size - finite.size),
}
[docs]
def calculate_autocorr(
*,
time_series: np.ndarray,
max_lag: int | None = None,
window_constant: float = 6.0,
) -> tuple[np.ndarray, float]:
"""
Compute the normalized autocorrelation function and integrated autocorrelation time.
Uses an FFT-based estimator for C(t): zero-pads to 2N, computes the power
spectrum, IFFTs, and normalizes by variance and the number of pairs at each
lag. Applies the Madras-Sokal automatic windowing rule to stop integration
at the lag W where W = ``window_constant`` * tau_int, preventing the estimator
from accumulating noise at large lags.
Parameters
----------
time_series: 1-D array of sequential measurements (e.g. magnetization).
max_lag: Maximum lag to compute. Defaults to ``len(time_series) // 2``.
window_constant: Multiplier for the Madras-Sokal window (default 6.0).
Returns
-------
A tuple ``(C_t, tau_int)`` where ``C_t`` is the normalized autocorrelation
array truncated at the window endpoint, and ``tau_int`` is the integrated
autocorrelation time (scalar).
Raises
------
ValueError: If ``time_series`` has fewer than 3 elements.
ZeroVarianceAutocorrelationError: If ``time_series`` has zero variance.
"""
x = np.asarray(time_series, dtype=np.float64)
N = len(x)
if N < 3:
raise ValueError(f'time_series must have at least 3 elements, got {N}')
variance = float(np.var(x))
if variance == 0.0:
raise ZeroVarianceAutocorrelationError(
'time_series has zero variance; autocorrelation is undefined'
)
if max_lag is None:
max_lag = N // 2
# FFT-based unnormalized autocorrelation via power spectrum
x_centered = x - np.mean(x)
n_fft = 2 * N
fx = np.fft.rfft(x_centered, n=n_fft)
power = fx * np.conj(fx)
acf_raw: np.ndarray = np.fft.irfft(power, n=n_fft)[:N].real
# Normalize: divide by variance and by (N - t) pairs at each lag t
lags = np.arange(N, dtype=np.float64)
norm = variance * (N - lags)
C_t_full: np.ndarray = acf_raw / norm
# Madras-Sokal automatic window: integrate until window W = window_constant * tau_int
tau_int = 0.5
window_end = max_lag
for t in range(1, max_lag + 1):
tau_int += float(C_t_full[t])
if t >= window_constant * tau_int:
window_end = t
break
C_t: np.ndarray = C_t_full[:window_end + 1]
return C_t, tau_int
[docs]
def estimate_effective_sample_size(
*,
time_series: np.ndarray,
tau_int: float | None = None,
) -> float:
"""Estimate effective sample size from integrated autocorrelation time."""
arr = _as_1d_float_array(time_series=time_series, name='time_series')
if tau_int is None:
_, tau_est = calculate_autocorr(time_series=arr)
else:
if tau_int <= 0.0:
raise ValueError(f'tau_int must be positive when provided, got {tau_int}')
tau_est = float(tau_int)
return _safe_n_eff_from_tau(n_samples=arr.size, tau_int=tau_est)
[docs]
def blocking_error(
*,
time_series: np.ndarray,
min_block_size: int = 2,
max_block_size: int | None = None,
) -> dict[str, float]:
"""Estimate standard error using standard blocking/binning analysis."""
arr = _as_1d_float_array(time_series=time_series, name='time_series')
if min_block_size < 2:
raise ValueError(f'min_block_size must be >= 2, got {min_block_size}')
if max_block_size is None:
max_block_size = arr.size // 2
# Handle all-NaN series gracefully (e.g. from failed measurements)
if np.isnan(arr).all():
return {
'stderr': float('nan'),
'stderr_naive': float('nan'),
'block_size': float('nan'),
'n_blocks': 0.0,
'tau_int_from_blocking': float('nan'),
}
if max_block_size < min_block_size:
raise ValueError(
f'max_block_size must be >= min_block_size, got {max_block_size} < {min_block_size}'
)
std = float(np.std(arr, ddof=1))
naive_stderr = std / np.sqrt(arr.size)
if std == 0.0:
return {
'stderr': 0.0,
'stderr_naive': 0.0,
'block_size': float(min_block_size),
'n_blocks': float(arr.size // min_block_size),
'tau_int_from_blocking': float('nan'),
}
block_sizes = _blocking_block_sizes(
n=arr.size,
min_block_size=min_block_size,
max_block_size=max_block_size,
)
if not block_sizes:
return {
'stderr': float(naive_stderr),
'stderr_naive': float(naive_stderr),
'block_size': 1.0,
'n_blocks': float(arr.size),
'tau_int_from_blocking': 0.5,
}
stderr_values: list[float] = []
n_blocks_values: list[float] = []
for block in block_sizes:
n_blocks = arr.size // block
trimmed = arr[:n_blocks * block]
block_means = trimmed.reshape(n_blocks, block).mean(axis=1)
stderr = float(np.std(block_means, ddof=1) / np.sqrt(n_blocks))
stderr_values.append(stderr)
n_blocks_values.append(float(n_blocks))
stderr_arr = np.asarray(stderr_values, dtype=np.float64)
selected_idx = _select_blocking_plateau(standard_errors=stderr_arr)
selected_stderr = float(stderr_arr[selected_idx])
tau_block = float('nan')
if naive_stderr > 0.0:
tau_block = 0.5 * (selected_stderr / naive_stderr) ** 2
return {
'stderr': selected_stderr,
'stderr_naive': float(naive_stderr),
'block_size': float(block_sizes[selected_idx]),
'n_blocks': float(n_blocks_values[selected_idx]),
'tau_int_from_blocking': tau_block,
}
[docs]
def summarize_primary_observable(
*,
time_series: np.ndarray,
confidence: float = DEFAULT_CONFIDENCE_LEVEL,
) -> dict[str, float]:
"""Summarize a primary observable with blocking-based uncertainty."""
_validate_confidence(confidence=confidence)
arr = _as_1d_float_array(time_series=time_series, name='time_series')
value = float(np.mean(arr))
block = blocking_error(time_series=arr)
err = float(block['stderr'])
z = _z_multiplier(confidence=confidence)
try:
_, tau_int = calculate_autocorr(time_series=arr)
except ZeroVarianceAutocorrelationError:
tau_int = float('nan')
n_eff = _safe_n_eff_from_tau(n_samples=arr.size, tau_int=tau_int)
return {
'value': value,
'err': err,
'ci_low': float(value - z * err),
'ci_high': float(value + z * err),
'tau_int': float(tau_int),
'n_eff': n_eff,
'samples': float(arr.size),
}
def _derived_point_estimate(
*,
series: np.ndarray,
temperature: float,
L: int,
observable: str,
) -> float:
"""Compute derived thermodynamic point estimates from a time series."""
N = float(L * L)
variance = float(np.var(series))
if observable == 'chi':
return float(N * variance / temperature)
if observable == 'cv':
return float(N * variance / (temperature**2))
raise ValueError(f"observable must be one of 'chi' or 'cv', got {observable!r}")
[docs]
def summarize_derived_observable(
*,
magnetization_series: np.ndarray | None = None,
energy_series: np.ndarray | None = None,
temperature: float,
L: int,
observable: str,
method: str = UNCERTAINTY_METHOD_BLOCKING,
confidence: float = DEFAULT_CONFIDENCE_LEVEL,
bootstrap_resamples: int = 0,
) -> dict[str, float]:
"""Summarize derived observables (chi, Cv) with uncertainty estimates."""
_validate_confidence(confidence=confidence)
if temperature <= 0.0:
raise ValueError(f'temperature must be positive, got {temperature}')
if not isinstance(L, (int, np.integer)) or L < 1:
raise ValueError(f'L must be a positive integer, got {L!r}')
if method not in {UNCERTAINTY_METHOD_BLOCKING, UNCERTAINTY_METHOD_BOOTSTRAP}:
raise ValueError(f'unsupported method {method!r}')
if observable == 'chi':
if magnetization_series is None:
raise ValueError("magnetization_series is required for observable='chi'")
base = _as_1d_float_array(time_series=magnetization_series, name='magnetization_series')
elif observable == 'cv':
if energy_series is None:
raise ValueError("energy_series is required for observable='cv'")
base = _as_1d_float_array(time_series=energy_series, name='energy_series')
else:
raise ValueError(f"observable must be one of 'chi' or 'cv', got {observable!r}")
value = _derived_point_estimate(
series=base,
temperature=temperature,
L=L,
observable=observable,
)
if np.std(base, ddof=1) == 0.0:
return {
'value': value,
'err': 0.0,
'ci_low': value,
'ci_high': value,
'tau_int': float('nan'),
'n_eff': float('nan'),
'samples': float(base.size),
}
block = blocking_error(time_series=base)
block_size = int(block['block_size'])
n_blocks = base.size // block_size
trimmed = base[:n_blocks * block_size]
block_reshaped = trimmed.reshape(n_blocks, block_size)
per_block = np.array(
[
_derived_point_estimate(
series=block_reshaped[idx],
temperature=temperature,
L=L,
observable=observable,
)
for idx in range(n_blocks)
],
dtype=np.float64,
)
if n_blocks < 2:
err = float('nan')
ci_low = float('nan')
ci_high = float('nan')
elif method == UNCERTAINTY_METHOD_BLOCKING:
err = float(np.std(per_block, ddof=1) / np.sqrt(n_blocks))
z = _z_multiplier(confidence=confidence)
ci_low = float(value - z * err)
ci_high = float(value + z * err)
else:
if bootstrap_resamples <= 0:
raise ValueError('bootstrap_resamples must be > 0 when method is bootstrap')
rng = np.random.default_rng(0)
boot = np.empty(bootstrap_resamples, dtype=np.float64)
for i in range(bootstrap_resamples):
sample = rng.choice(per_block, size=n_blocks, replace=True)
boot[i] = float(np.mean(sample))
alpha = 0.5 * (1.0 - confidence)
ci_low = float(np.quantile(boot, alpha))
ci_high = float(np.quantile(boot, 1.0 - alpha))
err = float(np.std(boot, ddof=1))
try:
_, tau_int = calculate_autocorr(time_series=base)
except ZeroVarianceAutocorrelationError:
tau_int = float('nan')
n_eff = _safe_n_eff_from_tau(n_samples=base.size, tau_int=tau_int)
return {
'value': value,
'err': err,
'ci_low': ci_low,
'ci_high': ci_high,
'tau_int': float(tau_int),
'n_eff': n_eff,
'samples': float(base.size),
}
[docs]
def summarize_replicate_samples(
*,
samples: np.ndarray,
confidence: float = DEFAULT_CONFIDENCE_LEVEL,
) -> dict[str, np.ndarray | float]:
"""Summarize replicate samples using NaN-safe median and percentile bands."""
_validate_confidence(confidence=confidence)
arr = np.asarray(samples, dtype=np.float64)
if arr.size == 0:
raise ValueError('samples must be non-empty')
alpha = 0.5 * (1.0 - confidence)
if arr.ndim == 1:
value = float(np.nanmedian(arr))
ci_low = float(np.nanquantile(arr, alpha))
ci_high = float(np.nanquantile(arr, 1.0 - alpha))
return {
'value': value,
'ci_low': ci_low,
'ci_high': ci_high,
'samples': float(arr.size),
'nan_or_undefined_count': float(np.isnan(arr).sum()),
}
value_arr = np.nanmedian(arr, axis=1)
ci_low_arr = np.nanquantile(arr, alpha, axis=1)
ci_high_arr = np.nanquantile(arr, 1.0 - alpha, axis=1)
return {
'value': value_arr,
'ci_low': ci_low_arr,
'ci_high': ci_high_arr,
'samples': float(arr.shape[1]),
'nan_or_undefined_count': float(np.isnan(arr).sum()),
}
[docs]
def summarize_seed_ensemble(
*,
values: np.ndarray,
within_seed_errors: np.ndarray,
confidence: float = DEFAULT_CONFIDENCE_LEVEL,
) -> dict[str, float]:
"""Aggregate per-seed estimates into one mean with hierarchical uncertainty.
The total variance combines between-seed and within-seed components:
Var(mean) = Var_between / n_seeds + mean(Var_within) / n_seeds.
"""
_validate_confidence(confidence=confidence)
values_arr = np.asarray(values, dtype=np.float64)
errs_arr = np.asarray(within_seed_errors, dtype=np.float64)
if values_arr.ndim != 1:
raise ValueError(f'values must be 1-D, got shape {values_arr.shape}')
if errs_arr.ndim != 1:
raise ValueError(f'within_seed_errors must be 1-D, got shape {errs_arr.shape}')
if values_arr.size != errs_arr.size:
raise ValueError(
'values and within_seed_errors must have matching lengths, '
f'got {values_arr.size} and {errs_arr.size}'
)
if values_arr.size == 0:
raise ValueError('values must be non-empty')
finite_values = np.isfinite(values_arr)
if not np.any(finite_values):
return {
'value': float('nan'),
'err': float('nan'),
'ci_low': float('nan'),
'ci_high': float('nan'),
'between_seed_component': float('nan'),
'within_seed_component': float('nan'),
'samples': 0.0,
'nan_or_undefined_count': float(values_arr.size),
}
v = values_arr[finite_values]
e = errs_arr[finite_values]
n = v.size
value = float(np.mean(v))
between_component = 0.0
if n > 1:
between_component = float(np.var(v, ddof=1) / n)
finite_errs = np.isfinite(e)
within_component = 0.0
if np.any(finite_errs):
within_component = float(np.mean(e[finite_errs] ** 2) / n)
total_err = float(np.sqrt(max(0.0, between_component + within_component)))
z = _z_multiplier(confidence=confidence)
return {
'value': value,
'err': total_err,
'ci_low': float(value - z * total_err),
'ci_high': float(value + z * total_err),
'between_seed_component': float(np.sqrt(max(0.0, between_component))),
'within_seed_component': float(np.sqrt(max(0.0, within_component))),
'samples': float(n),
'nan_or_undefined_count': float(values_arr.size - n),
}
[docs]
def power_fit(
*, t_arr: np.ndarray, y_arr: np.ndarray, mask: np.ndarray
) -> tuple[float, float] | tuple[None, None]:
"""
Fit a power law y = prefactor * t^exponent via log-log linear regression.
Parameters
----------
t_arr: Independent variable array (e.g., time or length scale).
y_arr: Dependent variable array; only positive values are used in the fit.
mask: Boolean mask selecting the subset of points to include.
Returns
-------
exponent: Fitted power-law exponent, or None if fewer than 3 valid points.
prefactor: Fitted prefactor, or None if fewer than 3 valid points.
"""
valid = mask & (y_arr > 0)
if valid.sum() < 3:
return None, None
coeffs = np.polyfit(np.log(t_arr[valid]), np.log(y_arr[valid]), 1)
return float(coeffs[0]), float(np.exp(coeffs[1]))