"""
A general central radial scanning (or dwelling) instrument class.
"""
import copy
import sys
import numpy as np
from ..config import get_metadata
from ..lazydict import LazyLoadDict
from .transforms import antenna_vectors_to_cartesian, cartesian_to_geographic
[docs]class Radar:
"""
A class for storing antenna coordinate radar data.
The structure of the Radar class is based on the CF/Radial Data file
format. Global attributes and variables (section 4.1 and 4.3) are
represented as a dictionary in the metadata attribute. Other required and
optional variables are represented as dictionaries in an attribute with the
same name as the variable in the CF/Radial standard. When an optional
attribute is not present the attribute has a value of None. The data for a
given variable is stored in the dictionary under the 'data' key. Moment
field data is stored as a dictionary of dictionaries in the fields
attribute. Sub-convention variables are stored as a dictionary of
dictionaries under the meta_group attribute.
Refer to the attribute section for information on the parameters.
Attributes
----------
time : dict
Time at the center of each ray.
range : dict
Range to the center of each gate (bin).
fields : dict of dicts
Moment fields.
metadata : dict
Metadata describing the instrument and data.
scan_type : str
Type of scan, one of 'ppi', 'rhi', 'sector' or 'other'. If the scan
volume contains multiple sweep modes this should be 'other'.
latitude : dict
Latitude of the instrument.
longitude : dict
Longitude of the instrument.
altitude : dict
Altitude of the instrument, above sea level.
altitude_agl : dict or None
Altitude of the instrument above ground level. If not provided this
attribute is set to None, indicating this parameter is not available.
sweep_number : dict
The number of the sweep in the volume scan, 0-based.
sweep_mode : dict
Sweep mode for each sweep in the volume scan.
fixed_angle : dict
Target angle for the sweep. Azimuth angle in RHI modes, elevation
angle in all other modes.
sweep_start_ray_index : dict
Index of the first ray in each sweep relative to the start of the
volume, 0-based.
sweep_end_ray_index : dict
Index of the last ray in each sweep relative to the start of the
volume, 0-based.
rays_per_sweep : LazyLoadDict
Number of rays in each sweep. The data key of this attribute is
created upon first access from the data in the sweep_start_ray_index and
sweep_end_ray_index attributes. If the sweep locations need to be
modified, do this prior to accessing this attribute or use
:py:func:`init_rays_per_sweep` to reset the attribute.
target_scan_rate : dict or None
Intended scan rate for each sweep. If not provided this attribute is
set to None, indicating this parameter is not available.
rays_are_indexed : dict or None
Indication of whether ray angles are indexed to a regular grid in
each sweep. If not provided this attribute is set to None, indicating
ray angle spacing is not determined.
ray_angle_res : dict or None
If rays_are_indexed is not None, this provides the angular resolution
of the grid. If not provided or available this attribute is set to
None.
azimuth : dict
Azimuth of antenna, relative to true North. Azimuth angles are
recommended to be expressed in the range of [0, 360], but other
representations are not forbidden.
elevation : dict
Elevation of antenna, relative to the horizontal plane. Elevation
angles are recommended to be expressed in the range of [-180, 180],
but other representations are not forbidden.
gate_x, gate_y, gate_z : LazyLoadDict
Location of each gate in a Cartesian coordinate system assuming a
standard atmosphere with a 4/3 Earth's radius model. The data keys of
these attributes are created upon first access from the data in the
range, azimuth and elevation attributes. If these attributes are
changed use :py:func:`init_gate_x_y_z` to reset.
gate_longitude, gate_latitude : LazyLoadDict
Geographic location of each gate. The projection parameter(s) defined
in the `projection` attribute are used to perform an inverse map
projection from the Cartesian gate locations relative to the radar
location to longitudes and latitudes. If these attributes are changed
use :py:func:`init_gate_longitude_latitude` to reset the attributes.
projection : dict or str
Projection parameters defining the map projection used to transform
from Cartesian to geographic coordinates. The default dictionary sets
the 'proj' key to 'pyart_aeqd' indicating that the native Py-ART
azimuthal equidistant projection is used. This can be modified to
specify a valid pyproj.Proj projparams dictionary or string.
The special key '_include_lon_0_lat_0' is removed when interpreting
this dictionary. If this key is present and set to True, which is
required when proj='pyart_aeqd', then the radar longitude and
latitude will be added to the dictionary as 'lon_0' and 'lat_0'.
gate_altitude : LazyLoadDict
The altitude of each radar gate as calculated from the altitude of the
radar and the Cartesian z location of each gate. If this attribute
is changed use :py:func:`init_gate_altitude` to reset the attribute.
scan_rate : dict or None
Actual antenna scan rate. If not provided this attribute is set to
None, indicating this parameter is not available.
antenna_transition : dict or None
Flag indicating if the antenna is in transition, 1 = yes, 0 = no.
If not provided this attribute is set to None, indicating this
parameter is not available.
rotation : dict or None
The rotation angle of the antenna. The angle about the aircraft
longitudinal axis for a vertically scanning radar.
tilt : dict or None
The tilt angle with respect to the plane orthogonal (Z-axis) to
aircraft longitudinal axis.
roll : dict or None
The roll angle of platform, for aircraft right wing down is positive.
drift : dict or None
Drift angle of antenna, the angle between heading and track.
heading : dict or None
Heading (compass) angle, clockwise from north.
pitch : dict or None
Pitch angle of antenna, for aircraft nose up is positive.
georefs_applied : dict or None
Indicates whether the variables have had georeference calculation
applied. Leading to Earth-centric azimuth and elevation angles.
instrument_parameters : dict of dicts or None
Instrument parameters, if not provided this attribute is set to None,
indicating these parameters are not available. This dictionary also
includes variables in the radar_parameters CF/Radial subconvention.
radar_calibration : dict of dicts or None
Instrument calibration parameters. If not provided this attribute is
set to None, indicating these parameters are not available.
ngates : int
Number of gates (bins) in a ray.
nrays : int
Number of rays in the volume.
nsweeps : int
Number of sweeps in the volume.
"""
def __init__(
    self,
    time,
    _range,
    fields,
    metadata,
    scan_type,
    latitude,
    longitude,
    altitude,
    sweep_number,
    sweep_mode,
    fixed_angle,
    sweep_start_ray_index,
    sweep_end_ray_index,
    azimuth,
    elevation,
    altitude_agl=None,
    target_scan_rate=None,
    rays_are_indexed=None,
    ray_angle_res=None,
    scan_rate=None,
    antenna_transition=None,
    instrument_parameters=None,
    radar_calibration=None,
    rotation=None,
    tilt=None,
    roll=None,
    drift=None,
    heading=None,
    pitch=None,
    georefs_applied=None,
):
    """
    Populate the Radar attributes from CF/Radial style dictionaries.

    Every argument is stored, without being copied, on the attribute of
    the same name (``_range`` is stored as ``range``). Optional arguments
    default to None. The ``ngates``, ``nrays`` and ``nsweeps`` sizes are
    taken from the lengths of the 'data' entries of ``_range``, ``time``
    and ``sweep_number``.
    """
    # A time dictionary without a calendar is given the "gregorian"
    # calendar; note that this writes into the caller's dictionary.
    if "calendar" not in time:
        time["calendar"] = "gregorian"

    # required coordinate, field and location variables
    self.time = time
    self.range = _range
    self.azimuth = azimuth
    self.elevation = elevation
    self.fields = fields
    self.metadata = metadata
    self.scan_type = scan_type
    self.latitude = latitude
    self.longitude = longitude
    self.altitude = altitude

    # required sweep variables
    self.sweep_number = sweep_number
    self.sweep_mode = sweep_mode
    self.fixed_angle = fixed_angle
    self.sweep_start_ray_index = sweep_start_ray_index
    self.sweep_end_ray_index = sweep_end_ray_index

    # optional variables, left as None when not supplied
    self.altitude_agl = altitude_agl
    self.target_scan_rate = target_scan_rate
    self.rays_are_indexed = rays_are_indexed
    self.ray_angle_res = ray_angle_res
    self.scan_rate = scan_rate
    self.antenna_transition = antenna_transition
    self.instrument_parameters = instrument_parameters
    self.radar_calibration = radar_calibration

    # optional platform / georeference variables
    self.rotation = rotation
    self.tilt = tilt
    self.roll = roll
    self.drift = drift
    self.heading = heading
    self.pitch = pitch
    self.georefs_applied = georefs_applied

    # sizes derived from the coordinate and sweep variables
    self.ngates = len(_range["data"])
    self.nrays = len(time["data"])
    self.nsweeps = len(sweep_number["data"])

    # default map projection used for the geographic gate locations
    self.projection = {"proj": "pyart_aeqd", "_include_lon_0_lat_0": True}

    # initialize the attributes backed by lazy load dictionaries
    self.init_rays_per_sweep()
    self.init_gate_x_y_z()
    self.init_gate_longitude_latitude()
    self.init_gate_altitude()
def __getstate__(self):
    """
    Return a copy of the instance state that can be pickled.

    The lazily loaded attributes are removed from the copy; they are
    recreated by ``__setstate__`` when the object is restored.
    """
    lazy_attributes = (
        "rays_per_sweep",
        "gate_x",
        "gate_y",
        "gate_z",
        "gate_longitude",
        "gate_latitude",
        "gate_altitude",
    )
    # shallow copy, the instance itself must keep its lazy attributes
    picklable = dict(self.__dict__)
    for name in lazy_attributes:
        del picklable[name]
    return picklable
def __setstate__(self, state):
    """
    Restore the instance from a pickled state.

    The state dictionary is merged into the instance and the lazily
    loaded attributes are then initialized again.
    """
    vars(self).update(state)
    for rebuild in (
        self.init_rays_per_sweep,
        self.init_gate_x_y_z,
        self.init_gate_longitude_latitude,
        self.init_gate_altitude,
    ):
        rebuild()
# Attribute init/reset method
def init_rays_per_sweep(self):
    """
    Initialize or reset the rays_per_sweep attribute.

    The attribute becomes a new LazyLoadDict whose 'data' key is
    registered for lazy evaluation.
    """
    rays_per_sweep = LazyLoadDict(get_metadata("rays_per_sweep"))
    data_loader = _rays_per_sweep_data_factory(self)
    rays_per_sweep.set_lazy("data", data_loader)
    self.rays_per_sweep = rays_per_sweep
def init_gate_x_y_z(self):
    """
    Initialize or reset the gate_x, gate_y and gate_z attributes.

    Each attribute becomes a new LazyLoadDict whose 'data' key is
    registered for lazy evaluation.
    """
    # the position in the tuple is the coordinate index given to the factory
    for axis, attr_name in enumerate(("gate_x", "gate_y", "gate_z")):
        lazy_coord = LazyLoadDict(get_metadata(attr_name))
        lazy_coord.set_lazy("data", _gate_data_factory(self, axis))
        setattr(self, attr_name, lazy_coord)
def init_gate_longitude_latitude(self):
    """
    Initialize or reset the gate_longitude and gate_latitude attributes.

    Each attribute becomes a new LazyLoadDict whose 'data' key is
    registered for lazy evaluation.
    """
    # coordinate index 0 selects the longitude, 1 the latitude
    for attr_name, coordinate in (("gate_longitude", 0), ("gate_latitude", 1)):
        lazy_coord = LazyLoadDict(get_metadata(attr_name))
        lazy_coord.set_lazy("data", _gate_lon_lat_data_factory(self, coordinate))
        setattr(self, attr_name, lazy_coord)
def init_gate_altitude(self):
    """
    Initialize or reset the gate_altitude attribute.

    The attribute becomes a new LazyLoadDict whose 'data' key is
    registered for lazy evaluation.
    """
    lazy_altitude = LazyLoadDict(get_metadata("gate_altitude"))
    altitude_loader = _gate_altitude_data_factory(self)
    lazy_altitude.set_lazy("data", altitude_loader)
    self.gate_altitude = lazy_altitude
# private functions for checking limits, etc.
def _check_sweep_in_range(self, sweep):
    """
    Check that a sweep number is in range.

    Parameters
    ----------
    sweep : int
        Sweep number to check, 0 based. Negative numbers are rejected.

    Raises
    ------
    IndexError
        If sweep is not in the range [0, nsweeps).
    """
    if not 0 <= sweep < self.nsweeps:
        # a single formatted message; passing the value as a second
        # exception argument renders the message as a tuple
        raise IndexError(f"Sweep out of range: {sweep}")
# public check functions
def check_field_exists(self, field_name):
    """
    Check that a field exists in the fields dictionary.

    Parameters
    ----------
    field_name : str
        Name of field to check.

    Raises
    ------
    KeyError
        If the field is not present in the fields dictionary.
    """
    if field_name not in self.fields:
        # formatted rather than concatenated so that a non-string name
        # still produces the documented KeyError and not a TypeError
        raise KeyError(f"Field not available: {field_name}")
# Iterators
def iter_start(self):
    """Return an iterator over the first ray index of every sweep."""
    start_indices = self.sweep_start_ray_index["data"]
    return (index for index in start_indices)
def iter_end(self):
    """Return an iterator over the last ray index of every sweep."""
    end_indices = self.sweep_end_ray_index["data"]
    return (index for index in end_indices)
def iter_start_end(self):
    """Return an iterator over (start, end) ray index pairs, one per sweep."""
    starts = self.iter_start()
    ends = self.iter_end()
    return ((first, last) for first, last in zip(starts, ends))
def iter_slice(self):
    """Return an iterator over slice objects selecting the rays of each sweep."""
    index_pairs = self.iter_start_end()
    # the end index is the last ray of the sweep, hence the + 1
    return (slice(first, last + 1) for first, last in index_pairs)
def iter_field(self, field_name):
    """
    Return an iterator over the data of a field, one sweep at a time.

    The existence of the field is checked, via check_field_exists, before
    the iterator is created.
    """
    self.check_field_exists(field_name)
    sweep_slices = self.iter_slice()
    return (self.fields[field_name]["data"][rays] for rays in sweep_slices)
def iter_azimuth(self):
    """Return an iterator over the azimuth data, one sweep at a time."""
    sweep_slices = self.iter_slice()
    return (self.azimuth["data"][rays] for rays in sweep_slices)
def iter_elevation(self):
    """Return an iterator over the elevation data, one sweep at a time."""
    sweep_slices = self.iter_slice()
    return (self.elevation["data"][rays] for rays in sweep_slices)
# get methods
def get_start(self, sweep):
    """
    Return the index of the first ray of a given sweep.

    The sweep number is validated with _check_sweep_in_range first.
    """
    self._check_sweep_in_range(sweep)
    start_indices = self.sweep_start_ray_index["data"]
    return start_indices[sweep]
def get_end(self, sweep):
    """
    Return the index of the last ray of a given sweep.

    The sweep number is validated with _check_sweep_in_range first.
    """
    self._check_sweep_in_range(sweep)
    end_indices = self.sweep_end_ray_index["data"]
    return end_indices[sweep]
def get_start_end(self, sweep):
    """Return the (first, last) ray indices of a given sweep as a tuple."""
    first_ray = self.get_start(sweep)
    last_ray = self.get_end(sweep)
    return first_ray, last_ray
def get_slice(self, sweep):
    """Return a slice object which selects the rays of a given sweep."""
    first_ray, last_ray = self.get_start_end(sweep)
    # the last ray belongs to the sweep, so the stop index is one past it
    stop = last_ray + 1
    return slice(first_ray, stop)
def get_field(self, sweep, field_name, copy=False):
    """
    Return the data of a field for a given sweep.

    Together with :py:func:`get_gate_x_y_z` this provides the data needed
    for plotting a radar field with the correct spatial context.

    Parameters
    ----------
    sweep : int
        Sweep number to retrieve data for, 0 based.
    field_name : str
        Name of the field from which data should be retrieved.
    copy : bool, optional
        True to return a copy of the data. False, the default, returns
        the result of slicing the field data (a view when possible), so
        changing it will change the data in the underlying Radar object.

    Returns
    -------
    data : array
        Array containing data for the requested sweep and field.

    """
    self.check_field_exists(field_name)
    sweep_rays = self.get_slice(sweep)
    sweep_data = self.fields[field_name]["data"][sweep_rays]
    return sweep_data.copy() if copy else sweep_data
def get_azimuth(self, sweep, copy=False):
    """
    Return the azimuth angles of the rays in a given sweep.

    Parameters
    ----------
    sweep : int
        Sweep number to retrieve data for, 0 based.
    copy : bool, optional
        True to return a copy of the azimuths. False, the default, returns
        the result of slicing the azimuth data (a view when possible), so
        changing it will change the data in the underlying Radar object.

    Returns
    -------
    azimuths : array
        Array containing the azimuth angles for a given sweep.

    """
    sweep_rays = self.get_slice(sweep)
    sweep_azimuths = self.azimuth["data"][sweep_rays]
    return sweep_azimuths.copy() if copy else sweep_azimuths
def get_elevation(self, sweep, copy=False):
    """
    Return the elevation angles of the rays in a given sweep.

    Parameters
    ----------
    sweep : int
        Sweep number to retrieve data for, 0 based.
    copy : bool, optional
        True to return a copy of the elevations. False, the default,
        returns the result of slicing the elevation data (a view when
        possible), so changing it will change the data in the underlying
        Radar object.

    Returns
    -------
    elevation : array
        Array containing the elevation angles for a given sweep.

    """
    sweep_rays = self.get_slice(sweep)
    sweep_elevations = self.elevation["data"][sweep_rays]
    return sweep_elevations.copy() if copy else sweep_elevations
def get_gate_x_y_z(self, sweep, edges=False, filter_transitions=False):
    """
    Return the x, y and z gate locations in meters for a given sweep.

    Only the azimuths and elevations of the requested sweep are passed to
    the coordinate transform, so nothing is computed for the other sweeps
    of the volume (unlike reading the gate_x, gate_y and gate_z
    attributes).

    When used with :py:func:`get_field` this method can be used to obtain
    the data needed for plotting a radar field with the correct spatial
    context.

    Parameters
    ----------
    sweep : int
        Sweep number to retrieve gate locations from, 0 based.
    edges : bool, optional
        True to return the locations of the gate edges calculated by
        interpolating between the range, azimuths and elevations.
        False (the default) will return the locations of the gate centers
        with no interpolation.
    filter_transitions : bool, optional
        True to remove rays where the antenna was in transition between
        sweeps. False will include these rays. No rays will be removed
        if the antenna_transition attribute is not available (set to None).

    Returns
    -------
    x, y, z : 2D array
        Array containing the x, y and z, distances from the radar in
        meters for the center (or edges) for all gates in the sweep.

    """
    sweep_azimuths = self.get_azimuth(sweep)
    sweep_elevations = self.get_elevation(sweep)

    if filter_transitions and self.antenna_transition is not None:
        # keep only the rays whose transition flag is 0
        sweep_rays = self.get_slice(sweep)
        keep = self.antenna_transition["data"][sweep_rays] == 0
        sweep_azimuths = sweep_azimuths[keep]
        sweep_elevations = sweep_elevations[keep]

    gate_ranges = self.range["data"]
    return antenna_vectors_to_cartesian(
        gate_ranges, sweep_azimuths, sweep_elevations, edges=edges
    )
def get_gate_area(self, sweep):
    """
    Return the area of each gate in a sweep.

    The area is the annulus between two consecutive ranges multiplied by
    the fraction of a full circle between two consecutive rays. Units of
    area are those of the range variable, squared.

    Assumptions:
        1. Azimuth data is in degrees.
        2. Consecutive rays are less than 180 degrees apart, so the
           shorter way around the circle is the angle swept between them.

    Parameters
    ----------
    sweep : int
        Sweep number to retrieve gate locations from, 0 based.

    Returns
    -------
    area : 2D array of size (nrays - 1, ngates - 1)
        Array containing the area of each gate in the sweep, where nrays
        is the number of rays in the sweep.

    """
    sweep_rays = self.get_slice(sweep)
    azimuths = self.azimuth["data"][sweep_rays]
    ranges = self.range["data"]

    # area of the ring between each pair of consecutive ranges
    annular_area = np.diff(np.pi * ranges**2)

    # Wrap the azimuth steps into [-180, 180) before taking their size.
    # This handles sweeps crossing north in either rotation direction;
    # adding 360 to every negative step would turn a counter-clockwise
    # step of -1 degree into 359 degrees.
    az_diffs = (np.diff(azimuths) + 180.0) % 360.0 - 180.0
    d_azimuths = np.abs(az_diffs) / 360.0  # fraction of a full circle

    dca, daz = np.meshgrid(annular_area, d_azimuths)
    return np.abs(dca * daz)
def get_gate_lat_lon_alt(
    self, sweep, reset_gate_coords=False, filter_transitions=False
):
    """
    Return the latitude, longitude and altitude gate locations.
    Longitude and latitude are in degrees and altitude in meters.

    The values are the rays of the requested sweep taken from the
    gate_latitude, gate_longitude and gate_altitude attributes. If the
    coordinates have changed at all since those attributes were first
    read, please use the reset_gate_coords parameter.

    Parameters
    ----------
    sweep : int
        Sweep number to retrieve gate locations from, 0 based.
    reset_gate_coords : bool, optional
        True to reset the gate_latitude, gate_longitude and gate_altitude
        attributes before using them in this function. This is useful
        when the geographic coordinates have changed. The gate_x, gate_y
        and gate_z attributes are not reset by this method; use
        :py:func:`init_gate_x_y_z` for that.
    filter_transitions : bool, optional
        True to remove rays where the antenna was in transition between
        sweeps. False will include these rays. No rays will be removed
        if the antenna_transition attribute is not available (set to None).

    Returns
    -------
    lat, lon, alt : 2D array
        Array containing the latitude, longitude and altitude,
        for all gates in the sweep.

    """
    sweep_rays = self.get_slice(sweep)

    if reset_gate_coords:
        # reuse the class' own reset methods instead of rebuilding the
        # lazy load dictionaries a second time here
        self.init_gate_longitude_latitude()
        self.init_gate_altitude()

    lat = self.gate_latitude["data"][sweep_rays]
    lon = self.gate_longitude["data"][sweep_rays]
    alt = self.gate_altitude["data"][sweep_rays]

    if filter_transitions and self.antenna_transition is not None:
        # keep only the rays whose transition flag is 0
        valid = self.antenna_transition["data"][sweep_rays] == 0
        lat = lat[valid]
        lon = lon[valid]
        alt = alt[valid]

    return lat, lon, alt
def get_nyquist_vel(self, sweep, check_uniform=True):
    """
    Return the Nyquist velocity in meters per second for a given sweep.

    Parameters
    ----------
    sweep : int
        Sweep number to retrieve data for, 0 based.
    check_uniform : bool
        True to check that the Nyquist velocities are uniform in the
        sweep, False will skip this check and return the velocity of the
        first ray in the sweep.

    Returns
    -------
    nyquist_velocity : float
        Nyquist velocity of the first ray of the sweep.

    Raises
    ------
    LookupError
        If the lookup of the Nyquist velocities fails with a TypeError,
        as happens when instrument_parameters is None. A missing
        'nyquist_velocity' key propagates as a KeyError, which is itself
        a LookupError.
    Exception
        If check_uniform is True and the velocities differ within the
        sweep.

    """
    sweep_rays = self.get_slice(sweep)
    try:
        velocities = self.instrument_parameters["nyquist_velocity"]["data"][
            sweep_rays
        ]
    except TypeError:
        raise LookupError("Nyquist velocity unavailable")

    first_velocity = velocities[0]
    if check_uniform and np.any(velocities != first_velocity):
        raise Exception("Nyquist velocities are not uniform in sweep")
    return float(first_velocity)
# Methods
def info(self, level="standard", out=sys.stdout):
    """
    Print information on radar.

    Parameters
    ----------
    level : {'compact', 'standard', 'full', 'c', 's', 'f'}, optional
        Level of information on radar object to print, compact is
        minimal information, standard more and full everything. The
        single letter forms are abbreviations of the full names.
    out : file-like, optional
        Stream to direct output to, default is to print information
        to standard out (the screen).

    Raises
    ------
    ValueError
        If level is not one of the accepted values.

    """
    # expand the single letter abbreviations
    for alias, full_name in (("c", "compact"), ("s", "standard"), ("f", "full")):
        if level == alias:
            level = full_name
            break
    if level not in ["standard", "compact", "full"]:
        raise ValueError("invalid level parameter")

    def print_group(group_name):
        """Print a dictionary of dictionaries attribute, or None."""
        group = getattr(self, group_name)
        if group is None:
            print(group_name + ": None", file=out)
            return
        print(group_name + ":", file=out)
        for name, dic in group.items():
            self._dic_info(name, level, out, dic, 1)

    for name in (
        "altitude",
        "altitude_agl",
        "antenna_transition",
        "azimuth",
        "elevation",
    ):
        self._dic_info(name, level, out)

    print("fields:", file=out)
    for field_name, field_dic in self.fields.items():
        self._dic_info(field_name, level, out, field_dic, 1)

    self._dic_info("fixed_angle", level, out)
    print_group("instrument_parameters")
    self._dic_info("latitude", level, out)
    self._dic_info("longitude", level, out)

    print("nsweeps:", self.nsweeps, file=out)
    print("ngates:", self.ngates, file=out)
    print("nrays:", self.nrays, file=out)

    print_group("radar_calibration")
    self._dic_info("range", level, out)
    self._dic_info("scan_rate", level, out)
    print("scan_type:", self.scan_type, file=out)

    for name in (
        "sweep_end_ray_index",
        "sweep_mode",
        "sweep_number",
        "sweep_start_ray_index",
        "target_scan_rate",
        "time",
    ):
        self._dic_info(name, level, out)

    # Airborne radar parameters, only reported when they are set
    for name in (
        "rotation",
        "tilt",
        "roll",
        "drift",
        "heading",
        "pitch",
        "georefs_applied",
    ):
        if getattr(self, name) is not None:
            self._dic_info(name, level, out)

    # always print out all metadata last
    self._dic_info("metadata", "full", out)
def _dic_info(self, attr, level, out, dic=None, ident_level=0):
    """
    Print information on a dictionary attribute.

    Parameters
    ----------
    attr : str
        Name printed as the header. When dic is None it is also the name
        of the attribute of this object to report on.
    level : {'compact', 'standard', 'full'}
        Amount of detail: compact prints a one line summary of the 'data'
        key, standard prints every key with 'data' summarized and full
        prints every key with 'data' in full. Nothing is printed for any
        other value.
    out : file-like
        Stream to print to.
    dic : dict or None, optional
        Dictionary to report on. None, the default, looks the dictionary
        up on this object using attr.
    ident_level : int, optional
        Number of tabs to indent the header with; the keys of the
        dictionary are indented one tab further.

    """
    if dic is None:
        dic = getattr(self, attr)

    ilvl0 = "\t" * ident_level
    ilvl1 = "\t" * (ident_level + 1)
    # every header line carries the requested indent, including the
    # "full" level header and the line printed for a None attribute
    header = ilvl0 + str(attr) + ":"

    if dic is None:
        print(header, "None", file=out)
        return

    # make a string summary of the data key if it exists.
    if "data" not in dic:
        d_str = "Missing"
    elif not isinstance(dic["data"], np.ndarray):
        d_str = "<not a ndarray>"
    else:
        data = dic["data"]
        d_str = f"<ndarray of type: {data.dtype} and shape: {data.shape}>"

    # compact, only data summary
    if level == "compact":
        print(header, d_str, file=out)
        return

    if level not in ("standard", "full"):
        return

    print(header, file=out)
    if level == "standard":
        # standard, only a summary of the data
        print(ilvl1 + "data:", d_str, file=out)
    elif "data" in dic:
        # full, the data itself
        print(ilvl1 + "data:", dic["data"], file=out)

    for key, val in dic.items():
        if key == "data":
            continue
        # str() so that a non-string key cannot break the concatenation
        print(ilvl1 + str(key) + ":", val, file=out)
def add_field(self, field_name, dic, replace_existing=False):
    """
    Add a field to the object.

    Parameters
    ----------
    field_name : str
        Name of the field to add to the dictionary of fields.
    dic : dict
        Dictionary containing field data and metadata. The dictionary is
        stored as is, it is not copied.
    replace_existing : bool, optional
        True to replace the existing field with key field_name if it
        exists, losing any existing data. False will raise a ValueError
        when the field already exists.

    Raises
    ------
    ValueError
        If the field already exists and replace_existing is false, or if
        the shape of the 'data' entry is not (nrays, ngates).
    KeyError
        If dic has no 'data' key.

    """
    # check that the field dictionary to add is valid.
    # Truthiness, not identity with False, decides whether replacing is
    # allowed so that 0 or a NumPy boolean False also protect the field.
    if field_name in self.fields and not replace_existing:
        err = f"A field with name: {field_name} already exists"
        raise ValueError(err)
    if "data" not in dic:
        raise KeyError("dic must contain a 'data' key")
    if dic["data"].shape != (self.nrays, self.ngates):
        err = f"'data' has invalid shape, should be ({self.nrays}, {self.ngates})"
        raise ValueError(err)
    # add the field
    self.fields[field_name] = dic
def add_filter(self, gatefilter, replace_existing=False, include_fields=None):
    """
    Updates the radar object with an applied gatefilter provided
    by the user that masks values in fields within the radar object.

    Parameters
    ----------
    gatefilter : GateFilter
        GateFilter instance. Values are masked where its gate_excluded
        attribute is True, in the fields specified or in all fields if
        include_fields is None.
    replace_existing : bool, optional
        If True, replaces the data of the fields in the radar object with
        masked versions with the gatefilter applied. False, the default,
        leaves the fields untouched and adds masked copies of them under
        their names prefixed with 'filtered_'.
    include_fields : list, optional
        List of fields to have the filter applied to. If None, all
        fields will have the filter applied.

    Raises
    ------
    KeyError
        If a name in include_fields is not a field of the radar object.
        All names are checked before any field is modified, so the radar
        object is left unchanged in that case.

    """
    # If include_fields is None, sets list to all fields to include. The
    # names are materialized as they are traversed twice below.
    if include_fields is None:
        include_fields = list(self.fields)
    else:
        include_fields = list(include_fields)

    # If fields don't match up throw an error, before touching anything.
    for field in include_fields:
        if field not in self.fields:
            raise KeyError(
                f"{field} not found in the original radar object, "
                "please check that names in the include_fields list "
                "match those in the radar object."
            )

    for field in include_fields:
        if replace_existing:
            # Replace current field data with the masked version.
            self.fields[field]["data"] = np.ma.masked_where(
                gatefilter.gate_excluded, self.fields[field]["data"]
            )
        else:
            # Add a new field with prefix 'filtered_'
            field_dict = copy.deepcopy(self.fields[field])
            field_dict["data"] = np.ma.masked_where(
                gatefilter.gate_excluded, field_dict["data"]
            )
            self.add_field("filtered_" + field, field_dict, replace_existing=True)
def add_field_like(
    self, existing_field_name, field_name, data, replace_existing=False
):
    """
    Add a field to the object with metadata from an existing field.

    Every key of the existing field dictionary except 'data' is carried
    over to the new field dictionary, which is then added with
    :py:func:`add_field`.

    Note that the data parameter is not copied by this method.
    If data refers to a 'data' array from an existing field dictionary, a
    copy should be made within or prior to using this method. If this is
    not done the 'data' key in both field dictionaries will point to the
    same NumPy array and modification of one will change the second. To
    copy NumPy arrays use the copy() method. See the Examples section
    for how to create a copy of the 'reflectivity' field as a field named
    'reflectivity_copy'.

    Parameters
    ----------
    existing_field_name : str
        Name of an existing field to take metadata from when adding
        the new field to the object.
    field_name : str
        Name of the field to add to the dictionary of fields.
    data : array
        Field data. A copy of this data is not made, see the note above.
    replace_existing : bool, optional
        True to replace the existing field with key field_name if it
        exists, losing any existing data. False will raise a ValueError
        when the field already exists.

    Raises
    ------
    ValueError
        If existing_field_name is not a field of the object.

    Examples
    --------
    >>> radar.add_field_like('reflectivity', 'reflectivity_copy',
    ...                      radar.fields['reflectivity']['data'].copy())

    """
    if existing_field_name not in self.fields:
        raise ValueError(f"field {existing_field_name} does not exist in object")

    template = self.fields[existing_field_name]
    # the metadata values are shared with the template, not copied
    new_field = {key: value for key, value in template.items() if key != "data"}
    new_field["data"] = data
    return self.add_field(field_name, new_field, replace_existing=replace_existing)
def _rays_per_sweep_data_factory(radar):
    """Return a function which computes the number of rays in each sweep."""

    def _count_rays_per_sweep():
        """Compute the number of rays in each sweep of the radar."""
        last_rays = radar.sweep_end_ray_index["data"]
        first_rays = radar.sweep_start_ray_index["data"]
        # both indices are part of the sweep, hence the + 1
        return last_rays - first_rays + 1

    return _count_rays_per_sweep
def _gate_data_factory(radar, coordinate):
    """
    Return a function which computes one Cartesian gate coordinate.

    coordinate selects the value returned by that function: 0 for x,
    1 for y and 2 for z.
    """

    def _compute_gate_coordinate():
        """Compute the Cartesian gate locations, return the selected one."""
        xyz = antenna_vectors_to_cartesian(
            radar.range["data"],
            radar.azimuth["data"],
            radar.elevation["data"],
            edges=False,
        )
        # The transform yields all three coordinates at once, so store the
        # two that were not asked for on their attributes; the requested
        # one is returned to the caller.
        for axis, attr_name in enumerate(("gate_x", "gate_y", "gate_z")):
            if axis != coordinate:
                getattr(radar, attr_name)["data"] = xyz[axis]
        return xyz[coordinate]

    return _compute_gate_coordinate
def _gate_lon_lat_data_factory(radar, coordinate):
    """
    Return a function which returns the geographic locations of gates.

    coordinate selects the value returned by that function: 0 for the
    longitudes, 1 for the latitudes.
    """

    def _gate_lon_lat_data():
        """The function which returns the geographic locations of gates."""
        x = radar.gate_x["data"]
        y = radar.gate_y["data"]

        projparams = radar.projection
        # A projection given as a string is handed to the transform
        # unchanged, it has no special key to interpret. Anything else is
        # treated as a dictionary and copied so that removing the special
        # key does not alter radar.projection.
        if not isinstance(projparams, str):
            projparams = projparams.copy()
            if projparams.pop("_include_lon_0_lat_0", False):
                projparams["lon_0"] = radar.longitude["data"][0]
                projparams["lat_0"] = radar.latitude["data"][0]

        geographic_coords = cartesian_to_geographic(x, y, projparams)

        # Both coordinates come out of a single transform, so store the
        # one that was not asked for on its attribute.
        if coordinate == 0:
            radar.gate_latitude["data"] = geographic_coords[1]
        else:
            radar.gate_longitude["data"] = geographic_coords[0]
        return geographic_coords[coordinate]

    return _gate_lon_lat_data
def _gate_altitude_data_factory(radar):
    """Return a function which computes the altitude of every gate."""

    def _compute_gate_altitude():
        """Compute the gate altitudes from the radar altitude and gate_z."""
        try:
            gate_altitude = radar.altitude["data"] + radar.gate_z["data"]
        except ValueError:
            # Fall back to the mean radar altitude; presumably this covers
            # altitude data which cannot be broadcast against gate_z
            # (more than one altitude value) -- TODO confirm.
            gate_altitude = np.mean(radar.altitude["data"]) + radar.gate_z["data"]
        return gate_altitude

    return _compute_gate_altitude