Source code for NEDAS.assim_tools.updators.alignment

import os
import numpy as np
#open cv
from NEDAS.core import Context, Updator

[docs] class AlignmentUpdator(Updator): """Updator class with alignment technique""" displace = {}
[docs] def compute_increment(self, c: Context): """ Alignment technique: compute optical flows from the pair of prior/posterior state variable field """ if c.config.alignment is None: c.config.alignment = {} c.print_1p("Compute alignment based on analysis increment of '"+c.config.alignment['variable']+"'...\n") for rec_id in c.state.rec_list[c.pid_rec]: rec = c.state.info.fields[rec_id].asdict() model = c.models[rec['model_src']] if rec['name'] != c.config.alignment['variable']: continue for mem_id in c.mem_list[c.pid_mem]: # compute displacement on analysis grid fld_prior = c.state.fields_prior[mem_id, rec_id] fld_post = c.state.fields_post[mem_id, rec_id] displace = optical_flow(c.grid, fld_prior, fld_post, **c.config.alignment) self.displace[mem_id, rec['k']] = displace #np.save(os.path.join(c.io.analysis_dir, f"displace.m{mem_id}.k{rec['k']}.npy"), displace) # the model class offer a method to update grid (a lagrangian approach) # so displace increment will be applied directly to the grid elements if hasattr(model, 'displace'): # convert the displacement from analysis grid to model grid c.io.call_method(c, 'current', model.read_grid, member=mem_id, **rec) c.grid.set_destination_grid(model.grid) displace_m = c.grid.convert(displace, is_vector=True, method='linear') # apply the displacement c.io.call_method(c, 'current', getattr(model, 'displace'), displace_m[0,...], displace_m[1,...], member=mem_id, **rec) c.comm.Barrier()
[docs] def update_files(self, c, mem_id, rec_id): """ Alignment technique, use the displace increment to adjust the model grid to precondition all the analysis variables for next assimilation step See more details in Ying 2019 """ rec = c.state.info.fields[rec_id].asdict() model = c.models[rec['model_src']] fld_prior = c.state.fields_prior[mem_id, rec_id] fld_post = c.state.fields_post[mem_id, rec_id] # get model state variable prior var_prior = c.io.call_method(c, 'current', model.read_var, member=mem_id, **rec) c.grid.set_destination_grid(model.grid) if rec['is_vector']: fld_shape = var_prior.shape[1:] else: fld_shape = var_prior.shape # read the corresponding displacement and convert to model grid #displace = np.load(os.path.join(state.analysis_dir, f"displace.m{mem_id}.k{rec['k']}.npy")) displace = self.displace[mem_id, rec['k']] # warp the prior field with displacement fld_prior_warp = fld_prior.copy() u = displace[0,...]/c.grid.dx v = displace[1,...]/c.grid.dx for ind in np.ndindex(fld_prior.shape[:-2]): fld_prior_warp[ind] = warp(fld_prior[ind], -u, -v) # residual increments not explained by the displacement res_incr = fld_post - fld_prior_warp if hasattr(model, 'displace'): # apply the residual increment res_incr_m = c.grid.convert(res_incr, is_vector=rec['is_vector'], method='linear') if fld_shape == model.grid.x.shape: var_post = var_prior + res_incr_m elif fld_shape == model.grid.x_elem.shape: var_post = var_prior + np.mean(res_incr_m[...,model.grid.tri.triangles], axis=-1) else: raise RuntimeError(f"mismatch in field prior {var_prior.shape} with residual increment {res_incr_m.shape}") else: new_var = c.grid.convert(fld_post, is_vector=rec['is_vector'], method='linear') if fld_shape == model.grid.x.shape: var_post = new_var elif fld_shape == model.grid.x_elem.shape: var_post = np.mean(new_var[...,model.grid.tri.triangles], axis=-1) else: raise RuntimeError(f"mismatch in field prior {var_prior.shape} with posterior {new_var.shape}") #write the posterior variable to restart file ind = np.where(np.isnan(var_post)) var_post[ind] = var_prior[ind] #if np.isnan(var_post).any(): # raise ValueError('nan detected in var_post') c.io.call_method(c, 'current', model.write_var, var_post, member=mem_id, comm=c.comm, **rec)
[docs] def optical_flow(grid, fld1, fld2, nlevel=5, niter_max=100, smoothness_weight=1, **kwargs): ni = int(2**np.ceil(np.log(np.max(fld1.shape))/np.log(2))) x1 = np.full((ni,ni), np.nan) x2 = np.full((ni,ni), np.nan) x1[0:grid.ny, 0:grid.nx] = fld1.copy() x2[0:grid.ny, 0:grid.nx] = fld2.copy() mask = np.logical_or(np.isnan(x1), (np.abs(x2-x1)<0.00001)) x1[mask] = 0 x2[mask] = 0 w = smoothness_weight ni, nj = x1.shape # normalize field so that w can be fixed xmax = np.max(x1[:, :]); xmin = np.min(x1[:, :]) if (xmax>xmin): x1[:, :] = (x1[:, :] - xmin) / (xmax -xmin) x2[:, :] = (x2[:, :] - xmin) / (xmax -xmin) u = np.zeros((ni, nj)) v = np.zeros((ni, nj)) # #pyramid levels for lev in range(nlevel, -1, -1): x1w = warp(x1, -u, -v) x1c = coarsen(x1w, 1, lev) x2c = coarsen(x2, 1, lev) maskc = coarsen_mask(mask, 1, lev) xdx = 0.5*(deriv_x(x1c) + deriv_x(x2c)) xdy = 0.5*(deriv_y(x1c) + deriv_y(x2c)) xdt = x2c - x1c # #compute incremental flow using iterative solver du = np.zeros(xdx.shape) dv = np.zeros(xdx.shape) du1 = np.zeros(xdx.shape) dv1 = np.zeros(xdx.shape) niter = 0 diff = 1e7 while diff > 1e-3 and niter < niter_max: du[0,:] = 0; du[-1,:] = 0; du[:,0] = 0; du[:,-1] = 0 # boundary conditions dv[0,:] = 0; dv[-1,:] = 0; dv[:,0] = 0; dv[:,-1] = 0 du[maskc] = 0 dv[maskc] = 0 ubar = laplacian(du) + du vbar = laplacian(dv) + dv du1 = ubar - xdx*(xdx*ubar + xdy*vbar + xdt)/(w + xdx**2 + xdy**2) dv1 = vbar - xdy*(xdx*ubar + xdy*vbar + xdt)/(w + xdx**2 + xdy**2) diff = np.max(np.abs(du1-du) + np.abs(dv1-dv)) du = du1 dv = dv1 niter += 1 #print(niter, diff) u += sharpen(du*2**(lev-1), lev, 1) v += sharpen(dv*2**(lev-1), lev, 1) disp_u = u[0:grid.ny, 0:grid.nx] * grid.dx disp_v = v[0:grid.ny, 0:grid.nx] * grid.dy return np.array([disp_u, disp_v])
[docs] def warp(x, u, v): xw = x.copy() ni, nj = x.shape ii, jj = np.mgrid[0:ni, 0:nj] xw = interp2d(x, ii+v, jj+u) return xw
[docs] def coarsen_mask(x, lev1, lev2): # only subsample no smoothing, avoid mask growing if lev1 < lev2: for _ in range(lev1, lev2): ni, nj = x.shape x1 = x[0:ni:2, 0:nj:2] x = x1 return x
[docs] def coarsen(x, lev1, lev2): if lev1 < lev2: for _ in range(lev1, lev2): ni, nj = x.shape x1 = 0.25*(x[0:ni:2, 0:nj:2] + x[1:ni:2, 0:nj:2] + x[0:ni:2, 1:nj:2] + x[1:ni:2, 1:nj:2]) x = x1 return x
[docs] def sharpen(x, lev1, lev2): if lev1 > lev2: for _ in range(lev1, lev2, -1): ni, nj = x.shape x1 = np.zeros((ni*2, nj)) x1[0:ni*2:2, :] = x x1[1:ni*2:2, :] = 0.5*(np.roll(x, -1, axis=0) + x) x2 = np.zeros((ni*2, nj*2)) x2[:, 0:nj*2:2] = x1 x2[:, 1:nj*2:2] = 0.5*(np.roll(x1, -1, axis=1) + x1) x = x2 return x
[docs] def interp2d(x, io, jo): ni, nj = x.shape io1 = np.floor(io).astype(int) % ni jo1 = np.floor(jo).astype(int) % nj io2 = np.floor(io+1).astype(int) % ni jo2 = np.floor(jo+1).astype(int) % nj di = io - np.floor(io) dj = jo - np.floor(jo) xo = (1-di)*(1-dj)*x[io1, jo1] + di*(1-dj)*x[io2, jo1] + (1-di)*dj*x[io1, jo2] + di*dj*x[io2, jo2] return xo
[docs] def deriv_y(f): return 0.5*(np.roll(f, -1, axis=0) - np.roll(f, 1, axis=0))
[docs] def deriv_x(f): return 0.5*(np.roll(f, -1, axis=1) - np.roll(f, 1, axis=1))
[docs] def laplacian(f): out = (np.roll(f, -1, axis=1) + np.roll(f, 1, axis=1) + np.roll(f, -1, axis=0) + np.roll(f, 1, axis=0))/6 out += (np.roll(np.roll(f, -1, axis=0), -1, axis=1) + np.roll(np.roll(f, -1, axis=0), 1, axis=1) + np.roll(np.roll(f, 1, axis=0), -1, axis=1) + np.roll(np.roll(f, 1, axis=0), 1, axis=1))/12 out -= f return out