Source code for NEDAS.utils.multiscale

import numpy as np
from NEDAS.grid import Grid
from NEDAS.utils.fft_lib import fft2, ifft2, get_wn

"""functions to perform scale decomposition, through bandpass filtering
on spatial data (fields), can also use gcm_filter to achieve this.
"""
[docs] def lowpass_response(k2d, k1, k2): """ Low-pass spectral response function Input - k2d: array, 2d wavenumber - k1, k2: float, wavenumbers defining the transition zone (from 1 to 0 in response) Return - r: array same dimension as k2d, the response function """ r = np.zeros(k2d.shape) r[np.where(k2d<k1)] = 1.0 r[np.where(k2d>k2)] = 0.0 ind = np.where(np.logical_and(k2d>=k1, k2d<=k2)) # cos-square transition r[ind] = np.cos((k2d[ind] - k1)*(0.5*np.pi/(k2 - k1)))**2 return r
[docs] def get_scale_component_spec_bandpass(grid, fld, character_length, s): assert grid.regular, "get_scale_component_spec_bandpass only works for regular grid" # convert length to wavenumber L = max(grid.Lx, grid.Ly) character_k = L / np.array(character_length) nscale = len(character_k) if nscale == 1: return fld # nothing to be done, return the original field kx, ky = get_wn(fld) k2d = np.hypot(kx, ky) fld_spec = fft2(fld) r = np.ones(fld_spec.shape) if s == 0: r = lowpass_response(k2d, character_k[s], character_k[s+1]) if s == nscale-1: r = 1 - lowpass_response(k2d, character_k[s-1], character_k[s]) if s > 0 and s < nscale-1: r = lowpass_response(k2d, character_k[s], character_k[s+1]) - lowpass_response(k2d, character_k[s-1], character_k[s]) return ifft2(fld_spec * r)
[docs] def convolve(grid, fld, rgrid, response): L = max(grid.Lx, grid.Ly) kx, ky = get_wn(rgrid.x) ny, nx = rgrid.x.shape fld1 = fld.copy() # go through grid points in the field and perform convolution for i in np.ndindex(grid.x.shape): # shift to the grid point i position r = response * np.exp(-2 * np.pi * complex(0,1) * (kx*grid.x[i] + ky*grid.y[i]) / L) # convert to physical space kernel = ifft2(r) * nx * ny / grid.x.size w = rgrid.convert(kernel) # perform convolution fld1[i] = np.nansum(fld * w) return fld1
[docs] def get_scale_component_convolve(grid, fld, character_length, s): # convert length to wavenumber L = max(grid.Lx, grid.Ly) character_k = L / np.array(character_length) nscale = len(character_k) if nscale == 1: # nothing to be done, return the original field return fld # make a regular grid for the kernel function rgrid = Grid.regular_grid(grid.proj, grid.xmin, grid.xmax, grid.ymin, grid.ymax, grid.dx) rgrid.set_destination_grid(grid) kx, ky = get_wn(rgrid.x) ny, nx = rgrid.x.shape k2d = np.hypot(kx, ky) if s == 0: r = lowpass_response(k2d, character_k[s], character_k[s+1]) return convolve(grid, fld, rgrid, r) if s == nscale-1: r = lowpass_response(k2d, character_k[s-1], character_k[s]) return fld - convolve(grid, fld, rgrid, r) if s > 0 and s < nscale-1: r1 = lowpass_response(k2d, character_k[s-1], character_k[s]) r2 = lowpass_response(k2d, character_k[s], character_k[s+1]) return convolve(grid, fld, rgrid, r2) - convolve(grid, fld, rgrid, r1)
[docs] def get_scale_component(grid, fld, character_length, s): """ Get scale component using a bandpass filter in spectral space Input: - grid: Grid object - fld: array, [..., ny, nx], the input field - character_length: list of characteristic length for each scale - s: int, scale index Return: - fld: array, the scale component s of input fld """ flds = fld.copy() if grid.regular: for i in np.ndindex(fld.shape[:-2]): flds[i] = get_scale_component_spec_bandpass(grid, fld[i], character_length, s) else: for i in np.ndindex(fld.shape[:-1]): flds[i] = get_scale_component_convolve(grid, fld[i], character_length, s) return flds
[docs] def get_error_scale_factor(grid, character_length, s): err_scale_fac = np.ones(grid.x.shape) L = max(grid.Lx, grid.Ly) character_k = L / np.array(character_length) nscale = len(character_k) if nscale == 1: return err_scale_fac rgrid = Grid.regular_grid(grid.proj, grid.xmin, grid.xmax, grid.ymin, grid.ymax, grid.dx) rgrid.set_destination_grid(grid) kx, ky = get_wn(rgrid.x) ny, nx = rgrid.x.shape k2d = np.hypot(kx, ky) response = np.ones(k2d.shape) if s == 0: response = lowpass_response(k2d, character_k[s], character_k[s+1]) if s == nscale-1: response = 1 - lowpass_response(k2d, character_k[s-1], character_k[s]) if s > 0 and s < nscale-1: r1 = lowpass_response(k2d, character_k[s-1], character_k[s]) r2 = lowpass_response(k2d, character_k[s], character_k[s+1]) response = r2 - r1 for i in np.ndindex(grid.x.shape): # shift to the grid point i position r = response * np.exp(-2 * np.pi * complex(0,1) * (kx*grid.x[i] + ky*grid.y[i]) / L) # convert to physical space kernel = ifft2(r) * nx * ny / grid.x.size w = rgrid.convert(kernel) err_scale_fac[i] = w[i] return err_scale_fac