Source code for pyart.retrieve.kdp_proc

"""
Module for retrieving specific differential phase (KDP) from radar total
differential phase (PSIDP) measurements. Total differential phase is a function
of propagation differential phase (PHIDP), backscatter differential phase
(DELTAHV), and the system phase offset.

"""

import time
import warnings
from functools import partial

import numpy as np
from scipy import interpolate, linalg, optimize, signal, stats

from ..config import get_field_name, get_fillvalue, get_metadata
from ..util import rolling_window
from . import _kdp_proc

# Constants in the Kalman filter retrieval method (generally no need to
# modify them)
SCALERS = [
    0.1,
    10 ** (-0.8),
    10 ** (-0.6),
    10 ** (-0.4),
    10 ** (-0.2),
    1,
    10 ** (0.2),
    10 ** (0.4),
    10 ** (0.6),
    10 ** (0.8),
    1,
    10,
]

PADDING = 50  # Noise padding of the psidp signal (before and after signal)
SHIFT = 13  # Shifting of the final signal


[docs]def kdp_schneebeli( radar, gatefilter=None, fill_value=None, psidp_field=None, kdp_field=None, phidp_field=None, band="C", rcov=0, pcov=0, prefilter_psidp=False, filter_opt=None, parallel=True, ): """ Estimates Kdp with the Kalman filter method by Schneebeli and al. (2014) for a set of psidp measurements. Parameters ---------- radar : Radar Radar containing differential phase field. gatefilter : GateFilter, optional A GateFilter indicating radar gates that should be excluded when analysing differential phase measurements. fill_value : float, optional Value indicating missing or bad data in differential phase field, if not specified, the default in the Py-ART configuration file will be used. psidp_field : str, optional Total differential phase field. If None, the default field name must be specified in the Py-ART configuration file. kdp_field : str, optional Specific differential phase field. If None, the default field name must be specified in the Py-ART configuration file. phidp_field : str, optional Propagation differential phase field. If None, the default field name must be specified in the Py-ART configuration file. band : char, optional Radar frequency band string. Accepted "X", "C", "S" (capital or not). The band is used to compute intercepts -c and slope b of the delta = b*Kdp+c relation. rcov : 3x3 float array, optional Measurement error covariance matrix. pcov : 4x4 float array, optional Scaled state transition error covariance matrix. prefilter_psidp : bool, optional If set, the psidp measurements will first be filtered with the filter_psidp method, which can improve the quality of the final Kdp. filter_opt : dict, optional The arguments for the prefilter_psidp method, if empty, the defaults arguments of this method will be used. parallel : bool, optional Flag to enable parallel computation (one core for every psidp profile). Returns ------- kdp_dict : dict Retrieved specific differential phase data and metadata. kdp_std_dict : dict Estimated specific differential phase standard dev. data and metadata. phidpr_dict,: dict Retrieved differential phase data and metadata. References ---------- Schneebeli, M., Grazioli, J., and Berne, A.: Improved Estimation of the Specific Differential Phase SHIFT Using a Compilation of Kalman Filter Ensembles, IEEE T. Geosci. Remote Sens., 52, 5137-5149, doi:10.1109/TGRS.2013.2287017, 2014. """ # create parallel computing instance if parallel: import multiprocessing as mp pool = mp.Pool(processes=mp.cpu_count(), maxtasksperchild=1) # parse fill value if fill_value is None: fill_value = get_fillvalue() # parse field names if psidp_field is None: psidp_field = get_field_name("differential_phase") if kdp_field is None: kdp_field = get_field_name("specific_differential_phase") if phidp_field is None: phidp_field = get_field_name("differential_phase") # parse range resolution, length scale and low-pass filter constraint # weight dr = _parse_range_resolution(radar, check_uniform=True) # parse total differential phase measurements if prefilter_psidp: if filter_opt is None: filter_opt = {} # Assign psidp field to filter_psidp inputs filter_opt["psidp_field"] = psidp_field # Filter psidp psidp_o = filter_psidp(radar, **filter_opt) else: psidp_o = radar.fields[psidp_field]["data"] # mask radar gates indicated by the gate filter if gatefilter is not None: psidp_o = np.ma.masked_where(gatefilter.gate_excluded, psidp_o) func = partial(_kdp_kalman_profile, dr=dr, band=band, rcov=rcov, pcov=pcov) all_psidp_prof = list(psidp_o) if parallel: list_est = pool.map(func, all_psidp_prof) else: list_est = map(func, all_psidp_prof) kdp = np.zeros(psidp_o.shape) * np.nan kdp = np.ma.masked_array(kdp, fill_value=fill_value) kdp_stdev = np.zeros(psidp_o.shape) * np.nan kdp_stdev = np.ma.masked_array(kdp_stdev, fill_value=fill_value) phidp_rec = np.zeros(psidp_o.shape) * np.nan phidp_rec = np.ma.masked_array(phidp_rec, fill_value=fill_value) for i, size in enumerate(list_est): kdp[i, 0 : len(size[0])] = size[0] kdp_stdev[i, 0 : len(size[1])] = size[1] phidp_rec[i, 0 : len(size[2])] = size[2] # Mask the estimated Kdp and reconstructed Phidp with the mask of original # psidp if isinstance(psidp_o, np.ma.masked_array): masked = psidp_o.mask kdp = np.ma.array(kdp, mask=masked, fill_value=fill_value) kdp_stdev = np.ma.array(kdp_stdev, mask=masked, fill_value=fill_value) phidp_rec = np.ma.array(phidp_rec, mask=masked, fill_value=fill_value) # create specific differential phase field dictionary and store data kdp_dict = get_metadata(kdp_field) kdp_dict["data"] = kdp # kdp_dict['valid_min'] = -1.0 # create reconstructed differential phase field dictionary and store data phidpr_dict = get_metadata(phidp_field) phidpr_dict["data"] = phidp_rec # phidpr_dict['valid_min'] = 0.0 # create specific phase stdev field dictionary and store data kdp_stdev_dict = {} kdp_stdev_dict["units"] = "degrees/km" kdp_stdev_dict["standard_name"] = "estimated KDP stdev" kdp_stdev_dict["long_name"] = "Estimated stdev of spec. diff. phase (KDP)" kdp_stdev_dict["coordinates"] = "elevation azimuth range" kdp_stdev_dict["data"] = kdp_stdev kdp_stdev_dict["valid_min"] = 0.0 if parallel: pool.close() return kdp_dict, kdp_stdev_dict, phidpr_dict
def _kdp_estimation_backward_fixed( psidp_in, rcov, pcov_scale, f, f_transposed, h_plus, c1, c2, b1, b2, kdp_th, mpsidp ): """ Processing one profile of Psidp and estimating Kdp and Phidp with the KFE algorithm described in Schneebeli et al, 2014 IEEE_TGRS. This routine estimates Kdp in the backward direction given a set of matrices that define the Kalman filter. Parameters ---------- psidp_in : ndarray One-dimensional vector of length -nrg- containining the input psidp [degrees]. rcov : 3x3 float array Measurement error covariance matrix. pcov_scale : 4x4 float array Scaled state transition error covariance matrix. f : 4x4 float array Forward state prediction matrix [4x4]. f_transposed : 4x4 float array Transpose of F. h_plus : 4x3 float array Measurement prediction matrix [4x3]. c1, c2, b1, b2 : floats The values of the intercept of the relation c = b*Kdp - delta. This relation uses b1, c1 IF kdp is lower than a kdp_th and b2, c2 otherwise kdp_th. kdp_th : float The kdp threshold which separates the two Kdp - delta regime i.e. the power law relating delta to Kdp will be different if Kdp is larger or smaller than kdp_th. mpsidp : float Final observed value of psidp along the radial (usually also the max value), needed for inverting the psidp vector. Returns ------- kdp : ndarray Filtered Kdp [degrees/km]. Same length as Psidp. error_kdp : ndarray Estimated error on Kdp values. """ # Define the input psidp = psidp_in # invert Psidp (backward estimation) psidp = mpsidp - psidp[::-1] nrg_new = len(psidp) # Initialize the state vector to 0 s = np.zeros([4, 1]) # first state estimate # define measurement vector z = np.zeros([3, 1]) # Define the identity matrix identity_i = np.eye(4) p = identity_i * 4.0 kdp = np.zeros([nrg_new]) kdp_error = np.zeros([nrg_new]) # Loop on all the gates and apply the filter for ii in range(0, nrg_new - 1): z[0] = psidp[ii] z[1] = psidp[ii + 1] s_pred = np.dot(f, s) # state prediciton p_pred = np.dot(f, np.dot(p, f_transposed)) + pcov_scale # error prediction if s_pred[0] > kdp_th: h_plus[2, 0] = b2 z[2] = c2 else: h_plus[2, 0] = b1 z[2] = c1 # as far as i see aludc is symmetrical, so i do not transpose it aludc = np.dot(h_plus, (np.dot(p_pred, h_plus.T))) + rcov # below we get the transposed of B_mat directly b_mat = np.dot(h_plus, p_pred) # Cholesky decomposition cho = linalg.cho_factor(aludc) k = linalg.cho_solve(cho, b_mat, check_finite=False, overwrite_b=True).T # Update state and error s = np.dot(k, (np.dot(-h_plus, s_pred) + z)) + s_pred p = np.dot((identity_i - np.dot(k, h_plus)), p_pred) # Fill the output kdp[ii] = s[0] kdp_error[ii] = p[0, 0] # Shift n = len(kdp) dummy = np.copy(kdp) kdp[np.arange(SHIFT) + len(kdp) - SHIFT] = 0 kdp[np.arange(n - 1 - SHIFT)] = dummy[np.arange(n - 1 - SHIFT) + SHIFT] # Reverse the estimates (backward direction) kdp = kdp[::-1] kdp_error = kdp_error[::-1] return kdp, kdp_error def _kdp_estimation_forward_fixed( psidp_in, rcov, pcov_scale, f, f_transposed, h_plus, c1, c2, b1, b2, kdp_th ): """ Processing one profile of Psidp and estimating Kdp and Phidp with the KFE algorithm described in Schneebeli et al, 2014 IEEE_TGRS. This routine estimates Kdp in the forward direction given a set of matrices that define the Kalman filter. Parameters ---------- psidp_in : ndarray One-dimensional vector of length -nrg- containining the input psidp [degrees]. rcov : 3x3 float array. Measurement error covariance matrix. pcov_scale : 4x4 float array Scaled state transition error covariance matrix. f : 4x4 float array Forward state prediction matrix [4x4]. f_transposed : 4x4 float array Transpose of F. h_plus : 4x3 float array*np.nan Measurement prediction matrix [4x3]. c1, c2, b1, b2 : floats The values of the intercept of the relation c = b*Kdp - delta. This relation uses b1, c1 IF kdp is lower than a kdp_th and b2, c2 otherwise kdp_th. Returns ------- kdp : ndarray Filtered Kdp [degrees/km]. Same length as Psidp. phidp : ndarray Estimated phidp (smooth psidp). error_kdp : ndarray Estimated error on Kdp values. """ # Define the input psidp = np.ma.filled(psidp_in) nrg_new = len(psidp) # Initialize the state vector to 0 s = np.zeros([4, 1]) # first state estimate # define measurement vector z = np.zeros([3, 1]) # Define the identity matrix identity_i = np.eye(4) p = identity_i * 4.0 phidp = np.zeros([nrg_new]) kdp = np.zeros([nrg_new]) kdp_error = np.zeros([nrg_new]) # Loop on all the gates and apply the filter for ii in range(0, nrg_new - 1): z[0] = psidp[ii] z[1] = psidp[ii + 1] s_pred = np.dot(f, s) # state prediciton p_pred = np.dot(f, np.dot(p, f_transposed)) + pcov_scale # error prediction if s_pred[0] > kdp_th: h_plus[2, 0] = b2 z[2] = c2 else: h_plus[2, 0] = b1 z[2] = c1 # as far as i see aludc is symmetrical, so i do not transpose it aludc = np.dot(h_plus, (np.dot(p_pred, h_plus.T))) + rcov # below we get the transposed of B_mat directly b_mat = np.dot(h_plus, p_pred) # Cholesky decomposition cho = linalg.cho_factor(aludc) k = linalg.cho_solve(cho, b_mat, check_finite=False, overwrite_b=True).T # Update state and error s = np.dot(k, (np.dot(-h_plus, s_pred) + z)) + s_pred p = np.dot((identity_i - np.dot(k, h_plus)), p_pred) # Fill the output kdp[ii] = s[0] kdp_error[ii] = p[0, 0] phidp[ii] = s[2] # Shift dummy = np.copy(kdp) kdp[np.arange(SHIFT) + len(kdp) - SHIFT] = 0 kdp[np.arange(len(kdp) - SHIFT)] = dummy[np.arange(len(kdp) - SHIFT) + SHIFT] return kdp, phidp, kdp_error def _kdp_kalman_profile(psidp_in, dr, band="X", rcov=0, pcov=0): """ Estimates Kdp with the Kalman filter method by Schneebeli and al. (2014) for a set of psidp measurements. Parameters ---------- psidp_in : ndarray One-dimensional vector of length -nrg- containining the input psidp [degrees]. dr : float Range resolution in meters. band : char, optional Radar frequency band string. Accepted "X", "C", "S" (capital or not). The band is used to compute intercepts -c and slope b of the delta = b*Kdp+c relation. rcov : 3x3 float array, optional Measurement error covariance matrix. pcov : 4x4 float array, optional Scaled state transition error covariance matrix. Returns ------- kdp_dict : ndarray Retrieved specific differential phase data. kdp_std_dict : ndarray Estimated specific differential phase standard dev. data. phidpr_dict : ndarray Retrieved differential phase data. References ---------- Schneebeli, M., Grazioli, J., and Berne, A.: Improved Estimation of the Specific Differential Phase Shift Using a Compilation of Kalman Filter Ensembles, IEEE T. Geosci. Remote Sens., 52, 5137-5149, doi:10.1109/TGRS.2013.2287017, 2014. """ dr = dr / 1000.0 # Convert rad. res. to km # NOTE! Parameters are not checked to save as much time as possible # Replace missing values with nans psidp_in = np.ma.filled(psidp_in, np.nan) # Check if psidp has at least one finite value if not np.isfinite(psidp_in).any(): return psidp_in, psidp_in, psidp_in # Return the NaNs... # Set default of the error covariance matrices if not isinstance(pcov, np.ndarray): pcov = np.array( [ [ (0.11 + 1.56 * dr) ** 2, (0.11 + 1.85 * dr) ** 2, 0, (0.01 + 1.1 * dr) ** 2, ], [ (0.11 + 1.85 * dr) ** 2, (0.18 + 3.03 * dr) ** 2, 0, (0.01 + 1.23 * dr) ** 2, ], [0, 0, 0, 0], [ (0.01 + 1.1 * dr) ** 2, (0.01 + 1.23 * dr) ** 2, 0, (-0.04 + 1.27 * dr) ** 2, ], ] ) if not isinstance(rcov, np.ndarray): rcov = np.array( [ [4.10625, -0.0498779, -0.0634192], [-0.0498779, 4.02369, -0.0421455], [-0.0634192, -0.0421455, 1.44300], ] ) # Define default parameters # Intercepts -c and slope b of delta=b*Kdp+c # According to the Kdp threshold selected if band == "X": c1 = -0.054 c2 = -6.155 b1 = 2.3688 b2 = 0.2734 kdp_th = 2.5 elif band == "C": c1 = -0.036 c2 = -1.03 b1 = 0.53 b2 = 0.15 kdp_th = 2.5 elif band == "S": c1 = -0.024 c2 = -0.15 b1 = 0.19 b2 = 0.019 kdp_th = 1.1 # Parameters for the final selection from the KF ensemble members fac1 = 1.2 fac2 = 3.0 th1_comp = -0.15 th2_comp = 0.15 th1_final = -0.25 # Kalman matrices # State matrix f = np.array( [[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 0, 1], [2 * dr, 0, 0, 1]], dtype=float ) f_transposed = f.T # Measurement prediction matrix-------------------------------- # J. Grazioli modification 07.2015 --previous H_plus buggy-- h_plus = np.array( [[-2 * dr, 1, 0, 1], [2 * dr, 1, 1, 0], [0, -1, 0, 0]], dtype=float ) # Define the input psidp = psidp_in # Get indices of finite data real_data_ind = np.where(np.isfinite(psidp.ravel()))[0] offset = real_data_ind[0] if len(real_data_ind): mpsidp = psidp.ravel()[real_data_ind[-1]] else: mpsidp = np.nan psidp = psidp[offset : real_data_ind[-1] + 1] nrg = len(psidp) # Define the output kdp_filter_out = np.zeros( [ nrg, ] ) kdp_mat = np.zeros([nrg, 2 * len(SCALERS)]) kdp_sim = np.zeros([nrg, len(SCALERS)]) phidp_filter_out = np.zeros([nrg]) # Prepare a longer array with some extra gates on each side # add values at the beginning and at the end of the profile psidp_long = ( np.zeros( [ nrg + PADDING * 2, ] ) * np.nan ) nn = nrg + PADDING * 2 noise = 2 * np.random.randn(PADDING) psidp_long[0:PADDING] = noise + psidp[0] psidp_long[nrg + PADDING : nrg + 2 * PADDING] = mpsidp + noise psidp_long[PADDING : nrg + PADDING] = psidp psidp = psidp_long # Get information of valid and non valid points in psidp the new psidp nonan = np.where(np.isfinite(psidp))[0] nan = np.where(np.isnan(psidp))[0] ranged = np.arange(0, nn) psidp_interp = psidp # interpolate if len(nan): interp = interpolate.interp1d(ranged[nonan], psidp[nonan], kind="zero") psidp_interp[nan] = interp(ranged[nan]) else: psidp_interp = psidp # add noise if len(nan): psidp_interp[nan] = psidp_interp[nan] + 2 * np.random.randn(len(nan)) # Define the final input and output psidp = psidp_interp # Smallest scaler scaler = 10 ** (-2.0) # Backward kdp_dummy_b2, _ = _kdp_estimation_backward_fixed( psidp, rcov, pcov * scaler, f, f_transposed, h_plus, c1, c2, b1, b2, kdp_th, mpsidp, ) kdp002 = kdp_dummy_b2[PADDING : nrg + PADDING] # Forward kdp_dummy_b2, _, _ = _kdp_estimation_forward_fixed( psidp, rcov, pcov * scaler, f, f_transposed, h_plus, c1, c2, b1, b2, kdp_th ) kdp002f = kdp_dummy_b2[PADDING : nrg + PADDING] # Generate the ensemble of Kalman filters estimates in backward and # Forward directions for i, sc in enumerate(SCALERS): # Loop on scalers # Forward kdp_dummy_f2, _, _ = _kdp_estimation_forward_fixed( psidp, rcov, pcov * sc, f, f_transposed, h_plus, c1, c2, b1, b2, kdp_th ) kdp_mat[:, 2 * i] = kdp_dummy_f2[PADDING : nrg + PADDING] # Backward kdp_dummy_b2, _ = _kdp_estimation_backward_fixed( psidp, rcov, pcov * sc, f, f_transposed, h_plus, c1, c2, b1, b2, kdp_th, mpsidp, ) kdp_mat[:, 2 * i + 1] = kdp_dummy_b2[PADDING : nrg + PADDING] # Compile the final estimate # Get some reference mean values kdp_mean = np.nanmean(kdp_mat, axis=1) kdp_mean_shift = np.roll(kdp_mean, -1) diff_mean = kdp_mean - kdp_mean_shift kdp_std = np.nanstd(kdp_mat, axis=1) if len(diff_mean) < 4: size_filt = len(diff_mean) else: size_filt = 4 diff_mean_smooth = np.convolve( diff_mean, np.ones((size_filt,)) / size_filt, mode="same" ) # Backward estimate if diff_mean greater than a defined threshold condi = np.where(diff_mean_smooth > th2_comp)[0] if len(condi): kdp_dummy = kdp_mat[:, np.arange(len(SCALERS)) * 2 + 1] kdp_sim[condi, :] = kdp_dummy[condi, :] # Forward estimate if diff_mean lower than a defined threshold condi = np.where(diff_mean_smooth < th1_comp)[0] if len(condi): kdp_dummy = kdp_mat[:, np.arange(len(SCALERS)) * 2] kdp_sim[condi, :] = kdp_dummy[condi, :] # Combination of the two in the middle condi = np.where( np.logical_and(diff_mean_smooth >= th1_comp, diff_mean_smooth <= th2_comp) )[0] if len(condi): weight2 = (-0.5 / 0.15) * diff_mean_smooth + 0.5 weight2 = np.tile(weight2, (len(SCALERS), 1)).T kdp_dummy = (1 - weight2) * kdp_mat[ :, np.arange(len(SCALERS)) * 2 + 1 ] + weight2 * kdp_mat[:, np.arange(len(SCALERS)) * 2] kdp_sim[condi, :] = kdp_dummy[condi, :] # Now we reduced to 11 ensemble members: compile the final one kdp_mean_sim = np.nanmean(kdp_sim, axis=1) kdp_std_sim = np.nanstd(kdp_sim, axis=1) kdp_low_mean2 = np.nanmean(np.vstack((kdp002, kdp002f)).T, axis=1) # Get the range of ensemble members that compile # a final estimate # Lower bounds lower_bound = np.round(kdp_mean_sim * fac1) - np.round(kdp_std_sim * fac2) lower_bound = np.maximum(lower_bound, 0) lower_bound = np.minimum(lower_bound, len(SCALERS) - 1) # Upper bounds upper_bound = np.round(kdp_mean_sim * fac1) + np.round(kdp_std_sim * fac2) upper_bound = np.maximum(upper_bound, 0) upper_bound = np.minimum(upper_bound, len(SCALERS) - 1) # Final selection of the ensemble members for uu in range(0, nrg - 1): selection_vector = ( np.arange(upper_bound[uu] - lower_bound[uu] + 1) + lower_bound[uu] ) selection_vector = selection_vector.astype(int) kdp_filter_out[uu] = np.mean(kdp_sim[uu, selection_vector]) # Final filtering of excessively negative values: # TO DO: It would be better to get rid of this filtering ind_lt_0 = np.where(kdp_filter_out < th1_final)[0] if len(ind_lt_0): kdp_filter_out[ind_lt_0] = kdp_low_mean2[ind_lt_0] # Compute phidp from Kdp phidp_filter_out = np.cumsum(kdp_filter_out) * 2.0 * dr phinan = np.where(np.isnan(psidp))[0] if len(phinan): phidp_filter_out[phinan] = np.nan kdp_filter_out[phinan] = np.nan phidp_filter_out[nrg - 1] = np.nan kdp_filter_out[nrg - 1] = np.nan # Pad with nan if offset > 0 kdp_filter_out = np.pad( kdp_filter_out, (offset, 0), mode="constant", constant_values=np.nan ) kdp_std = np.pad(kdp_std, (offset, 0), mode="constant", constant_values=np.nan) phidp_filter_out = np.pad( phidp_filter_out, (offset, 0), mode="constant", constant_values=np.nan ) return kdp_filter_out, kdp_std, phidp_filter_out
[docs]def kdp_vulpiani( radar, gatefilter=None, fill_value=None, psidp_field=None, kdp_field=None, phidp_field=None, band="C", windsize=10, n_iter=10, interp=False, prefilter_psidp=False, filter_opt=None, parallel=False, ): """ Estimates Kdp with the Vulpiani method for a 2D array of psidp measurements with the first dimension being the distance from radar and the second dimension being the angles (azimuths for PPI, elev for RHI).The input psidp is assumed to be pre-filtered (for ex. with the filter_psidp function) Parameters ---------- radar : Radar Radar containing differential phase field. gatefilter : GateFilter, optional A GateFilter indicating radar gates that should be excluded when analysing differential phase measurements. fill_value : float, optional Value indicating missing or bad data in differential phase field, if not specified, the default in the Py-ART configuration file will be used psidp_field : str, optional Total differential phase field. If None, the default field name must be specified in the Py-ART configuration file. kdp_field : str, optional Specific differential phase field. If None, the default field name must be specified in the Py-ART configuration file. phidp_field : str, optional Propagation differential phase field. If None, the default field name must be specified in the Py-ART configuration file. band : char, optional Radar frequency band string. Accepted "X", "C", "S" (capital or not). It is used to set default boundaries for expected values of Kdp. windsize : int, optional Size in # of gates of the range derivative window. Should be even. n_iter : int, optional Number of iterations of the method. Default is 10. interp : bool, optional If True, all the nans are interpolated.The advantage is that less data are lost (the iterations in fact are "eating the edges") but some non-linear errors may be introduced. prefilter_psidp : bool, optional If set, the psidp measurements will first be filtered with the filter_psidp method, which can improve the quality of the final Kdp. filter_opt : dict, optional The arguments for the prefilter_psidp method, if empty, the defaults arguments of this method will be used. parallel : bool, optional Flag to enable parallel computation (one core for every psidp profile). Returns ------- kdp_dict : dict Retrieved specific differential phase data and metadata. phidpr_dict,: dict Retrieved differential phase data and metadata. References ---------- Gianfranco Vulpiani, Mario Montopoli, Luca Delli Passeri, Antonio G. Gioia, Pietro Giordano, and Frank S. Marzano, 2012: On the Use of Dual-Polarized C-Band Radar for Operational Rainfall Retrieval in Mountainous Areas. J. Appl. Meteor. Climatol., 51, 405-425, doi: 10.1175/JAMC-D-10-05024.1. """ if np.mod(windsize, 2): warnings.warn( "In the Vulpiani method, the windsize should be even. " + "Using default value, windsize = 10" ) windsize = 10 if parallel: import multiprocessing as mp pool = mp.Pool(processes=mp.cpu_count(), maxtasksperchild=1) # parse fill value if fill_value is None: fill_value = get_fillvalue() # parse field names if psidp_field is None: psidp_field = get_field_name("differential_phase") if kdp_field is None: kdp_field = get_field_name("specific_differential_phase") if phidp_field is None: phidp_field = get_field_name("differential_phase") # parse range resolution, length scale and low-pass filter constraint # weight dr = _parse_range_resolution(radar, check_uniform=True) # parse total differential phase measurements if prefilter_psidp: if filter_opt is None: filter_opt = {} # Assign psidp field to filter_psidp inputs filter_opt["psidp_field"] = psidp_field # Filter psidp psidp_o = filter_psidp(radar, **filter_opt) else: psidp_o = radar.fields[psidp_field]["data"] # mask radar gates indicated by the gate filter if gatefilter is not None: psidp_o = np.ma.masked_where(gatefilter.gate_excluded, psidp_o) func = partial( _kdp_vulpiani_profile, dr=dr, windsize=windsize, band=band, n_iter=n_iter, interp=interp, ) all_psidp_prof = list(psidp_o) if parallel: list_est = pool.map(func, all_psidp_prof) else: list_est = map(func, all_psidp_prof) kdp = np.ma.zeros(psidp_o.shape) kdp[:] = np.ma.masked kdp.set_fill_value(fill_value) phidp_rec = np.ma.zeros(psidp_o.shape) phidp_rec[:] = np.ma.masked phidp_rec.set_fill_value(fill_value) for i, size in enumerate(list_est): kdp[i, 0 : len(size[0])] = size[0] phidp_rec[i, 0 : len(size[1])] = size[1] # Mask the estimated Kdp and reconstructed Phidp with the mask of original # psidp if isinstance(psidp_o, np.ma.masked_array): masked = psidp_o.mask kdp = np.ma.array(kdp, mask=masked, fill_value=fill_value) phidp_rec = np.ma.array(phidp_rec, mask=masked, fill_value=fill_value) # create specific differential phase field dictionary and store data kdp_dict = get_metadata(kdp_field) kdp_dict["data"] = kdp # kdp_dict['valid_min'] = 0.0 # create reconstructed differential phase field dictionary and store data phidpr_dict = get_metadata(phidp_field) phidpr_dict["data"] = phidp_rec # phidpr_dict['valid_min'] = 0.0 if parallel: pool.close() return kdp_dict, phidpr_dict
def _kdp_vulpiani_profile(psidp_in, dr, windsize=10, band="X", n_iter=10, interp=False): """ Estimates Kdp with the Vulpiani method for a single profile of psidp measurements Parameters ---------- psidp_in : ndarray Total differential phase measurements. dr : float Range resolution in meters. windsize : int, optional Size in # of gates of the range derivative window. band : char, optional Radar frequency band string. Accepted "X", "C", "S" (capital or not). It is used to set default boundaries for expected values of Kdp n_iter : int, optional Number of iterations of the method. Default is 10. interp : bool, optional If set all the nans are interpolated.The advantage is that less data are lost (the iterations in fact are "eating the edges") but some non-linear errors may be introduced Returns ------- kdp_calc : ndarray Retrieved specific differential profile phidp_rec,: ndarray Retrieved differential phase profile """ mask = np.ma.getmaskarray(psidp_in) size = windsize l2 = int(size / 2) drm = dr / 1000.0 if mask.all() is True: # Check if all elements are masked return psidp_in, psidp_in, psidp_in # Return the NaNs... # Thresholds in kdp calculation if band == "X": th1 = -2.0 th2 = 40.0 std_th = 5.0 elif band == "C": th1 = -2.0 th2 = 20.0 std_th = 5.0 elif band == "S": th1 = -2.0 th2 = 14.0 std_th = 5.0 else: print("Unexpected value set for the band keyword ") print(band) return None psidp = psidp_in nn = len(psidp_in) # Get information of valid and non valid points in psidp the new psidp valid = np.logical_not(mask) if interp: ranged = np.arange(0, nn) psidp_interp = psidp # interpolate if np.ma.is_masked(psidp): interp = interpolate.interp1d( ranged[valid], psidp[valid], kind="zero", bounds_error=False, fill_value=np.nan, ) psidp_interp[mask] = interp(ranged[mask]) psidp = psidp_interp psidp = np.ma.filled(psidp, np.nan) kdp_calc = np.zeros([nn]) # first guess # In the core of the profile kdp_calc[l2 : nn - l2] = (psidp[size:nn] - psidp[0 : nn - size]) / ( 2.0 * size * drm ) # set ray extremes to 0 kdp_calc[0:l2] = 0.0 kdp_calc[nn - l2 :] = 0.0 # apply thresholds kdp_calc[kdp_calc <= th1] = 0.0 kdp_calc[kdp_calc >= th2] = 0.0 # set all non-valid data to 0 kdp_calc[np.isnan(kdp_calc)] = 0.0 # Remove bins with texture higher than treshold tex = np.ma.zeros(kdp_calc.shape) # compute the local standard deviation # (make sure that it is and odd window) tex_aux = np.ma.std(rolling_window(kdp_calc, l2 * 2 + 1), -1) tex[l2:-l2] = tex_aux kdp_calc[tex > std_th] = 0.0 # Loop over iterations for i in range(0, n_iter): phidp_rec = np.ma.cumsum(kdp_calc) * 2.0 * drm # In the core of the profile kdp_calc[l2 : nn - l2] = (phidp_rec[size:nn] - phidp_rec[0 : nn - size]) / ( 2.0 * size * drm ) # set ray extremes to 0 kdp_calc[0:l2] = 0.0 kdp_calc[nn - l2 :] = 0.0 # apply thresholds kdp_calc[kdp_calc <= th1] = 0.0 kdp_calc[kdp_calc >= th2] = 0.0 # Censor Kdp where Psidp was not defined kdp_calc = np.ma.masked_where(mask, kdp_calc) # final reconstructed PhiDP from KDP phidp_rec = np.ma.cumsum(kdp_calc) * 2.0 * drm return kdp_calc, phidp_rec def filter_psidp( radar, psidp_field=None, rhohv_field=None, minsize_seq=5, median_filter_size=7, thresh_rhohv=0.65, max_discont=90, ): """ Filter measured psidp to remove spurious data in four steps: 1. Censor it where Rhohv is lower than threshold 2. Unravel angles when strong discontinuities are detected 3. Remove very short sequences of valid data 4. Apply a median filter on every profile Parameters ---------- radar : Radar Radar containing differential phase field. psidp_field : str, optional Total differential phase field. If None, the default field name must be specified in the Py-ART configuration file. rhohv_field : str, optional Cross correlation ratio field. If None, the default field name must be specified in the Py-ART configuration file. minsize_seq : integer, optional Minimal len (in radar gates) of sequences of valid data to be accepted median_filter_size : integer, optional Size (in radar gates) of the median filter to be applied on psidp thresh_rhohv : float, optional Censoring threshold in rhohv (gates with rhohv < thresh_rhohv) will be rejected max_discont : int, optional Maximum discontinuity between psidp values, default is 90 deg Returns ------- psidp_filt : ndarray Filtered psidp field """ # parse field names if psidp_field is None: psidp_field = get_field_name("differential_phase") if rhohv_field is None: rhohv_field = get_field_name("cross_correlation_ratio") # parse total differential phase and cross corr. measurements psidp_o = radar.fields[psidp_field]["data"] rhohv = radar.fields[rhohv_field]["data"] if np.isscalar(psidp_o.mask): # Ensure 2D mask psidp_o.mask = np.zeros(psidp_o.shape) + psidp_o.mask if not isinstance(psidp_o, np.ma.masked_array): psidp_o = np.ma.array(psidp_o, mask=np.isnan(psidp_o)) # Initialize mask mask = np.ones(psidp_o.shape) * False # Condition on rhohv mask += rhohv < thresh_rhohv # Get original mask mask += psidp_o.mask # Remove short sequences and unwrap psidp_filt = np.zeros(psidp_o.shape) for i, psi_row in enumerate(psidp_o): idx = np.where(~psi_row.mask)[0] # idx of last valid gate if len(idx): psi_row = psi_row[0 : idx[-1] + 1] psi_row[~psi_row.mask] = np.rad2deg( np.unwrap(np.deg2rad(psi_row[~psi_row.mask]), np.deg2rad(max_discont)) ) psi_row_with_nan = np.ma.filled(psi_row, np.nan) # To be sure to always have a left and right neighbour, # we need to pad the signal with NaN psi_row_with_nan = np.pad( psi_row_with_nan, (1, 1), "constant", constant_values=(np.nan,) ) idx = np.where(np.isfinite(psi_row_with_nan))[0] nan_left = idx[np.where(np.isnan(psi_row_with_nan[idx - 1]))[0]] nan_right = idx[np.where(np.isnan(psi_row_with_nan[idx + 1]))[0]] len_sub = nan_right - nan_left for j, size in enumerate(len_sub): if size < minsize_seq: mask[i, nan_left[j] - 1 : nan_right[j] + 1] = True # median filter psi_row = signal.medfilt(psi_row_with_nan, median_filter_size) psidp_filt[i, 0 : len(psi_row[1:-1])] = psi_row[1:-1] psidp_filt = np.ma.masked_array( psidp_filt, mask=mask, fill_value=psidp_o.fill_value ) return psidp_filt # Necessary and/or potential future improvements to the KDP module: # # * The near and far range gate boundary conditions necessary for the Maesaka # et al. (2012) variational method are sensitive to the number of range gates # used to define both boundaries as well as outliers in these samples. This # may have to be further investigated and potentially improved upon. # # * It would be beneficial for the GateFilter class to have a method which can # flag radar gates above the melting layer, maybe through the user supplying # an estimate of the 0 deg Celsius level above mean sea level. # # * The backscatter differential phase parameter will likely have to be updated # in the future to handle various parameterizations. # # * The addition of an azimuthal low-pass filter (smoothness) constraint would # be beneficial to the variational Maesaka et al. (2012) algorithm. # # * Add the ability to record and/or store minimization data, e.g., current # iteration number, functional value and functional gradient norm value as a # function of iteration.
[docs]def kdp_maesaka( radar, gatefilter=None, method="cg", backscatter=None, Clpf=1.0, length_scale=None, first_guess=0.01, finite_order="low", fill_value=None, proc=1, psidp_field=None, kdp_field=None, phidp_field=None, debug=False, verbose=False, **kwargs, ): """ Compute the specific differential phase (KDP) from corrected (e.g., unfolded) total differential phase data based on the variational method outlined in Maesaka et al. (2012). This method assumes a monotonically increasing propagation differential phase (PHIDP) with increasing range from the radar, and therefore is limited to rainfall below the melting layer and/or warm clouds at weather radar frequencies (e.g., S-, C-, and X-band). This method currently only supports radar data with constant range resolution. Following the notation of Maesaka et al. (2012), the primary control variable k is proportional to KDP, k**2 = 2 * KDP * dr which, because of the square, assumes that KDP always takes a positive value. Parameters ---------- radar : Radar Radar containing differential phase field. gatefilter : GateFilter A GateFilter indicating radar gates that should be excluded when analysing differential phase measurements. method : str, optional Type of scipy.optimize method to use when minimizing the cost functional. The default method uses a nonlinear conjugate gradient algorithm. In Maesaka et al. (2012) they use the Broyden-Fletcher- Goldfarb-Shanno (BFGS) algorithm, however for large functional size (e.g., 100K+ variables) this algorithm is considerably slower than a conjugate gradient algorithm. backscatter : optional Define the backscatter differential phase. If None, the backscatter differential phase is set to zero for all range gates. Note that backscatter differential phase can be parameterized using attentuation corrected differential reflectivity. Clpf : float, optional The low-pass filter (radial smoothness) constraint weight as in equation (15) of Maesaka et al. (2012). length_scale : float, optional Length scale in meters used to bring the dimension and magnitude of the low-pass filter cost functional in line with the observation cost functional. If None, the length scale is set to the range resolution. first_guess : float, optional First guess for control variable k. Since k is proportional to the square root of KDP, the first guess should be close to zero to signify a KDP field close to 0 deg/km everywhere. However, the first guess should not be exactly zero in order to avoid convergence criteria after the first iteration. In fact it is recommended to use a value closer to one than zero. finite_order : 'low' or 'high', optional The finite difference accuracy to use when computing derivatives. maxiter : int, optional Maximum number of iterations to perform during cost functional minimization. The maximum number of iterations are only performed if convergence criteria are not met. For variational schemes such as this one, it is generally not recommended to try and achieve convergence criteria since the values of the cost functional and/or its gradient norm are somewhat arbitrary. fill_value : float, optional Value indicating missing or bad data in differential phase field. proc : int, optional The number of parallel threads (CPUs) to use. Currently no multiprocessing capability exists. psidp_field : str, optional Total differential phase field. If None, the default field name must be specified in the Py-ART configuration file. kdp_field : str, optional Specific differential phase field. If None, the default field name must be specified in the Py-ART configuration file. phidp_field : str, optional Propagation differential phase field. If None, the default field name must be specified in the Py-ART configuration file. debug : bool, optional True to print debugging information, False to suppress. verbose : bool, optional True to print relevant information, False to suppress. Returns ------- kdp_dict : dict Retrieved specific differential phase data and metadata. phidpf_dict, phidpr_dict : dict Retrieved forward and reverse direction propagation differential phase data and metadata. References ---------- Maesaka, T., Iwanami, K. and Maki, M., 2012: "Non-negative KDP Estimation by Monotone Increasing PHIDP Assumption below Melting Layer". The Seventh European Conference on Radar in Meteorology and Hydrology. """ # parse fill value if fill_value is None: fill_value = get_fillvalue() # parse field names if psidp_field is None: psidp_field = get_field_name("differential_phase") if kdp_field is None: kdp_field = get_field_name("specific_differential_phase") if phidp_field is None: phidp_field = get_field_name("differential_phase") # parse range resolution, length scale and low-pass filter constraint # weight dr = _parse_range_resolution(radar, check_uniform=True, verbose=verbose) # parse length scale if length_scale is None: length_scale = dr # parse low-pass filter constraint weight # this brings the dimensions and magnitude of the low-pass filter # constraint in line with the differential phase measurement constraint Clpf *= length_scale**4 # parse near and far range gate propagation differential phase boundary # conditions bcs = boundary_conditions_maesaka( radar, gatefilter=gatefilter, psidp_field=psidp_field, debug=debug, verbose=verbose, **kwargs, ) phi_near, phi_far = bcs[:2] idx_near, idx_far = bcs[4:] # parse total differential phase measurements psidp_o = radar.fields[psidp_field]["data"] if debug: N = np.ma.count(psidp_o) print(f"Sample size before filtering: {N}") # mask radar gates indicated by the gate filter if gatefilter is not None: psidp_o = np.ma.masked_where(gatefilter.gate_excluded, psidp_o) # mask any radar gates which are closer (further) than the near (far) # boundary condition ranges for ray in range(radar.nrays): psidp_o[ray, : idx_near[ray]] = np.ma.masked psidp_o[ray, idx_far[ray] + 1 :] = np.ma.masked if debug: N = np.ma.count(psidp_o) print(f"Sample size after filtering: {N}") # parse differential phase measurement weight Cobs = np.logical_not(np.ma.getmaskarray(psidp_o)).astype(psidp_o.dtype) psidp_o = np.ma.filled(psidp_o, fill_value) # parse backscattered differential phase data if backscatter is None: dhv = np.zeros_like(psidp_o, subok=False) # parse solver options options = { "maxiter": kwargs.get("maxiter", 50), "gtol": kwargs.get("gtol", 1.0e-5), "disp": verbose, } if debug: optimize.show_options(solver="minimize", method=method) # parse initial conditions (first guess) x0 = np.zeros_like(psidp_o, subok=False).flatten() x0.fill(first_guess) if verbose: print(f"Cost functional size: {x0.size}") # define arguments for cost functional and its Jacobian (gradient) args = ( psidp_o, [phi_near, phi_far], dhv, dr, Cobs, Clpf, finite_order, fill_value, proc, debug, verbose, ) if debug: start = time.time() # minimize the cost functional xopt = optimize.minimize( _cost_maesaka, x0, args=args, method=method, jac=_jac_maesaka, hess=None, hessp=None, bounds=None, constraints=None, callback=None, options=options, ) if debug: elapsed = time.time() - start print(f"Elapsed time for minimization: {elapsed:.0f} sec") # parse control variables from optimized result k = xopt.x.reshape(psidp_o.shape) # compute specific differential phase from control variable k in deg/km kdp = k**2 / (2.0 * dr) * 1000.0 if debug: print(f"Min retrieved KDP: {kdp.min():.2f} deg/km") print(f"Max retrieved KDP: {kdp.max():.2f} deg/km") print(f"Mean retrieved KDP: {kdp.mean():.2f} deg/km") # create specific differential phase field dictionary and store data kdp_dict = get_metadata(kdp_field) kdp_dict["data"] = kdp kdp_dict["valid_min"] = 0.0 kdp_dict["Clpf"] = Clpf # compute forward and reverse direction propagation differential phase phidp_f, phidp_r = _forward_reverse_phidp(k, [phi_near, phi_far], verbose=verbose) # create forward direction propagation differential phase field dictionary # and store data phidpf_dict = get_metadata(phidp_field) phidpf_dict["data"] = phidp_f phidpf_dict["comment"] = "Retrieved in forward direction" # create reverse direction propagation differential phase field dictionary # and store data phidpr_dict = get_metadata(phidp_field) phidpr_dict["data"] = phidp_r phidpr_dict["comment"] = "Retrieved in reverse direction" return kdp_dict, phidpf_dict, phidpr_dict
def boundary_conditions_maesaka( radar, gatefilter=None, n=20, psidp_field=None, debug=False, verbose=False, **kwargs ): """ Determine near range gate and far range gate propagation differential phase boundary conditions. This follows the method outlined in Maesaka et al. (2012), except instead of using the mean we use the median which is less susceptible to outliers. This function can also be used to estimate the system phase offset. Parameters ---------- radar : Radar Radar containing total differential phase measurements. gatefilter : GateFilter A GateFilter indicating radar gates that should be excluded when analysing differential phase measurements. n : int, optional The number of range gates necessary to define the near and far range gate boundary conditions. Maesaka et al. (2012) uses a value of 30. If this value is too small then a spurious spike in specific differential phase close to the radar may be retrieved. check_outliers : bool, optional True to check for near range gate boundary condition outliers. Outliers near the radar are primarily the result of ground clutter returns. psidp_field : str, optional Field name of total differential phase. If None, the default field name must be specified in the Py-ART configuration file. debug : bool, optional True to print debugging information, False to suppress. verbose : bool, optional True to print relevant information, False to suppress. Returns ------- phi_near : ndarray The near range differential phase boundary condition for each ray. phi_far : ndarray The far range differential phase boundary condition for each ray. range_near : ndarray The near range gate in meters for each ray. range_far : ndarray The far range gate in meters for each ray. idx_near : ndarray Index of nearest range gate for each ray. idx_far : ndarray Index of furthest range gate for each ray. """ # parse field names if psidp_field is None: psidp_field = get_field_name("differential_phase") # parse differential phase measurements psidp = radar.fields[psidp_field]["data"] if gatefilter is not None: psidp = np.ma.masked_where(gatefilter.gate_excluded, psidp) # find contiguous unmasked data along each ray slices = np.ma.notmasked_contiguous(psidp, axis=1) if debug: N = sum(len(slc) for slc in slices if hasattr(slc, "__len__")) print(f"Total number of unique non-masked regions: {N}") phi_near = np.zeros(radar.nrays, dtype=psidp.dtype) phi_far = np.zeros(radar.nrays, dtype=psidp.dtype) range_near = np.zeros(radar.nrays, dtype=psidp.dtype) range_far = np.zeros(radar.nrays, dtype=psidp.dtype) idx_near = np.zeros(radar.nrays, dtype=np.int32) idx_far = np.zeros(radar.nrays, dtype=np.int32) for ray, regions in enumerate(slices): # check if all range gates are missing if regions is None: continue # check if all range gates available, i.e., no masked data if isinstance(regions, slice): regions = [regions] # near range gate boundary condition for slc in regions: # check if enough samples exist in slice if slc.stop - slc.start >= n: # parse index and range of nearest range gate idx = slice(slc.start, slc.start + n) idx_near[ray] = idx.start range_near[ray] = radar.range["data"][idx][0] # parse data for linear regression and compute slope x = radar.range["data"][idx] y = psidp[ray, idx] slope = stats.linregress(x, y)[0] # if linear regression slope is positive, set near range gate # boundary condition to first differential phase measurement, # otherwise use the median value if slope > 0.0: phi_near[ray] = y[0] else: phi_near[ray] = np.median(y) break else: continue # far range gate boundary condition for slc in reversed(regions): if slc.stop - slc.start >= n: # parse index and range of furthest range gate idx = slice(slc.stop - n, slc.stop) idx_far[ray] = idx.stop range_far[ray] = radar.range["data"][idx][-1] # parse data for linear regression and compute slope x = radar.range["data"][idx] y = psidp[ray, idx] slope = stats.linregress(x, y)[0] # if linear regression slope is positive, set far range gate # boundary condition to last differential phase measurement, # otherwise use the median value if slope > 0.0: phi_far[ray] = y[-1] else: phi_far[ray] = np.median(y) break else: continue # check for outliers in the near range boundary conditions, e.g., ground # clutter can introduce spurious values in certain rays # do not include missing values in the analysis phi_near_valid = phi_near[phi_near != 0.0] # skip the check if there are no valid values if kwargs.get("check_outliers", True) and (phi_near_valid.size != 0): # bin and count near range boundary condition values, i.e., create # a distribution of values # the default bin width is 5 deg counts, edges = np.histogram( phi_near_valid, bins=144, range=(-360, 360), density=False ) # assume that the maximum counts corresponds to the maximum in the # system phase distribution # this assumption breaks down if there are a significant amount of # near range boundary conditions characterized by ground clutter system_phase_peak_left = edges[counts.argmax()] system_phase_peak_right = edges[counts.argmax() + 1] if debug: print( f"Peak of system phase distribution: {system_phase_peak_left:.0f} deg" ) # determine left edge location of system phase distribution # we consider five counts or less to be insignificant is_left_side = np.logical_and(edges[:-1] < system_phase_peak_left, counts <= 5) left_edge = edges[:-1][is_left_side][-1] # determine right edge location of system phase distribution # we consider five counts or less to be insignificant is_right_side = np.logical_and(edges[1:] > system_phase_peak_right, counts <= 5) right_edge = edges[1:][is_right_side][0] if debug: print(f"Left edge of system phase distribution: {left_edge:.0f} deg") print(f"Right edge of system phase distribution: {right_edge:.0f} deg") # define the system phase offset as the median value of the system # phase distriubion is_system_phase = np.logical_or( phi_near_valid >= left_edge, phi_near_valid <= right_edge ) system_phase_offset = np.median(phi_near_valid[is_system_phase]) if debug: print(f"Estimated system phase offset: {system_phase_offset:.0f} deg") for ray, bc in enumerate(phi_near): # if near range boundary condition does not draw from system phase # distribution then set it to the system phase offset if bc < left_edge or bc > right_edge: phi_near[ray] = system_phase_offset # check for unphysical boundary conditions, i.e., propagation differential # phase should monotonically increase from the near boundary to the far # boundary is_unphysical = phi_far - phi_near < 0.0 phi_far[is_unphysical] = phi_near[is_unphysical] if verbose: N = is_unphysical.sum() print(f"Rays with unphysical boundary conditions: {N}") return phi_near, phi_far, range_near, range_far, idx_near, idx_far def _cost_maesaka( x, psidp_o, bcs, dhv, dr, Cobs, Clpf, finite_order, fill_value, proc, debug=False, verbose=False, ): """ Compute the value of the cost functional similar to equations (12)-(15) in Maesaka et al. (2012). Parameters ---------- x : ndarray Analysis vector containing control variable k. psidp_o : ndarray Total differential phase measurements. bcs : array_like The near and far range gate propagation differential phase boundary conditions. dhv : ndarray Backscatter differential phase. dr : float Range resolution in meters. Cobs : ndarray The differential phase measurement constraint weights. The weight should vanish where no differential phase measurements are available. Clpf : float The low-pass filter (radial smoothness) constraint weight as in equation (15) of Maesaka et al. (2012). finite_order : 'low' or 'high' The finite difference accuracy to use when computing derivatives. fill_value : float Value indicating missing or bad data in radar field data. proc : int The number of parallel threads (CPUs) to use. debug : bool, optional True to print debugging information, False to suppress. verbose : bool, optional True to print progress information, False to suppress. Returns ------- J : float Value of total cost functional. """ # parse control variable k from analysis vector nr, ng = psidp_o.shape k = x.reshape(nr, ng) # parse near and far range gate boundary conditions phi_near, phi_far = bcs # compute forward direction propagation differential phase from control # variable k phi_fa = np.zeros_like(k, subok=False) phi_fa[:, 1:] = np.cumsum(k[:, :-1] ** 2, axis=1) # compute reverse direction propagation differential phase from control # variable k phi_ra = np.zeros_like(k, subok=False) phi_ra[:, :-1] = np.cumsum(k[:, :0:-1] ** 2, axis=1)[:, ::-1] # compute forward and reverse propagation differential phase # from total differential phase observations phi_fo = psidp_o - dhv - phi_near[:, np.newaxis].repeat(ng, axis=1) phi_ro = phi_far[:, np.newaxis].repeat(ng, axis=1) - psidp_o + dhv # cost: forward direction differential phase observations Jof = 0.5 * np.sum(Cobs * (phi_fa - phi_fo) ** 2) # cost: reverse direction differential phase observations Jor = 0.5 * np.sum(Cobs * (phi_ra - phi_ro) ** 2) # prepare control variable k for Cython function k = np.ascontiguousarray(k, dtype=np.float64) # compute low-pass filter term, i.e., second order derivative of k with # respect to range d2kdr2 = np.empty_like(k) _kdp_proc.lowpass_maesaka_term(k, dr, finite_order, d2kdr2) # cost: low-pass filter, i.e., radial smoothness Jlpf = 0.5 * np.sum(Clpf * (d2kdr2) ** 2) # compute value of total cost functional J = Jof + Jor + Jlpf if verbose: print(f"Forward direction observation cost : {Jof:1.3e}") print(f"Reverse direction observation cost : {Jor:1.3e}") print(f"Low-pass filter cost ............. : {Jlpf:1.3e}") print(f"Total cost ....................... : {J:1.3e}") return J def _jac_maesaka( x, psidp_o, bcs, dhv, dr, Cobs, Clpf, finite_order, fill_value, proc, debug=False, verbose=False, ): """ Compute the Jacobian (gradient) of the cost functional similar to equations (16)-(18) in Maesaka et al. (2012). Parameters ---------- x : ndarray Analysis vector containing control variable k. psidp_o : ndarray Total differential phase measurements. bcs : array_like The near and far range gate propagation differential phase boundary conditions. dhv : ndarray Backscatter differential phase. dr : float Range resolution in meters. Cobs : ndarray The differential phase measurement constraint weights. The weight should vanish where no differential phase measurements are available. Clpf : float The low-pass filter (radial smoothness) constraint weight as in equation (15) of Maesaka et al. (2012). finite_order : 'low' or 'high' The finite difference accuracy to use when computing derivatives. fill_value : float Value indicating missing or bad data in radar field data. proc : int The number of parallel threads (CPUs) to use. debug : bool, optional True to print debugging information, False to suppress. verbose : bool, optional True to print progress information, False to suppress. Returns ------- jac : ndarray Jacobian of the cost functional. """ # parse control variable k from analysis vector nr, ng = psidp_o.shape k = x.reshape(nr, ng) # parse near and far range gate boundary conditions phi_near, phi_far = bcs # compute forward direction propagation differential phase from control # variable k phi_fa = np.zeros_like(k, subok=False) phi_fa[:, 1:] = np.cumsum(k[:, :-1] ** 2, axis=1) # compute reverse direction propagation differential phase from control # variable k phi_ra = np.zeros_like(k, subok=False) phi_ra[:, :-1] = np.cumsum(k[:, :0:-1] ** 2, axis=1)[:, ::-1] # compute forward and reverse propagation differential phase # from total differential phase observations phi_fo = psidp_o - dhv - phi_near[:, np.newaxis].repeat(ng, axis=1) phi_ro = phi_far[:, np.newaxis].repeat(ng, axis=1) - psidp_o + dhv # prepare control variable k for Cython functions k = np.ascontiguousarray(k, dtype=np.float64) # cost: forward direction differential phase observations dJofdk = np.zeros_like(k, subok=False) dJofdk[:, :-1] = ( 2.0 * k[:, :-1] * np.cumsum((Cobs[:, 1:] * (phi_fa[:, 1:] - phi_fo[:, 1:]))[:, ::-1], axis=1)[ :, ::-1 ] ) # cost: reverse direction differential phase observations dJordk = np.zeros_like(k, subok=False) dJordk[:, 1:] = ( 2.0 * k[:, 1:] * np.cumsum((Cobs[:, :-1] * (phi_ra[:, :-1] - phi_ro[:, :-1])), axis=1) ) # compute low-pass filter term, i.e., second order derivative of k with # respect to range d2kdr2 = np.empty_like(k) _kdp_proc.lowpass_maesaka_term(k, dr, finite_order, d2kdr2) # compute gradients of Jlpf with respect to the control variable k dJlpfdk = np.empty_like(d2kdr2) _kdp_proc.lowpass_maesaka_jac(d2kdr2, dr, Clpf, finite_order, dJlpfdk) # sum control variable derivative components dJdk = dJofdk + dJordk + dJlpfdk jac = dJdk.flatten() # compute the vector norm of the Jacobian if verbose: mag = np.linalg.norm(jac, ord=None, axis=None) print(f"Vector norm of Jacobian: {mag:1.3e}") return jac def _forward_reverse_phidp(k, bcs, verbose=False): """ Compute the forward and reverse direction propagation differential phases from the control variable k and boundary conditions following equations (1) and (7) in Maesaka et al. (2012). Parameters ---------- k : ndarray Control variable k of the Maesaka et al. (2012) method. The control variable k is proportional to the square root of specific differential phase. bcs : array_like The near and far range gate boundary conditions. verbose : bool, optional True to print relevant information, False to suppress. Returns ------- phidp_f : ndarray Forward direction propagation differential phase. phidp_r : ndarray Reverse direction propagation differential phase. """ # parse near and far range gate boundary conditions _, ng = k.shape phi_near, phi_far = bcs # compute forward direction propagation differential phase phi_f = np.zeros_like(k, subok=False) phi_f[:, 1:] = np.cumsum(k[:, :-1] ** 2, axis=1) phidp_f = phi_f + phi_near[:, np.newaxis].repeat(ng, axis=1) # compute reverse direction propagation differential phase phi_r = np.zeros_like(k, subok=False) phi_r[:, :-1] = np.cumsum(k[:, :0:-1] ** 2, axis=1)[:, ::-1] phidp_r = phi_far[:, np.newaxis].repeat(ng, axis=1) - phi_r # check quality of retrieval by comparing forward and reverse directions if verbose: phidp_mbe = np.ma.mean(phidp_f - phidp_r) phidp_mae = np.ma.mean(np.abs(phidp_f - phidp_r)) print(f"Forward-reverse PHIDP MBE: {phidp_mbe:.2f} deg") print(f"Forward-reverse PHIDP MAE: {phidp_mae:.2f} deg") return phidp_f, phidp_r def _parse_range_resolution(radar, check_uniform=True, atol=1.0, verbose=False): """ Parse the radar range gate resolution. Parameters ---------- radar : Radar Radar containing range data. check_uniform : bool, optional True to check if all range gates are equally spaced, and if so return a scalar value for range resolution. If False, the resolution between each range gate is returned. atol : float, optional The absolute tolerance in meters allowed for discrepancies in range gate spacings. Only applicable when check_uniform is True. This parameter may be necessary to catch instances where range gate spacings differ by a few meters or so. verbose : bool, optional True to print the range gate resolution. Only valid if check_uniform is True. Returns ------- dr : float or ndarray The radar range gate spacing in meters. """ # parse radar range gate spacings dr = np.diff(radar.range["data"], n=1) # check for uniform resolution if check_uniform and np.allclose(np.diff(dr), 0.0, atol=atol): dr = dr[0] if verbose: print(f"Range resolution: {dr:.2f} m") else: raise ValueError("Radar gate spacing is not uniform") return dr