Source code for pyart.util.xsect

"""
Function for extracting cross sections from radar volumes.

"""

from copy import copy
from warnings import warn

import numpy as np

from ..core import Radar


[docs]def cross_section_ppi(radar, target_azimuths, az_tol=None): """ Extract cross sections from a PPI volume along one or more azimuth angles. Parameters ---------- radar : Radar Radar volume containing PPI sweeps from which azimuthal cross sections will be extracted. target_azimuth : list Azimuthal angles in degrees where cross sections will be taken. az_tol : float, optional Azimuth angle tolerance in degrees. If none the nearest angle is used. If valid only angles within the tolerance distance are considered. Returns ------- radar_rhi : Radar Radar volume containing RHI sweeps which contain azimuthal cross sections from the original PPI volume. """ # determine which rays from the ppi radar make up the pseudo RHI prhi_rays = [] valid_azimuths = [] for target_azimuth in sorted(target_azimuths): for sweep_slice in radar.iter_slice(): sweep_azimuths = radar.azimuth["data"][sweep_slice] d_az = np.abs(sweep_azimuths - target_azimuth) if az_tol is None: ray_number = np.argmin(d_az) prhi_rays.append(ray_number + sweep_slice.start) valid_azimuths.append(target_azimuth) else: d_az_min = np.min(d_az) if d_az_min > az_tol: warn( "WARNING: No azimuth found whithin tolerance " + "for angle " + str(target_azimuth) + ". Minimum distance to radar azimuth " + str(d_az_min) + " larger than tolerance " + str(az_tol) ) else: ray_number = np.argmin(d_az) prhi_rays.append(ray_number + sweep_slice.start) valid_azimuths.append(target_azimuth) unique_azimuths = np.unique(valid_azimuths) rhi_nsweeps = len(unique_azimuths) if rhi_nsweeps == 0: raise ValueError("No azimuth found within tolerance") radar_rhi = _construct_xsect_radar( radar, "rhi", prhi_rays, rhi_nsweeps, unique_azimuths ) return radar_rhi
[docs]def cross_section_rhi(radar, target_elevations, el_tol=None): """ Extract cross sections from an RHI volume along one or more elevation angles. Parameters ---------- radar : Radar Radar volume containing RHI sweeps from which azimuthal cross sections will be extracted. target_elevations : list Elevation angles in degrees where cross sections will be taken. el_tol : float, optional Elevation angle tolerance in degrees. If none the nearest angle is used. If valid only angles within the tolerance distance are considered. Returns ------- radar_ppi : Radar Radar volume containing PPI sweeps which contain azimuthal cross sections from the original RHI volume. """ # determine which rays from the rhi radar make up the pseudo PPI pppi_rays = [] valid_elevations = [] for target_elevation in target_elevations: for sweep_slice in radar.iter_slice(): sweep_elevations = radar.elevation["data"][sweep_slice] d_el = np.abs(sweep_elevations - target_elevation) if el_tol is None: ray_number = np.argmin(d_el) pppi_rays.append(ray_number + sweep_slice.start) valid_elevations.append(target_elevation) else: d_el_min = np.min(d_el) if d_el_min > el_tol: warn( "WARNING: No elevation found whithin tolerance " + "for angle " + str(target_elevation) + ". Minimum distance to radar elevation " + str(d_el_min) + " larger than tolerance " + str(el_tol) ) else: ray_number = np.argmin(d_el) pppi_rays.append(ray_number + sweep_slice.start) valid_elevations.append(target_elevation) ppi_nsweeps = len(valid_elevations) if ppi_nsweeps == 0: raise ValueError("No elevation found within tolerance") radar_ppi = _construct_xsect_radar( radar, "ppi", pppi_rays, ppi_nsweeps, valid_elevations ) return radar_ppi
def _construct_xsect_radar(radar, scan_type, pxsect_rays, xsect_nsweeps, target_angles): """ Constructs a new radar object that contains cross-sections at fixed angles of a PPI or RHI volume scan. Parameters ---------- radar : Radar Radar volume containing RHI/PPI sweeps from which a cross sections will be extracted. scan_type : str Type of cross section scan (ppi or rhi). pxsect_rays : list List of rays from the radar volume to be copied in the cross-sections radar object. xsect_nsweeps : int Number of sweeps in the cross-section radar. target_angles : array The target fixed angles. Returns ------- radar_xsect : Radar Radar volume containing sweeps which contain cross sections from the original volume. """ _range = _copy_dic(radar.range) latitude = _copy_dic(radar.latitude) longitude = _copy_dic(radar.longitude) altitude = _copy_dic(radar.altitude) metadata = _copy_dic(radar.metadata) time = _copy_dic(radar.time, excluded_keys=["data"]) time["data"] = radar.time["data"][pxsect_rays].copy() azimuth = _copy_dic(radar.azimuth, excluded_keys=["data"]) azimuth["data"] = radar.azimuth["data"][pxsect_rays].copy() elevation = _copy_dic(radar.elevation, excluded_keys=["data"]) elevation["data"] = radar.elevation["data"][pxsect_rays].copy() fields = {} for field_name, orig_field_dic in radar.fields.items(): field_dic = _copy_dic(orig_field_dic, excluded_keys=["data"]) field_dic["data"] = orig_field_dic["data"][pxsect_rays].copy() fields[field_name] = field_dic sweep_number = _copy_dic(radar.sweep_number, excluded_keys=["data"]) sweep_number["data"] = np.arange(xsect_nsweeps, dtype="int32") sweep_mode = _copy_dic(radar.sweep_mode, excluded_keys=["data"]) sweep_mode["data"] = np.array([scan_type] * xsect_nsweeps) fixed_angle = _copy_dic(radar.fixed_angle, excluded_keys=["data"]) fixed_angle["data"] = np.array(target_angles, dtype="float32") sweep_start_ray_index = _copy_dic( radar.sweep_start_ray_index, excluded_keys=["data"] ) ssri = np.arange(xsect_nsweeps, dtype="int32") * radar.nsweeps sweep_start_ray_index["data"] = ssri sweep_end_ray_index = _copy_dic(radar.sweep_end_ray_index, excluded_keys=["data"]) seri = np.arange(xsect_nsweeps, dtype="int32") * radar.nsweeps + radar.nsweeps - 1 sweep_end_ray_index["data"] = seri radar_xsect = Radar( time, _range, fields, metadata, scan_type, latitude, longitude, altitude, sweep_number, sweep_mode, fixed_angle, sweep_start_ray_index, sweep_end_ray_index, azimuth, elevation, ) return radar_xsect def _copy_dic(orig_dic, excluded_keys=None): """Return a copy of the original dictionary copying each element.""" if excluded_keys is None: excluded_keys = [] dic = {} for k, v in orig_dic.items(): if k not in excluded_keys: dic[k] = copy(v) return dic