"""
Function for extracting cross sections from radar volumes.
"""
from copy import copy
from warnings import warn
import numpy as np
from ..core import Radar
[docs]def cross_section_ppi(radar, target_azimuths, az_tol=None):
"""
Extract cross sections from a PPI volume along one or more azimuth angles.
Parameters
----------
radar : Radar
Radar volume containing PPI sweeps from which azimuthal
cross sections will be extracted.
target_azimuth : list
Azimuthal angles in degrees where cross sections will be taken.
az_tol : float, optional
Azimuth angle tolerance in degrees. If none the nearest angle is used.
If valid only angles within the tolerance distance are considered.
Returns
-------
radar_rhi : Radar
Radar volume containing RHI sweeps which contain azimuthal
cross sections from the original PPI volume.
"""
# determine which rays from the ppi radar make up the pseudo RHI
prhi_rays = []
valid_azimuths = []
for target_azimuth in sorted(target_azimuths):
for sweep_slice in radar.iter_slice():
sweep_azimuths = radar.azimuth["data"][sweep_slice]
d_az = np.abs(sweep_azimuths - target_azimuth)
if az_tol is None:
ray_number = np.argmin(d_az)
prhi_rays.append(ray_number + sweep_slice.start)
valid_azimuths.append(target_azimuth)
else:
d_az_min = np.min(d_az)
if d_az_min > az_tol:
warn(
"WARNING: No azimuth found whithin tolerance "
+ "for angle "
+ str(target_azimuth)
+ ". Minimum distance to radar azimuth "
+ str(d_az_min)
+ " larger than tolerance "
+ str(az_tol)
)
else:
ray_number = np.argmin(d_az)
prhi_rays.append(ray_number + sweep_slice.start)
valid_azimuths.append(target_azimuth)
unique_azimuths = np.unique(valid_azimuths)
rhi_nsweeps = len(unique_azimuths)
if rhi_nsweeps == 0:
raise ValueError("No azimuth found within tolerance")
radar_rhi = _construct_xsect_radar(
radar, "rhi", prhi_rays, rhi_nsweeps, unique_azimuths
)
return radar_rhi
[docs]def cross_section_rhi(radar, target_elevations, el_tol=None):
"""
Extract cross sections from an RHI volume along one or more elevation
angles.
Parameters
----------
radar : Radar
Radar volume containing RHI sweeps from which azimuthal
cross sections will be extracted.
target_elevations : list
Elevation angles in degrees where cross sections will be taken.
el_tol : float, optional
Elevation angle tolerance in degrees. If none the nearest angle is
used. If valid only angles within the tolerance distance are
considered.
Returns
-------
radar_ppi : Radar
Radar volume containing PPI sweeps which contain azimuthal
cross sections from the original RHI volume.
"""
# determine which rays from the rhi radar make up the pseudo PPI
pppi_rays = []
valid_elevations = []
for target_elevation in target_elevations:
for sweep_slice in radar.iter_slice():
sweep_elevations = radar.elevation["data"][sweep_slice]
d_el = np.abs(sweep_elevations - target_elevation)
if el_tol is None:
ray_number = np.argmin(d_el)
pppi_rays.append(ray_number + sweep_slice.start)
valid_elevations.append(target_elevation)
else:
d_el_min = np.min(d_el)
if d_el_min > el_tol:
warn(
"WARNING: No elevation found whithin tolerance "
+ "for angle "
+ str(target_elevation)
+ ". Minimum distance to radar elevation "
+ str(d_el_min)
+ " larger than tolerance "
+ str(el_tol)
)
else:
ray_number = np.argmin(d_el)
pppi_rays.append(ray_number + sweep_slice.start)
valid_elevations.append(target_elevation)
ppi_nsweeps = len(valid_elevations)
if ppi_nsweeps == 0:
raise ValueError("No elevation found within tolerance")
radar_ppi = _construct_xsect_radar(
radar, "ppi", pppi_rays, ppi_nsweeps, valid_elevations
)
return radar_ppi
def _construct_xsect_radar(radar, scan_type, pxsect_rays, xsect_nsweeps, target_angles):
"""
Constructs a new radar object that contains cross-sections at fixed angles
of a PPI or RHI volume scan.
Parameters
----------
radar : Radar
Radar volume containing RHI/PPI sweeps from which a cross sections will
be extracted.
scan_type : str
Type of cross section scan (ppi or rhi).
pxsect_rays : list
List of rays from the radar volume to be copied in the cross-sections
radar object.
xsect_nsweeps : int
Number of sweeps in the cross-section radar.
target_angles : array
The target fixed angles.
Returns
-------
radar_xsect : Radar
Radar volume containing sweeps which contain cross sections from the
original volume.
"""
_range = _copy_dic(radar.range)
latitude = _copy_dic(radar.latitude)
longitude = _copy_dic(radar.longitude)
altitude = _copy_dic(radar.altitude)
metadata = _copy_dic(radar.metadata)
time = _copy_dic(radar.time, excluded_keys=["data"])
time["data"] = radar.time["data"][pxsect_rays].copy()
azimuth = _copy_dic(radar.azimuth, excluded_keys=["data"])
azimuth["data"] = radar.azimuth["data"][pxsect_rays].copy()
elevation = _copy_dic(radar.elevation, excluded_keys=["data"])
elevation["data"] = radar.elevation["data"][pxsect_rays].copy()
fields = {}
for field_name, orig_field_dic in radar.fields.items():
field_dic = _copy_dic(orig_field_dic, excluded_keys=["data"])
field_dic["data"] = orig_field_dic["data"][pxsect_rays].copy()
fields[field_name] = field_dic
sweep_number = _copy_dic(radar.sweep_number, excluded_keys=["data"])
sweep_number["data"] = np.arange(xsect_nsweeps, dtype="int32")
sweep_mode = _copy_dic(radar.sweep_mode, excluded_keys=["data"])
sweep_mode["data"] = np.array([scan_type] * xsect_nsweeps)
fixed_angle = _copy_dic(radar.fixed_angle, excluded_keys=["data"])
fixed_angle["data"] = np.array(target_angles, dtype="float32")
sweep_start_ray_index = _copy_dic(
radar.sweep_start_ray_index, excluded_keys=["data"]
)
ssri = np.arange(xsect_nsweeps, dtype="int32") * radar.nsweeps
sweep_start_ray_index["data"] = ssri
sweep_end_ray_index = _copy_dic(radar.sweep_end_ray_index, excluded_keys=["data"])
seri = np.arange(xsect_nsweeps, dtype="int32") * radar.nsweeps + radar.nsweeps - 1
sweep_end_ray_index["data"] = seri
radar_xsect = Radar(
time,
_range,
fields,
metadata,
scan_type,
latitude,
longitude,
altitude,
sweep_number,
sweep_mode,
fixed_angle,
sweep_start_ray_index,
sweep_end_ray_index,
azimuth,
elevation,
)
return radar_xsect
def _copy_dic(orig_dic, excluded_keys=None):
"""Return a copy of the original dictionary copying each element."""
if excluded_keys is None:
excluded_keys = []
dic = {}
for k, v in orig_dic.items():
if k not in excluded_keys:
dic[k] = copy(v)
return dic