Source code for NEDAS.models.qg.python.numerics

"""
Numerical routines: tridiagonal solvers, Robert-filtered leapfrog timestepper,
random forcing, and ring-integral diagnostics.

Translates Fortran numerics_lib.f90.
"""

import numpy as np


# ---------------------------------------------------------------------------
# Vectorised tridiagonal solver (Thomas algorithm)
# ---------------------------------------------------------------------------

[docs] def tridiag_vec(f_in, sub, diag, sup): """Solve a tridiagonal system for many RHS vectors simultaneously. Solves A * x = f where A has: sub : sub-diagonal, shape (nz-1,) or (nmask, nz-1) diag : diagonal, shape (nz,) or (nmask, nz) sup : super-diag, shape (nz-1,) or (nmask, nz-1) f_in : RHS, shape (nmask, nz) Returns x of shape (nmask, nz). When sub/diag/sup are 1-D (shared across wavenumbers), they are broadcast. When diag is 2-D the diagonal differs per wavenumber (PV inversion case). Follows the Thomas algorithm from Fortran tridiag in numerics_lib.f90. """ nmask, nv = f_in.shape sub_ = np.broadcast_to(sub, (nmask, nv - 1)).copy() diag_ = np.broadcast_to(diag, (nmask, nv)).copy() sup_ = np.broadcast_to(sup, (nmask, nv - 1)).copy() f = f_in.copy() gamma = np.zeros_like(f) bet = diag_[:, 0].copy() x = np.zeros_like(f) x[:, 0] = f[:, 0] / bet for n in range(1, nv): gamma[:, n] = sup_[:, n - 1] / bet bet = diag_[:, n] - sub_[:, n - 1] * gamma[:, n] x[:, n] = (f[:, n] - sub_[:, n - 1] * x[:, n - 1]) / bet for n in range(nv - 2, -1, -1): x[:, n] -= gamma[:, n + 1] * x[:, n + 1] return x
[docs] def tridiag_cyc_vec(f_in, sub, diag, sup, corner_lo, corner_hi): """Cyclic tridiagonal solver (periodic vertical BC). corner_lo : sub-diagonal wrap value op(1, -1) -> mat(1, nz) corner_hi : super-diagonal wrap value op(nz, 1) -> mat(nz, 1) Shape conventions same as tridiag_vec. Follows Fortran tridiag_cyc in numerics_lib.f90 (Sherman-Morrison). """ nmask, nv = f_in.shape alpha = corner_hi beta = corner_lo gamma_val = -diag[0] if np.ndim(diag) == 1 else -diag[:, 0] diag_mod = diag.copy() if np.ndim(diag) > 1 else np.tile(diag, (nmask, 1)) diag_mod[:, 0] -= gamma_val diag_mod[:, nv-1] -= alpha * beta / gamma_val x = tridiag_vec(f_in, sub, diag_mod, sup) u = np.zeros_like(f_in) u[:, 0] = gamma_val u[:, nv-1] = alpha z = tridiag_vec(u, sub, diag_mod, sup) factr = (x[:, 0] + beta * x[:, nv-1] / gamma_val) / \ (1.0 + z[:, 0] + beta * z[:, nv-1] / gamma_val) return x - factr[:, np.newaxis] * z
# --------------------------------------------------------------------------- # Leapfrog timestepper with Robert filter # ---------------------------------------------------------------------------
[docs] def march(f_now, f_o, rhs, dt, rob, calls): """Advance prognostic field one step using leapfrog / Robert filter. calls=0 : Euler half-step (initialisation) calls=1 : first leapfrog calls>=2 : leapfrog + Robert filter on f_o Returns (f_new, f_o_new, calls_new). Arrays can have any shape; operates element-wise. Translates Fortran March2/3 from numerics_lib.f90. """ if calls == 0: f_new = f_now + 0.5 * dt * rhs f_o_new = f_now.copy() calls_new = 1 elif calls == 1: f_new = f_o + dt * rhs f_o_new = f_o calls_new = 2 else: f_new = f_o + 2.0 * dt * rhs f_o_new = (1.0 - 2.0 * rob) * f_now + rob * (f_o + f_new) calls_new = 3 return f_new, f_o_new, calls_new
# --------------------------------------------------------------------------- # Portable pseudo-random number generator # --------------------------------------------------------------------------- _RAN_IFF = 0 _RAN_IY = 0 _RAN_IR = np.zeros(97, dtype=np.int64) _RAN_IDUM = -7 _RAN_M = 714025 _RAN_IA = 1366 _RAN_IC = 150889 _RAN_RM = 1.4005112e-6
[docs] def ran_reset(idum): """Seed the portable RNG (negative idum seeds a fresh sequence).""" global _RAN_IFF, _RAN_IY, _RAN_IR, _RAN_IDUM _RAN_IDUM = int(idum) _RAN_IFF = 0 # force re-initialisation on next call to ran()
[docs] def ran(nx, ny): """Return (nx, ny) pseudo-random floats in [0, 1). Replicates the Fortran Ran function in numerics_lib.f90 exactly so that forcing sequences can be reproduced across languages. """ global _RAN_IFF, _RAN_IY, _RAN_IR, _RAN_IDUM M = _RAN_M IA = _RAN_IA IC = _RAN_IC RM = _RAN_RM irsize = 97 if _RAN_IDUM < 0 or _RAN_IFF == 0: _RAN_IFF = 1 _RAN_IDUM = int(np.mod(IC - _RAN_IDUM, M)) for j in range(irsize): _RAN_IDUM = int(np.mod(IA * _RAN_IDUM + IC, M)) _RAN_IR[j] = _RAN_IDUM _RAN_IDUM = int(np.mod(IA * _RAN_IDUM + IC, M)) _RAN_IY = _RAN_IDUM result = np.empty((nx, ny), dtype=np.float32) for y in range(ny): for x in range(nx): j = 1 + (irsize * _RAN_IY) // M j = max(0, min(irsize - 1, j - 1)) # 0-based, clamp _RAN_IY = int(_RAN_IR[j]) result[x, y] = _RAN_IY * RM _RAN_IDUM = int(np.mod(IA * _RAN_IDUM + IC, M)) _RAN_IR[j] = _RAN_IDUM return result
# --------------------------------------------------------------------------- # Ring integral (for spectral diagnostics) # ---------------------------------------------------------------------------
[docs] def ring_integral(f, kxv, kyv, nf): """Integrate field f over rings of constant ``|k|`` (wavenumber magnitude). f : (..., nky, nkx) or (nky, nkx) Returns (..., nf) or (nf,) ring-averaged values. """ kxv_i = np.round(kxv).astype(int) kyv_i = np.round(kyv).astype(int) radius = np.floor( np.sqrt(kxv_i[np.newaxis, :]**2 + kyv_i[:, np.newaxis]**2) + 0.5 ).astype(int) batch = f.shape[:-2] result = np.zeros(batch + (nf,), dtype=f.dtype) for rad in range(1, nf + 1): mask = radius == rad if batch: result[..., rad - 1] = np.sum(f[..., mask], axis=-1) else: result[..., rad - 1] = np.sum(f[mask]) return result