Source code for pyart.correct.unwrap

Dealias using multidimensional phase unwrapping algorithms.


import numpy as np

from ..config import get_fillvalue, get_metadata
from ._common_dealias import (
from ._unwrap_1d import unwrap_1d
from ._unwrap_2d import unwrap_2d
from ._unwrap_3d import unwrap_3d

[docs]def dealias_unwrap_phase( radar, unwrap_unit="sweep", nyquist_vel=None, check_nyquist_uniform=True, gatefilter=False, rays_wrap_around=None, keep_original=False, set_limits=True, vel_field=None, corr_vel_field=None, skip_checks=False, **kwargs, ): """ Dealias Doppler velocities using multi-dimensional phase unwrapping [1]_ and [2]_. Parameters ---------- radar : Radar Radar object containing Doppler velocities to dealias. unwrap_unit : {'ray', 'sweep', 'volume'}, optional Unit to unwrap independently. 'ray' will unwrap each ray individually, 'sweep' each sweep, and 'volume' will unwrap the entire volume in a single pass. 'sweep', the default, often gives superior results when the lower sweeps of the radar volume are contaminated by clutter. 'ray' does not use the gatefilter parameter and rays where gates ared masked will result in poor dealiasing for that ray. nyquist_velocity : array like or float, optional Nyquist velocity in unit identical to those stored in the radar's velocity field, either for each sweep or a single value which will be used for all sweeps. None will attempt to determine this value from the Radar object. The Nyquist velocity of the first sweep is used for all dealiasing unless the unwrap_unit is 'sweep' when the velocities of each sweep are used. check_nyquist_uniform : bool, optional True to check if the Nyquist velocities are uniform for all rays within a sweep, False will skip this check. This parameter is ignored when the nyquist_velocity parameter is not None. gatefilter : GateFilter, None or False, optional. A GateFilter instance which specified which gates should be ignored when performing de-aliasing. A value of None created this filter from the radar moments using any additional arguments by passing them to :py:func:`moment_based_gate_filter`. False, the default, disables filtering including all gates in the dealiasing. rays_wrap_around : bool or None, optional True when the rays at the beginning of the sweep and end of the sweep should be interpreted as connected when de-aliasing (PPI scans). False if they edges should not be interpreted as connected (other scan types). None will determine the correct value from the radar scan type. keep_original : bool, optional True to retain the original Doppler velocity values at gates where the dealiasing procedure fails or was not applied. False does not replacement and these gates will be masked in the corrected velocity field. set_limits : bool, optional True to set valid_min and valid_max elements in the returned dictionary. False will not set these dictionary elements. vel_field : str, optional Field in radar to use as the Doppler velocities during dealiasing. None will use the default field name from the Py-ART configuration file. corr_vel_field : str, optional Name to use for the dealiased Doppler velocity field metadata. None will use the default field name from the Py-ART configuration file. skip_checks : bool True to skip checks verifing that an appropiate unwrap_unit is selected, False retains these checked. Setting this parameter to True is not recommended and is only offered as an option for extreme cases. Returns ------- corr_vel : dict Field dictionary containing dealiased Doppler velocities. Dealiased array is stored under the 'data' key. References ---------- .. [1] Miguel Arevallilo Herraez, David R. Burton, Michael J. Lalor, and Munther A. Gdeisat, "Fast two-dimensional phase-unwrapping algorithm based on sorting by reliability following a noncontinuous path", Journal Applied Optics, Vol. 41, No. 35 (2002) 7437, .. [2] Abdul-Rahman, H., Gdeisat, M., Burton, D., & Lalor, M., "Fast three-dimensional phase-unwrapping algorithm based on sorting by reliability following a non-continuous path. In W. Osten, C. Gorecki, & E. L. Novak (Eds.), Optical Metrology (2005) 32--40, International Society for Optics and Photonics. """ vel_field, corr_vel_field = _parse_fields(vel_field, corr_vel_field) gatefilter = _parse_gatefilter(gatefilter, radar, **kwargs) rays_wrap_around = _parse_rays_wrap_around(rays_wrap_around, radar) nyquist_vel = _parse_nyquist_vel(nyquist_vel, radar, check_nyquist_uniform) if not skip_checks: _verify_unwrap_unit(radar, unwrap_unit) # exclude masked and invalid velocity gates gatefilter.exclude_masked(vel_field) gatefilter.exclude_invalid(vel_field) gfilter = gatefilter.gate_excluded # raw vel. data possibly with masking raw_vdata = radar.fields[vel_field]["data"] vdata = raw_vdata.view(np.ndarray) # mask removed # perform dealiasing if unwrap_unit == "ray": # 1D unwrapping does not use the gate filter nor respect # masked gates in the rays. No information from the radar object is # needed for the unfolding data = _dealias_unwrap_1d(vdata, nyquist_vel) elif unwrap_unit == "sweep": data = _dealias_unwrap_2d(radar, vdata, nyquist_vel, gfilter, rays_wrap_around) elif unwrap_unit == "volume": data = _dealias_unwrap_3d(radar, vdata, nyquist_vel, gfilter, rays_wrap_around) else: message = ( "Unknown `unwrap_unit` parameter, must be one of " "'ray', 'sweep', or 'volume'." ) raise ValueError(message) # fill_value from the velocity dictionary if present fill_value = radar.fields[vel_field].get("_FillValue", get_fillvalue()) # mask filtered gates if np.any(gfilter): data =, mask=gfilter, fill_value=fill_value) # restore original values where dealiasing not applied if keep_original: data[gfilter] = raw_vdata[gfilter] # return field dictionary containing dealiased Doppler velocities corr_vel = get_metadata(corr_vel_field) corr_vel["data"] = data corr_vel["_FillValue"] = fill_value if set_limits: # set valid_min and valid_max in corr_vel _set_limits(data, nyquist_vel, corr_vel) return corr_vel
def _dealias_unwrap_3d(radar, vdata, nyquist_vel, gfilter, rays_wrap_around): """Dealias using 3D phase unwrapping (full volume at once).""" # form cube and scale to phase units nyquist_vel = nyquist_vel[0] # must be uniform, not checked shape = (radar.nsweeps, -1, radar.ngates) scaled_cube = (np.pi * vdata / nyquist_vel).reshape(shape) filter_cube = gfilter.reshape(shape) # perform unwrapping wrapped = np.require(scaled_cube, np.float64, ["C"]) mask = np.require(filter_cube, np.uint8, ["C"]) unwrapped = np.empty_like(wrapped, dtype=np.float64, order="C") unwrap_3d(wrapped, mask, unwrapped, [False, rays_wrap_around, False]) # scale back to velocity units unwrapped_cube = unwrapped * nyquist_vel / np.pi unwrapped_volume = unwrapped_cube.reshape(-1, radar.ngates) unwrapped_volume = unwrapped_volume.astype(vdata.dtype) return unwrapped_volume def _dealias_unwrap_1d(vdata, nyquist_vel): """Dealias using 1D phase unwrapping (ray-by-ray).""" # nyquist_vel is only available sweep by sweep which has been lost at # this point. Metioned in the documentation nyquist_vel = nyquist_vel[0] data = np.empty_like(vdata) for i, ray in enumerate(vdata): # extract ray and scale to phase units scaled_ray = ray * np.pi / nyquist_vel # perform unwrapping wrapped = np.require(scaled_ray, np.float64, ["C"]) unwrapped = np.empty_like(wrapped, dtype=np.float64, order="C") unwrap_1d(wrapped, unwrapped) # scale back into velocity units and store data[i] = unwrapped * nyquist_vel / np.pi return data def _dealias_unwrap_2d(radar, vdata, nyquist_vel, gfilter, rays_wrap_around): """Dealias using 2D phase unwrapping (sweep-by-sweep).""" data = np.zeros_like(vdata) for nsweep, sweep_slice in enumerate(radar.iter_slice()): # extract sweep and scale to phase units sweep_nyquist_vel = nyquist_vel[nsweep] scaled_sweep = vdata[sweep_slice] * np.pi / sweep_nyquist_vel sweep_mask = gfilter[sweep_slice] # perform unwrapping wrapped = np.require(scaled_sweep, np.float64, ["C"]) mask = np.require(sweep_mask, np.uint8, ["C"]) unwrapped = np.empty_like(wrapped, dtype=np.float64, order="C") unwrap_2d(wrapped, mask, unwrapped, [rays_wrap_around, False]) # scale back into velocity units and store data[sweep_slice, :] = unwrapped * sweep_nyquist_vel / np.pi return data def _verify_unwrap_unit(radar, unwrap_unit): """ Verify that the radar supports the requested unwrap unit raises a ValueError if the unwrap_unit is not supported. """ if unwrap_unit == "sweep" or unwrap_unit == "volume": if _is_radar_sequential(radar) is False: mess = "rays are not sequentially ordered, must use 'ray' " "unwrap_unit." raise ValueError(mess) if unwrap_unit == "volume": if _is_radar_cubic(radar) is False: mess = "Non-cubic radar volume, 'volume' unwrap_unit invalid. " raise ValueError(mess) if _is_radar_sweep_aligned(radar) is False: mess = ( "Angle in sequential sweeps in radar volumes are not " "aligned, 'volume unwrap_unit invalid." ) raise ValueError(mess) def _is_radar_cubic(radar): """Test if a radar is cubic (sweeps have the same number of rays).""" rays_per_sweep = radar.rays_per_sweep["data"] return bool(np.all(rays_per_sweep == rays_per_sweep[0])) def _is_radar_sweep_aligned(radar, diff=0.1): """ Test that all sweeps in the radar sample nearly the same angles. Test that the maximum difference in sweep sampled angles is below `diff` degrees. The radar should first be tested to verify that is cubic before calling this function using the _is_radar_cubic function. """ if radar.nsweeps == 1: return True # all single sweep volume are sweep aligned if radar.scan_type == "ppi": angles = radar.azimuth["data"] elif radar.scan_type == "rhi": angles = radar.elevation["data"] else: raise ValueError(f"invalid scan_type: {radar.scan_type}") starts = radar.sweep_start_ray_index["data"] ends = radar.sweep_end_ray_index["data"] ref_angles = angles[starts[0] : ends[0] + 1] for start, end in zip(starts, ends): test_angles = angles[start : end + 1] if np.any(np.abs(test_angles - ref_angles) > diff): return False return True def _is_radar_sequential(radar): """Test if all sweeps in radar are sequentially ordered.""" for i in range(radar.nsweeps): if not _is_sweep_sequential(radar, i): return False return True def _is_sweep_sequential(radar, sweep_number): """Test if a specific sweep is sequentially ordered.""" start = radar.sweep_start_ray_index["data"][sweep_number] end = radar.sweep_end_ray_index["data"][sweep_number] if radar.scan_type == "ppi": angles = radar.azimuth["data"][start : end + 1] elif radar.scan_type == "rhi": angles = radar.elevation["data"][start : end + 1] elif radar.scan_type == "vpt": # for VPT scan time should not run backwards, so time is the # equivalent variable to an angle. angles = radar.time["data"] else: raise ValueError(f"invalid scan_type: {radar.scan_type}") rolled_angles = np.roll(angles, -np.argmin(angles)) return np.all(np.diff(rolled_angles) >= 0)