"""
Reading of Universal format (UF) files.
"""
import warnings
import numpy as np
from netCDF4 import date2num
from ..config import FileMetadata, get_fillvalue
from ..core.radar import Radar
from .common import _test_arguments, make_time_unit_str, prepare_for_read
from .uffile import UFFile
_LIGHT_SPEED = 2.99792458e8 # speed of light in meters per second
_UF_SWEEP_MODES = {
0: "calibration",
1: "ppi",
2: "coplane",
3: "rhi",
4: "vpt",
5: "target",
6: "manual",
7: "idle",
8: "ppi", # RadX used this to indicate surveillance PPI scans
}
_SWEEP_MODE_STR = {
"calibration": "calibration",
"ppi": "azimuth_surveillance",
"coplane": "coplane",
"rhi": "rhi",
"vpt": "vertical_pointing",
"target": "pointing",
"manual": "manual",
"idle": "idle",
}
[docs]
def read_uf(
filename,
field_names=None,
additional_metadata=None,
file_field_names=False,
exclude_fields=None,
include_fields=None,
delay_field_loading=False,
**kwargs
):
"""
Read a UF File.
Parameters
----------
filename : str or file-like
Name of Universal format file to read data from.
field_names : dict, optional
Dictionary mapping UF data type names to radar field names. If a
data type found in the file does not appear in this dictionary or has
a value of None it will not be placed in the radar.fields dictionary.
A value of None, the default, will use the mapping defined in the
Py-ART configuration file.
additional_metadata : dict of dicts, optional
Dictionary of dictionaries to retrieve metadata from during this read.
This metadata is not used during any successive file reads unless
explicitly included. A value of None, the default, will not
introduce any addition metadata and the file specific or default
metadata as specified by the Py-ART configuration file will be used.
file_field_names : bool, optional
True to force the use of the field names from the file in which
case the `field_names` parameter is ignored. False will use to
`field_names` parameter to rename fields.
exclude_fields : list or None, optional
List of fields to exclude from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields specified by include_fields.
include_fields : list or None, optional
List of fields to include from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields not specified by exclude_fields.
delay_field_loading : bool
This option is not implemented in the function but included for
compatibility.
Returns
-------
radar : Radar
Radar object.
"""
# test for non empty kwargs
_test_arguments(kwargs)
# create metadata retrieval object
filemetadata = FileMetadata(
"uf",
field_names,
additional_metadata,
file_field_names,
exclude_fields,
include_fields,
)
# Open UF file and get handle
ufile = UFFile(prepare_for_read(filename))
first_ray = ufile.rays[0]
# time
dts = ufile.get_datetimes()
units = make_time_unit_str(min(dts))
time = filemetadata("time")
time["units"] = units
time["data"] = date2num(dts, units).astype("float32")
# range
_range = filemetadata("range")
# assume that the number of gates and spacing from the first ray is
# representative of the entire volume
field_header = first_ray.field_headers[0]
ngates = field_header["nbins"]
step = field_header["range_spacing_m"]
# this gives distances to the center of each gate, remove step/2 for start
start = (
field_header["range_start_km"] * 1000.0
+ field_header["range_start_m"]
+ step / 2.0
)
_range["data"] = np.arange(ngates, dtype="float32") * step + start
_range["meters_to_center_of_first_gate"] = start
_range["meters_between_gates"] = step
# latitude, longitude and altitude
latitude = filemetadata("latitude")
longitude = filemetadata("longitude")
altitude = filemetadata("altitude")
lat, lon, height = first_ray.get_location()
latitude["data"] = np.array([lat], dtype="float64")
longitude["data"] = np.array([lon], dtype="float64")
altitude["data"] = np.array([height], dtype="float64")
# metadata
metadata = filemetadata("metadata")
metadata["original_container"] = "UF"
metadata["site_name"] = first_ray.mandatory_header["site_name"]
metadata["radar_name"] = first_ray.mandatory_header["radar_name"]
# sweep_start_ray_index, sweep_end_ray_index
sweep_start_ray_index = filemetadata("sweep_start_ray_index")
sweep_end_ray_index = filemetadata("sweep_end_ray_index")
sweep_start_ray_index["data"] = ufile.first_ray_in_sweep
sweep_end_ray_index["data"] = ufile.last_ray_in_sweep
# sweep number
sweep_number = filemetadata("sweep_number")
sweep_number["data"] = np.arange(ufile.nsweeps, dtype="int32")
# scan_type
scan_type = _get_scan_type(first_ray)
# sweep_mode
sweep_mode = filemetadata("sweep_mode")
sweep_mode["data"] = np.array(
ufile.nsweeps * [_SWEEP_MODE_STR[scan_type]], dtype="S"
)
# elevation
elevation = filemetadata("elevation")
elevation["data"] = ufile.get_elevations()
# azimuth
azimuth = filemetadata("azimuth")
azimuth["data"] = ufile.get_azimuths()
# fixed_angle
fixed_angle = filemetadata("fixed_angle")
fixed_angle["data"] = ufile.get_sweep_fixed_angles()
# fields
fields = {}
for uf_field_number, uf_field_dic in enumerate(first_ray.field_positions):
uf_field_name = uf_field_dic["data_type"].decode("ascii")
field_name = filemetadata.get_field_name(uf_field_name)
if field_name is None:
continue
field_dic = filemetadata(field_name)
field_dic["data"] = ufile.get_field_data(uf_field_number)
field_dic["_FillValue"] = get_fillvalue()
fields[field_name] = field_dic
# instrument_parameters
instrument_parameters = _get_instrument_parameters(ufile, filemetadata)
# scan rate
scan_rate = filemetadata("scan_rate")
scan_rate["data"] = ufile.get_sweep_rates()
ufile.close()
return Radar(
time,
_range,
fields,
metadata,
scan_type,
latitude,
longitude,
altitude,
sweep_number,
sweep_mode,
fixed_angle,
sweep_start_ray_index,
sweep_end_ray_index,
azimuth,
elevation,
scan_rate=scan_rate,
instrument_parameters=instrument_parameters,
)
def _get_scan_type(ufray):
"""Ruturn the scan type of a UF ray."""
uf_sweep_mode = ufray.mandatory_header["sweep_mode"]
if uf_sweep_mode in _UF_SWEEP_MODES:
return _UF_SWEEP_MODES[uf_sweep_mode]
else:
warnings.warn("Unknown sweep mode, defaulting to ppi")
return "ppi"
def _get_instrument_parameters(ufile, filemetadata):
"""Return a dictionary containing instrument parameters."""
# pulse width
pulse_width = filemetadata("pulse_width")
pulse_width["data"] = ufile.get_pulse_widths() / _LIGHT_SPEED # m->sec
# assume that the parameters in the first ray represent the beam widths,
# bandwidth and frequency in the entire volume
first_ray = ufile.rays[0]
field_header = first_ray.field_headers[0]
beam_width_h = field_header["beam_width_h"] / 64.0
beam_width_v = field_header["beam_width_v"] / 64.0
bandwidth = field_header["bandwidth"] / 16.0 * 1.0e6
wavelength_cm = field_header["wavelength_cm"] / 64.0
if wavelength_cm == 0:
warnings.warn("Invalid wavelength, frequency set to default value.")
wavelength_hz = 9999.0
else:
wavelength_hz = _LIGHT_SPEED / (wavelength_cm / 100.0)
# radar_beam_width_h
radar_beam_width_h = filemetadata("radar_beam_width_h")
radar_beam_width_h["data"] = np.array([beam_width_h], dtype="float32")
# radar_beam_width_v
radar_beam_width_v = filemetadata("radar_beam_width_v")
radar_beam_width_v["data"] = np.array([beam_width_v], dtype="float32")
# radar_receiver_bandwidth
radar_receiver_bandwidth = filemetadata("radar_receiver_bandwidth")
radar_receiver_bandwidth["data"] = np.array([bandwidth], dtype="float32")
# polarization_mode
polarization_mode = filemetadata("polarization_mode")
polarization_mode["data"] = ufile.get_sweep_polarizations()
# frequency
frequency = filemetadata("frequency")
frequency["data"] = np.array([wavelength_hz], dtype="float32")
# prt
prt = filemetadata("prt")
prt["data"] = ufile.get_prts() / 1e6 # us->sec
instrument_parameters = {
"pulse_width": pulse_width,
"radar_beam_width_h": radar_beam_width_h,
"radar_beam_width_v": radar_beam_width_v,
"radar_receiver_bandwidth": radar_receiver_bandwidth,
"polarization_mode": polarization_mode,
"frequency": frequency,
"prt": prt,
}
# nyquist velocity if defined
nyquist_velocity = filemetadata("nyquist_velocity")
nyquist_velocity["data"] = ufile.get_nyquists()
if nyquist_velocity["data"] is not None:
instrument_parameters["nyquist_velocity"] = nyquist_velocity
return instrument_parameters