Source code for pyart.io.uf

"""
Reading of Universal format (UF) files.

"""

import warnings

import numpy as np
from netCDF4 import date2num

from ..config import FileMetadata, get_fillvalue
from ..core.radar import Radar
from .common import _test_arguments, make_time_unit_str, prepare_for_read
from .uffile import UFFile

_LIGHT_SPEED = 2.99792458e8  # speed of light in meters per second
_UF_SWEEP_MODES = {
    0: "calibration",
    1: "ppi",
    2: "coplane",
    3: "rhi",
    4: "vpt",
    5: "target",
    6: "manual",
    7: "idle",
    8: "ppi",  # RadX used this to indicate surveillance PPI scans
}

_SWEEP_MODE_STR = {
    "calibration": "calibration",
    "ppi": "azimuth_surveillance",
    "coplane": "coplane",
    "rhi": "rhi",
    "vpt": "vertical_pointing",
    "target": "pointing",
    "manual": "manual",
    "idle": "idle",
}


[docs]def read_uf( filename, field_names=None, additional_metadata=None, file_field_names=False, exclude_fields=None, include_fields=None, delay_field_loading=False, **kwargs ): """ Read a UF File. Parameters ---------- filename : str or file-like Name of Universal format file to read data from. field_names : dict, optional Dictionary mapping UF data type names to radar field names. If a data type found in the file does not appear in this dictionary or has a value of None it will not be placed in the radar.fields dictionary. A value of None, the default, will use the mapping defined in the Py-ART configuration file. additional_metadata : dict of dicts, optional Dictionary of dictionaries to retrieve metadata from during this read. This metadata is not used during any successive file reads unless explicitly included. A value of None, the default, will not introduce any addition metadata and the file specific or default metadata as specified by the Py-ART configuration file will be used. file_field_names : bool, optional True to force the use of the field names from the file in which case the `field_names` parameter is ignored. False will use to `field_names` parameter to rename fields. exclude_fields : list or None, optional List of fields to exclude from the radar object. This is applied after the `file_field_names` and `field_names` parameters. Set to None to include all fields specified by include_fields. include_fields : list or None, optional List of fields to include from the radar object. This is applied after the `file_field_names` and `field_names` parameters. Set to None to include all fields not specified by exclude_fields. delay_field_loading : bool This option is not implemented in the function but included for compatibility. Returns ------- radar : Radar Radar object. """ # test for non empty kwargs _test_arguments(kwargs) # create metadata retrieval object filemetadata = FileMetadata( "uf", field_names, additional_metadata, file_field_names, exclude_fields, include_fields, ) # Open UF file and get handle ufile = UFFile(prepare_for_read(filename)) first_ray = ufile.rays[0] # time dts = ufile.get_datetimes() units = make_time_unit_str(min(dts)) time = filemetadata("time") time["units"] = units time["data"] = date2num(dts, units).astype("float32") # range _range = filemetadata("range") # assume that the number of gates and spacing from the first ray is # representative of the entire volume field_header = first_ray.field_headers[0] ngates = field_header["nbins"] step = field_header["range_spacing_m"] # this gives distances to the center of each gate, remove step/2 for start start = ( field_header["range_start_km"] * 1000.0 + field_header["range_start_m"] + step / 2.0 ) _range["data"] = np.arange(ngates, dtype="float32") * step + start _range["meters_to_center_of_first_gate"] = start _range["meters_between_gates"] = step # latitude, longitude and altitude latitude = filemetadata("latitude") longitude = filemetadata("longitude") altitude = filemetadata("altitude") lat, lon, height = first_ray.get_location() latitude["data"] = np.array([lat], dtype="float64") longitude["data"] = np.array([lon], dtype="float64") altitude["data"] = np.array([height], dtype="float64") # metadata metadata = filemetadata("metadata") metadata["original_container"] = "UF" metadata["site_name"] = first_ray.mandatory_header["site_name"] metadata["radar_name"] = first_ray.mandatory_header["radar_name"] # sweep_start_ray_index, sweep_end_ray_index sweep_start_ray_index = filemetadata("sweep_start_ray_index") sweep_end_ray_index = filemetadata("sweep_end_ray_index") sweep_start_ray_index["data"] = ufile.first_ray_in_sweep sweep_end_ray_index["data"] = ufile.last_ray_in_sweep # sweep number sweep_number = filemetadata("sweep_number") sweep_number["data"] = np.arange(ufile.nsweeps, dtype="int32") # scan_type scan_type = _get_scan_type(first_ray) # sweep_mode sweep_mode = filemetadata("sweep_mode") sweep_mode["data"] = np.array( ufile.nsweeps * [_SWEEP_MODE_STR[scan_type]], dtype="S" ) # elevation elevation = filemetadata("elevation") elevation["data"] = ufile.get_elevations() # azimuth azimuth = filemetadata("azimuth") azimuth["data"] = ufile.get_azimuths() # fixed_angle fixed_angle = filemetadata("fixed_angle") fixed_angle["data"] = ufile.get_sweep_fixed_angles() # fields fields = {} for uf_field_number, uf_field_dic in enumerate(first_ray.field_positions): uf_field_name = uf_field_dic["data_type"].decode("ascii") field_name = filemetadata.get_field_name(uf_field_name) if field_name is None: continue field_dic = filemetadata(field_name) field_dic["data"] = ufile.get_field_data(uf_field_number) field_dic["_FillValue"] = get_fillvalue() fields[field_name] = field_dic # instrument_parameters instrument_parameters = _get_instrument_parameters(ufile, filemetadata) # scan rate scan_rate = filemetadata("scan_rate") scan_rate["data"] = ufile.get_sweep_rates() ufile.close() return Radar( time, _range, fields, metadata, scan_type, latitude, longitude, altitude, sweep_number, sweep_mode, fixed_angle, sweep_start_ray_index, sweep_end_ray_index, azimuth, elevation, scan_rate=scan_rate, instrument_parameters=instrument_parameters, )
def _get_scan_type(ufray): """Ruturn the scan type of a UF ray.""" uf_sweep_mode = ufray.mandatory_header["sweep_mode"] if uf_sweep_mode in _UF_SWEEP_MODES: return _UF_SWEEP_MODES[uf_sweep_mode] else: warnings.warn("Unknown sweep mode, defaulting to ppi") return "ppi" def _get_instrument_parameters(ufile, filemetadata): """Return a dictionary containing instrument parameters.""" # pulse width pulse_width = filemetadata("pulse_width") pulse_width["data"] = ufile.get_pulse_widths() / _LIGHT_SPEED # m->sec # assume that the parameters in the first ray represent the beam widths, # bandwidth and frequency in the entire volume first_ray = ufile.rays[0] field_header = first_ray.field_headers[0] beam_width_h = field_header["beam_width_h"] / 64.0 beam_width_v = field_header["beam_width_v"] / 64.0 bandwidth = field_header["bandwidth"] / 16.0 * 1.0e6 wavelength_cm = field_header["wavelength_cm"] / 64.0 if wavelength_cm == 0: warnings.warn("Invalid wavelength, frequency set to default value.") wavelength_hz = 9999.0 else: wavelength_hz = _LIGHT_SPEED / (wavelength_cm / 100.0) # radar_beam_width_h radar_beam_width_h = filemetadata("radar_beam_width_h") radar_beam_width_h["data"] = np.array([beam_width_h], dtype="float32") # radar_beam_width_v radar_beam_width_v = filemetadata("radar_beam_width_v") radar_beam_width_v["data"] = np.array([beam_width_v], dtype="float32") # radar_receiver_bandwidth radar_receiver_bandwidth = filemetadata("radar_receiver_bandwidth") radar_receiver_bandwidth["data"] = np.array([bandwidth], dtype="float32") # polarization_mode polarization_mode = filemetadata("polarization_mode") polarization_mode["data"] = ufile.get_sweep_polarizations() # frequency frequency = filemetadata("frequency") frequency["data"] = np.array([wavelength_hz], dtype="float32") # prt prt = filemetadata("prt") prt["data"] = ufile.get_prts() / 1e6 # us->sec instrument_parameters = { "pulse_width": pulse_width, "radar_beam_width_h": radar_beam_width_h, "radar_beam_width_v": radar_beam_width_v, "radar_receiver_bandwidth": radar_receiver_bandwidth, "polarization_mode": polarization_mode, "frequency": frequency, "prt": prt, } # nyquist velocity if defined nyquist_velocity = filemetadata("nyquist_velocity") nyquist_velocity["data"] = ufile.get_nyquists() if nyquist_velocity["data"] is not None: instrument_parameters["nyquist_velocity"] = nyquist_velocity return instrument_parameters