Source code for pyart.correct.region_dealias

Region based dealiasing using a dynamic network reduction for region joining.


import warnings

import numpy as np
import scipy.ndimage as ndimage
from scipy.optimize import fmin_l_bfgs_b

from ..config import get_fillvalue, get_metadata
from ._common_dealias import (
from ._fast_edge_finder import _fast_edge_finder

# Possible future improvements to the region based dealiasing algorithm:
# * Find connected regions in the entire volume (3D) an dealias the entire
#   volume as apposed to the current sweep-by-sweep approach.
# * Scale the Doppler velocities by the Nyquist interval so that
#   calculations are done in units of percentage of the Nyquist interval.
#   This scaling is already done in the _EdgeTracker object but could also be
#   applied before finding regions of similar velocities.
# * Currently no attmpt is made to unfold gates which are included in the
#   gatefilter.  One could add a second pass to the algorithms where gates
#   excluded by the filter are unfolded using a more elemenary approach.  In
#   this manner the filter could define only "good" gates which establish the
#   folding pattern which is then applied to all gates.
# * Improve perfornace by implementing a priority queue in the _EdgeTracker
#   object. See comments in the class for details.

[docs]def dealias_region_based( radar, ref_vel_field=None, interval_splits=3, interval_limits=None, skip_between_rays=100, skip_along_ray=100, centered=True, nyquist_vel=None, check_nyquist_uniform=True, gatefilter=False, rays_wrap_around=None, keep_original=False, set_limits=True, vel_field=None, corr_vel_field=None, **kwargs ): """ Dealias Doppler velocities using a region based algorithm. Performs Doppler velocity dealiasing by finding regions of similar velocities and unfolding and merging pairs of regions until all regions are unfolded. Unfolding and merging regions is accomplished by modeling the problem as a dynamic network reduction. Parameters ---------- radar : Radar Radar object containing Doppler velocities to dealias. ref_vel_field : str or None, optional Field in radar containing a reference velocity field used to anchor the unfolded velocities once the algorithm completes. Typically this field is created by simulating the radial velocities from wind data from an atmospheric sonding using :py:func:`pyart.util.simulated_vel_from_profile`. interval_splits : int, optional Number of segments to split the nyquist interval into when finding regions of similar velocity. More splits creates a larger number of initial regions which takes longer to process but may result in better dealiasing. The default value of 3 seems to be a good compromise between performance and artifact free dealiasing. This value is not used if the interval_limits parameter is not None. interval_limits : array like or None, optional Velocity limits used for finding regions of similar velocity. Should cover the entire nyquist interval. None, the default value, will split the Nyquist interval into interval_splits equal sized intervals. skip_between_rays, skip_along_ray : int, optional Maximum number of filtered gates to skip over when joining regions, gaps between region larger than this will not be connected. Parameters specify the maximum number of filtered gates between and along a ray. Set these parameters to 0 to disable unfolding across filtered gates. centered : bool, optional True to apply centering to each sweep after the dealiasing algorithm so that the average number of unfolding is near 0. False does not apply centering which may results in individual sweeps under or over folded by the nyquist interval. nyquist_velocity : array like or float, optional Nyquist velocity in unit identical to those stored in the radar's velocity field, either for each sweep or a single value which will be used for all sweeps. None will attempt to determine this value from the Radar object. check_nyquist_uniform : bool, optional True to check if the Nyquist velocities are uniform for all rays within a sweep, False will skip this check. This parameter is ignored when the nyquist_velocity parameter is not None. gatefilter : GateFilter, None or False, optional. A GateFilter instance which specified which gates should be ignored when performing de-aliasing. A value of None created this filter from the radar moments using any additional arguments by passing them to :py:func:`moment_based_gate_filter`. False, the default, disables filtering including all gates in the dealiasing. rays_wrap_around : bool or None, optional True when the rays at the beginning of the sweep and end of the sweep should be interpreted as connected when de-aliasing (PPI scans). False if they edges should not be interpreted as connected (other scan types). None will determine the correct value from the radar scan type. keep_original : bool, optional True to retain the original Doppler velocity values at gates where the dealiasing procedure fails or was not applied. False does not replacement and these gates will be masked in the corrected velocity field. set_limits : bool, optional True to set valid_min and valid_max elements in the returned dictionary. False will not set these dictionary elements. vel_field : str, optional Field in radar to use as the Doppler velocities during dealiasing. None will use the default field name from the Py-ART configuration file. corr_vel_field : str, optional Name to use for the dealiased Doppler velocity field metadata. None will use the default field name from the Py-ART configuration file. Returns ------- corr_vel : dict Field dictionary containing dealiased Doppler velocities. Dealiased array is stored under the 'data' key. """ # parse function parameters vel_field, corr_vel_field = _parse_fields(vel_field, corr_vel_field) gatefilter = _parse_gatefilter(gatefilter, radar, **kwargs) rays_wrap_around = _parse_rays_wrap_around(rays_wrap_around, radar) nyquist_vel = _parse_nyquist_vel(nyquist_vel, radar, check_nyquist_uniform) # parse ref_vel_field if ref_vel_field is None: ref_vdata = None else: ref_vdata = radar.fields[ref_vel_field]["data"] # exclude masked and invalid velocity gates gatefilter.exclude_masked(vel_field) gatefilter.exclude_invalid(vel_field) gfilter = gatefilter.gate_excluded # perform dealiasing vdata = radar.fields[vel_field]["data"].view(np.ndarray) data = vdata.copy() # dealiased velocities # loop over sweeps for nsweep, sweep_slice in enumerate(radar.iter_slice()): # extract sweep data sdata = vdata[sweep_slice].copy() # is a copy needed here? scorr = data[sweep_slice] sfilter = gfilter[sweep_slice] # find nyquist velocity and interval segmentation limits nyquist_interval = nyquist_vel[nsweep] * 2.0 if interval_limits is None: nvel = nyquist_vel[nsweep] valid_sdata = sdata[~sfilter] s_interval_limits = _find_sweep_interval_splits( nvel, interval_splits, valid_sdata, nsweep ) else: s_interval_limits = interval_limits # find regions in original data labels, nfeatures = _find_regions(sdata, sfilter, s_interval_limits) # skip sweep if all gates are masked or only a single region if nfeatures < 2: continue bincount = np.bincount(labels.ravel()) num_masked_gates = bincount[0] region_sizes = bincount[1:] # find all edges between regions indices, edge_count, velos = _edge_sum_and_count( labels, num_masked_gates, sdata, rays_wrap_around, skip_between_rays, skip_along_ray, ) # no unfolding required if no edges exist between regions if len(edge_count) == 0: continue # find the number of folds in the regions region_tracker = _RegionTracker(region_sizes) edge_tracker = _EdgeTracker( indices, edge_count, velos, nyquist_interval, nfeatures + 1 ) while True: if _combine_regions(region_tracker, edge_tracker): break # center sweep if requested, determine a global sweep unfold number # so that the average number of gate folds is zero. if centered: gates_dealiased = region_sizes.sum() total_folds = np.sum(region_sizes * region_tracker.unwrap_number[1:]) sweep_offset = int(round(float(total_folds) / gates_dealiased)) if sweep_offset != 0: region_tracker.unwrap_number -= sweep_offset # dealias the data using the fold numbers nwrap = np.take(region_tracker.unwrap_number, labels) scorr += nwrap * nyquist_interval # anchor unfolded velocities against reference velocity if ref_vdata is not None: sref = ref_vdata[sweep_slice] gfold = (sref - scorr).mean() / nyquist_interval gfold = round(gfold) # Anchor specific regions against reference velocity # Do this by constraining cost function due to difference # from reference velocity and to 2D continuity new_interval_limits = np.linspace(scorr.min(), scorr.max(), 10) labels_corr, nfeatures_corr = _find_regions( scorr, sfilter, new_interval_limits ) # If we only have one region, just adjust the whole sweep by # x nyquist intervals if nfeatures_corr < 2: scorr = scorr + gfold * nyquist_interval else: bounds_list = [ (x, y) for (x, y) in zip( -6 * np.ones(nfeatures_corr), 5 * np.ones(nfeatures_corr) ) ] scorr_means = np.zeros(nfeatures_corr) sref_means = np.zeros(nfeatures_corr) for reg in range(1, nfeatures_corr + 1): scorr_means[reg - 1] =[labels_corr == reg]) sref_means[reg - 1] =[labels_corr == reg]) def cost_function(x): return _cost_function( x, scorr_means, sref_means, nyquist_interval, nfeatures_corr ) def gradient(x): return _gradient( x, scorr_means, sref_means, nyquist_interval, nfeatures_corr ) nyq_adjustments = fmin_l_bfgs_b( cost_function, gfold * np.ones(nfeatures_corr), disp=True, fprime=gradient, bounds=bounds_list, maxiter=200, pgtol=nyquist_interval, ) i = 0 for reg in range(1, nfeatures_corr): scorr[labels == reg] += nyquist_interval * np.round( nyq_adjustments[0][i] ) i = i + 1 # fill_value from the velocity dictionary if present fill_value = radar.fields[vel_field].get("_FillValue", get_fillvalue()) # mask filtered gates if np.any(gfilter): data =, mask=gfilter, fill_value=fill_value) # restore original values where dealiasing not applied if keep_original: data[gfilter] = radar.fields[vel_field]["data"][gfilter] # return field dictionary containing dealiased Doppler velocities corr_vel = get_metadata(corr_vel_field) corr_vel["data"] = data corr_vel["_FillValue"] = fill_value if set_limits: # set valid_min and valid_max in corr_vel _set_limits(data, nyquist_vel, corr_vel) return corr_vel
def _find_sweep_interval_splits(nyquist, interval_splits, velocities, nsweep): """Return the interval limits for a given sweep.""" # The Nyquist interval is split into interval_splits equal sized areas. # If velocities outside the Nyquist are present the number and # limits of the interval splits must be adjusted so that theses # velocities are included in one of the splits. add_start = add_end = 0 interval = (2.0 * nyquist) / (interval_splits) # no change from default if all gates filtered if len(velocities) != 0: max_vel = velocities.max() min_vel = velocities.min() if max_vel > nyquist or min_vel < -nyquist: msg = "Velocities outside of the Nyquist interval found in " "sweep %i." % ( nsweep ) warnings.warn(msg, UserWarning) # additional intervals must be added to capture the velocities # outside the nyquist limits add_start = int(np.ceil((max_vel - nyquist) / (interval))) add_end = int(np.ceil(-(min_vel + nyquist) / (interval))) start = -nyquist - add_start * interval end = nyquist + add_end * interval num = interval_splits + 1 + add_start + add_end return np.linspace(start, end, num, endpoint=True) def _find_regions(vel, gfilter, limits): """ Find regions of similar velocity. For each pair of values in the limits array (or list) find all connected velocity regions within these limits. Parameters ---------- vel : 2D ndarray Array containing velocity data for a single sweep. gfilter : 2D ndarray Filter indicating if a particular gate should be masked. True indicates the gate should be masked (excluded). limits : array like Velocity limits for region finding. For each pair of limits, taken from elements i and i+1 of the array, all connected regions with velocities within these limits will be found. Returns ------- label : ndarray Interger array with each region labeled by a value. The array ranges from 0 to nfeatures, inclusive, where a value of 0 indicates masked gates and non-zero indicates a region of connected gates. nfeatures : int Number of regions found. """ mask = ~gfilter label = np.zeros(vel.shape, dtype=np.int32) nfeatures = 0 for lmin, lmax in zip(limits[:-1], limits[1:]): # find connected regions within the limits inp = (lmin <= vel) & (vel < lmax) & mask limit_label, limit_nfeatures = ndimage.label(inp) # add these regions to the global regions limit_label[np.nonzero(limit_label)] += nfeatures label += limit_label nfeatures += limit_nfeatures return label, nfeatures def _edge_sum_and_count( labels, num_masked_gates, data, rays_wrap_around, max_gap_x, max_gap_y ): """ Find all edges between labels regions. Returns the indices, count and velocities of all edges. """ total_nodes = labels.shape[0] * labels.shape[1] - num_masked_gates if rays_wrap_around: total_nodes += labels.shape[0] * 2 indices, velocities = _fast_edge_finder( labels.astype("int32"), data.astype("float32"), rays_wrap_around, max_gap_x, max_gap_y, total_nodes, ) index1, index2 = indices vel1, vel2 = velocities count = np.ones_like(vel1, dtype=np.int32) # return early if not edges were found if len(vel1) == 0: return ([], []), [], ([], []) # find the unique edges, procedure based on method in # scipy.sparse.coo_matrix.sum_duplicates # except we have three data arrays, vel1, vel2, and count order = np.lexsort((index1, index2)) index1 = index1[order] index2 = index2[order] vel1 = vel1[order] vel2 = vel2[order] count = count[order] unique_mask = (index1[1:] != index1[:-1]) | (index2[1:] != index2[:-1]) unique_mask = np.append(True, unique_mask) index1 = index1[unique_mask] index2 = index2[unique_mask] (unique_inds,) = np.nonzero(unique_mask) vel1 = np.add.reduceat(vel1, unique_inds, dtype=vel1.dtype) vel2 = np.add.reduceat(vel2, unique_inds, dtype=vel2.dtype) count = np.add.reduceat(count, unique_inds, dtype=count.dtype) return (index1, index2), count, (vel1, vel2) def _combine_regions(region_tracker, edge_tracker): """Returns True when done.""" # Edge parameters from edge with largest weight status, extra = edge_tracker.pop_edge() if status: return True node1, node2, weight, diff, edge_number = extra rdiff = int(np.round(diff)) # node sizes of nodes to be merged node1_size = region_tracker.get_node_size(node1) node2_size = region_tracker.get_node_size(node2) # determine which nodes should be merged if node1_size > node2_size: base_node, merge_node = node1, node2 else: base_node, merge_node = node2, node1 rdiff = -rdiff # unwrap merge_node if rdiff != 0: region_tracker.unwrap_node(merge_node, rdiff) edge_tracker.unwrap_node(merge_node, rdiff) # merge nodes region_tracker.merge_nodes(base_node, merge_node) edge_tracker.merge_nodes(base_node, merge_node, edge_number) return False # Minimize cost function that is sum of difference between regions and # sounding def _cost_function( nyq_vector, vels_slice_means, svels_slice_means, v_nyq_vel, nfeatures ): """Cost function for minimization in region based algorithm.""" cost = 0 i = 0 for reg in range(nfeatures): add_value = 0 # Deviance from sounding add_value = ( vels_slice_means[reg] + np.round(nyq_vector[i]) * v_nyq_vel - svels_slice_means[reg] ) ** 2 if np.isfinite(add_value): cost += add_value i = i + 1 return cost def _gradient(nyq_vector, vels_slice_means, svels_slice_means, v_nyq_vel, nfeatures): """Gradient of cost function for minimization in region based algorithm.""" gradient_vector = np.zeros(len(nyq_vector)) i = 0 for reg in range(nfeatures): add_value = ( vels_slice_means[reg] + np.round(nyq_vector[i]) * v_nyq_vel - svels_slice_means[reg] ) if np.isfinite(add_value): gradient_vector[i] = 2 * add_value * v_nyq_vel # Regional continuity vels_without_cur = np.delete(vels_slice_means, reg) diffs = np.square(vels_slice_means[reg] - vels_without_cur) if len(diffs) > 0: the_min = np.argmin(diffs) else: the_min = 0 if the_min < v_nyq_vel: gradient_vector[i] = 0 i = i + 1 return gradient_vector class _RegionTracker: """ Tracks the location of radar volume regions contained in each node as the network is reduced. """ def __init__(self, region_sizes): """initalize.""" # number of gates in each node nregions = len(region_sizes) + 1 self.node_size = np.zeros(nregions, dtype="int32") self.node_size[1:] = region_sizes[:] # array of lists containing the regions in each node self.regions_in_node = np.zeros(nregions, dtype="object") for i in range(nregions): self.regions_in_node[i] = [i] # number of unwrappings to apply to dealias each region self.unwrap_number = np.zeros(nregions, dtype="int32") def merge_nodes(self, node_a, node_b): """Merge node b into node a.""" # move all regions from node_b to node_a regions_to_merge = self.regions_in_node[node_b] self.regions_in_node[node_a].extend(regions_to_merge) self.regions_in_node[node_b] = [] # update node sizes self.node_size[node_a] += self.node_size[node_b] self.node_size[node_b] = 0 return def unwrap_node(self, node, nwrap): """Unwrap all gates contained a node.""" if nwrap == 0: return # for each region in node add nwrap regions_to_unwrap = self.regions_in_node[node] self.unwrap_number[regions_to_unwrap] += nwrap return def get_node_size(self, node): """Return the number of gates in a node.""" return self.node_size[node] class _EdgeTracker: """A class for tracking edges in a dynamic network.""" def __init__(self, indices, edge_count, velocities, nyquist_interval, nnodes): """initialize""" nedges = int(len(indices[0]) / 2) # node number and different in sum for each edge self.node_alpha = np.zeros(nedges, dtype=np.int32) self.node_beta = np.zeros(nedges, dtype=np.int32) self.sum_diff = np.zeros(nedges, dtype=np.float32) # number of connections between the regions self.weight = np.zeros(nedges, dtype=np.int32) # fast finding self._common_finder = np.zeros(nnodes, dtype=np.bool_) self._common_index = np.zeros(nnodes, dtype=np.int32) self._last_base_node = -1 # array of linked lists pointing to each node self.edges_in_node = np.zeros(nnodes, dtype="object") for i in range(nnodes): self.edges_in_node[i] = [] # fill out data from the provides indicies, edge counts and velocities edge = 0 idx1, idx2 = indices vel1, vel2 = velocities for i, j, count, vel, nvel in zip(idx1, idx2, edge_count, vel1, vel2): if i < j: continue self.node_alpha[edge] = i self.node_beta[edge] = j self.sum_diff[edge] = (vel - nvel) / nyquist_interval self.weight[edge] = count self.edges_in_node[i].append(edge) self.edges_in_node[j].append(edge) edge += 1 # list which orders edges according to their weight, highest first # TODO self.priority_queue = [] def merge_nodes(self, base_node, merge_node, foo_edge): """Merge nodes.""" # remove edge between base and merge nodes self.weight[foo_edge] = -999 self.edges_in_node[merge_node].remove(foo_edge) self.edges_in_node[base_node].remove(foo_edge) self._common_finder[merge_node] = False # find all the edges in the two nodes edges_in_merge = list(self.edges_in_node[merge_node]) # loop over base_node edges if last base_node was different if self._last_base_node != base_node: self._common_finder[:] = False edges_in_base = list(self.edges_in_node[base_node]) for edge_num in edges_in_base: # reverse edge if needed so node_alpha is base_node if self.node_beta[edge_num] == base_node: self._reverse_edge_direction(edge_num) assert self.node_alpha[edge_num] == base_node # find all neighboring nodes to base_node neighbor = self.node_beta[edge_num] self._common_finder[neighbor] = True self._common_index[neighbor] = edge_num # loop over edge nodes for edge_num in edges_in_merge: # reverse edge so that node alpha is the merge_node if self.node_beta[edge_num] == merge_node: self._reverse_edge_direction(edge_num) assert self.node_alpha[edge_num] == merge_node # update all the edges to point to the base node self.node_alpha[edge_num] = base_node # if base_node also has an edge with the neighbor combine them neighbor = self.node_beta[edge_num] if self._common_finder[neighbor]: base_edge_num = self._common_index[neighbor] self._combine_edges(base_edge_num, edge_num, merge_node, neighbor) # if not fill in _common_ arrays. else: self._common_finder[neighbor] = True self._common_index[neighbor] = edge_num # move all edges from merge_node to base_node edges = self.edges_in_node[merge_node] self.edges_in_node[base_node].extend(edges) self.edges_in_node[merge_node] = [] self._last_base_node = int(base_node) return def _combine_edges(self, base_edge, merge_edge, merge_node, neighbor_node): """Combine edges into a single edge.""" # Merging nodes MUST be set to alpha prior to calling this function # combine edge weights self.weight[base_edge] += self.weight[merge_edge] self.weight[merge_edge] = -999.0 # combine sums self.sum_diff[base_edge] += self.sum_diff[merge_edge] # remove merge_edge from both node lists self.edges_in_node[merge_node].remove(merge_edge) self.edges_in_node[neighbor_node].remove(merge_edge) # TODO priority queue # remove merge_edge from edge priority queue # self.priority_queue.remove(merge_edge) # relocate base_edge in priority queue # self.priority_queue.sort() def _reverse_edge_direction(self, edge): """Reverse an edges direction, change alpha and beta.""" # swap nodes old_alpha = int(self.node_alpha[edge]) old_beta = int(self.node_beta[edge]) self.node_alpha[edge] = old_beta self.node_beta[edge] = old_alpha # swap sums self.sum_diff[edge] = -1.0 * self.sum_diff[edge] return def unwrap_node(self, node, nwrap): """Unwrap a node.""" if nwrap == 0: return # add weight * nwrap to each edge in node for edge in self.edges_in_node[node]: weight = self.weight[edge] if node == self.node_alpha[edge]: self.sum_diff[edge] += weight * nwrap else: assert self.node_beta[edge] == node self.sum_diff[edge] += -weight * nwrap return def pop_edge(self): """Pop edge with largest weight. Return node numbers and diff.""" # if len(priority_queue) == 0: # return True, None # edge_num = self.priority_queue[0] edge_num = np.argmax(self.weight) node1 = self.node_alpha[edge_num] node2 = self.node_beta[edge_num] weight = self.weight[edge_num] diff = self.sum_diff[edge_num] / (float(weight)) if weight < 0: return True, None return False, (node1, node2, weight, diff, edge_num)