"""
Source code for reading of Universal format (UF) files.
"""


import warnings

import numpy as np
from netCDF4 import date2num

from ..config import FileMetadata, get_fillvalue
from ..core.radar import Radar
from .common import make_time_unit_str, _test_arguments, prepare_for_read
from .uffile import UFFile

_LIGHT_SPEED = 2.99792458e8  # speed of light in meters per second

# Maps the integer 'sweep_mode' code read from a UF ray's mandatory header
# to a short scan type name (looked up by _get_scan_type).
_UF_SWEEP_MODES = {
    0: 'calibration',
    1: 'ppi',
    2: 'coplane',
    3: 'rhi',
    4: 'vpt',
    5: 'target',
    6: 'manual',
    7: 'idle',
    8: 'ppi',   # RadX used this to indicate surveillance PPI scans
}

# Maps a scan type name to the string stored in the radar's sweep_mode data
# (indexed by scan type in read_uf).
_SWEEP_MODE_STR = {
    'calibration': 'calibration',
    'ppi': 'azimuth_surveillance',
    'coplane': 'coplane',
    'rhi': 'rhi',
    'vpt': 'vertical_pointing',
    'target': 'pointing',
    'manual': 'manual',
    'idle': 'idle',
}

def read_uf(filename, field_names=None, additional_metadata=None,
            file_field_names=False, exclude_fields=None,
            include_fields=None, delay_field_loading=False, **kwargs):
    """
    Read a UF File.

    Parameters
    ----------
    filename : str or file-like
        Name of Universal format file to read data from.
    field_names : dict, optional
        Dictionary mapping UF data type names to radar field names. If a
        data type found in the file does not appear in this dictionary or has
        a value of None it will not be placed in the radar.fields dictionary.
        A value of None, the default, will use the mapping defined in the
        Py-ART configuration file.
    additional_metadata : dict of dicts, optional
        Dictionary of dictionaries to retrieve metadata from during this read.
        This metadata is not used during any successive file reads unless
        explicitly included. A value of None, the default, will not
        introduce any additional metadata and the file specific or default
        metadata as specified by the Py-ART configuration file will be used.
    file_field_names : bool, optional
        True to force the use of the field names from the file in which
        case the `field_names` parameter is ignored. False will use the
        `field_names` parameter to rename fields.
    exclude_fields : list or None, optional
        List of fields to exclude from the radar object. This is applied
        after the `file_field_names` and `field_names` parameters. Set
        to None to include all fields specified by include_fields.
    include_fields : list or None, optional
        List of fields to include from the radar object. This is applied
        after the `file_field_names` and `field_names` parameters. Set
        to None to include all fields not specified by exclude_fields.
    delay_field_loading : bool
        This option is not implemented in the function but included for
        compatibility.

    Returns
    -------
    radar : Radar
        Radar object.

    """
    # test for non empty kwargs
    _test_arguments(kwargs)

    # create metadata retrieval object
    filemetadata = FileMetadata('uf', field_names, additional_metadata,
                                file_field_names, exclude_fields,
                                include_fields)

    # Open UF file and get handle
    ufile = UFFile(prepare_for_read(filename))

    # Everything that reads from the handle lives inside the try block so
    # that the file is closed even when one of the read steps raises.
    try:
        first_ray = ufile.rays[0]

        # time
        dts = ufile.get_datetimes()
        units = make_time_unit_str(min(dts))
        time = filemetadata('time')
        time['units'] = units
        time['data'] = date2num(dts, units).astype('float32')

        # range
        _range = filemetadata('range')
        # assume that the number of gates and spacing from the first ray is
        # representative of the entire volume
        field_header = first_ray.field_headers[0]
        ngates = field_header['nbins']
        step = field_header['range_spacing_m']
        # this gives distances to the center of each gate; drop the
        # step / 2 term to get the distance to the start of each gate
        start = (field_header['range_start_km'] * 1000. +
                 field_header['range_start_m'] + step / 2.)
        _range['data'] = np.arange(ngates, dtype='float32') * step + start
        _range['meters_to_center_of_first_gate'] = start
        _range['meters_between_gates'] = step

        # latitude, longitude and altitude
        latitude = filemetadata('latitude')
        longitude = filemetadata('longitude')
        altitude = filemetadata('altitude')
        lat, lon, height = first_ray.get_location()
        latitude['data'] = np.array([lat], dtype='float64')
        longitude['data'] = np.array([lon], dtype='float64')
        altitude['data'] = np.array([height], dtype='float64')

        # metadata
        metadata = filemetadata('metadata')
        metadata['original_container'] = 'UF'
        metadata['site_name'] = first_ray.mandatory_header['site_name']
        metadata['radar_name'] = first_ray.mandatory_header['radar_name']

        # sweep_start_ray_index, sweep_end_ray_index
        sweep_start_ray_index = filemetadata('sweep_start_ray_index')
        sweep_end_ray_index = filemetadata('sweep_end_ray_index')
        sweep_start_ray_index['data'] = ufile.first_ray_in_sweep
        sweep_end_ray_index['data'] = ufile.last_ray_in_sweep

        # sweep number
        sweep_number = filemetadata('sweep_number')
        sweep_number['data'] = np.arange(ufile.nsweeps, dtype='int32')

        # scan_type
        scan_type = _get_scan_type(first_ray)

        # sweep_mode, the same mode string is repeated for every sweep
        sweep_mode = filemetadata('sweep_mode')
        sweep_mode['data'] = np.array(
            ufile.nsweeps * [_SWEEP_MODE_STR[scan_type]], dtype='S')

        # elevation
        elevation = filemetadata('elevation')
        elevation['data'] = ufile.get_elevations()

        # azimuth
        azimuth = filemetadata('azimuth')
        azimuth['data'] = ufile.get_azimuths()

        # fixed_angle
        fixed_angle = filemetadata('fixed_angle')
        fixed_angle['data'] = ufile.get_sweep_fixed_angles()

        # fields, the field layout of the first ray selects what is read
        fields = {}
        for uf_field_number, uf_field_dic in enumerate(
                first_ray.field_positions):
            uf_field_name = uf_field_dic['data_type'].decode('ascii')
            field_name = filemetadata.get_field_name(uf_field_name)
            if field_name is None:
                # no radar field name for this data type, skip it
                continue
            field_dic = filemetadata(field_name)
            field_dic['data'] = ufile.get_field_data(uf_field_number)
            field_dic['_FillValue'] = get_fillvalue()
            fields[field_name] = field_dic

        # instrument_parameters
        instrument_parameters = _get_instrument_parameters(
            ufile, filemetadata)

        # scan rate
        scan_rate = filemetadata('scan_rate')
        scan_rate['data'] = ufile.get_sweep_rates()
    finally:
        ufile.close()

    return Radar(
        time, _range, fields, metadata, scan_type,
        latitude, longitude, altitude,
        sweep_number, sweep_mode, fixed_angle,
        sweep_start_ray_index, sweep_end_ray_index,
        azimuth, elevation,
        scan_rate=scan_rate,
        instrument_parameters=instrument_parameters)
def _get_scan_type(ufray):
    """ Return the scan type of a UF ray. """
    mode_code = ufray.mandatory_header['sweep_mode']
    if mode_code not in _UF_SWEEP_MODES:
        # unrecognized code, fall back to ppi after warning the caller
        warnings.warn('Unknown sweep mode, defaulting to ppi')
        return 'ppi'
    return _UF_SWEEP_MODES[mode_code]


def _get_instrument_parameters(ufile, filemetadata):
    """ Return a dictionary containing instrument parameters. """

    def _single_float32(value):
        """ Wrap a single value in a one element float32 array. """
        return np.array([value], dtype='float32')

    # pulse width
    pulse_width = filemetadata('pulse_width')
    pulse_width['data'] = ufile.get_pulse_widths() / _LIGHT_SPEED  # m->sec
    params = {'pulse_width': pulse_width}

    # assume that the parameters in the first ray represent the beam widths,
    # bandwidth and frequency in the entire volume
    header = ufile.rays[0].field_headers[0]
    h_width = header['beam_width_h'] / 64.
    v_width = header['beam_width_v'] / 64.
    receiver_bandwidth = header['bandwidth'] / 16. * 1.e6
    wavelength_cm = header['wavelength_cm'] / 64.
    if wavelength_cm != 0:
        frequency_hz = _LIGHT_SPEED / (wavelength_cm / 100.)
    else:
        warnings.warn('Invalid wavelength, frequency set to default value.')
        frequency_hz = 9999.0

    # radar_beam_width_h
    entry = filemetadata('radar_beam_width_h')
    entry['data'] = _single_float32(h_width)
    params['radar_beam_width_h'] = entry

    # radar_beam_width_v
    entry = filemetadata('radar_beam_width_v')
    entry['data'] = _single_float32(v_width)
    params['radar_beam_width_v'] = entry

    # radar_receiver_bandwidth
    entry = filemetadata('radar_receiver_bandwidth')
    entry['data'] = _single_float32(receiver_bandwidth)
    params['radar_receiver_bandwidth'] = entry

    # polarization_mode
    entry = filemetadata('polarization_mode')
    entry['data'] = ufile.get_sweep_polarizations()
    params['polarization_mode'] = entry

    # frequency
    entry = filemetadata('frequency')
    entry['data'] = _single_float32(frequency_hz)
    params['frequency'] = entry

    # prt
    entry = filemetadata('prt')
    entry['data'] = ufile.get_prts() / 1e6  # us->sec
    params['prt'] = entry

    # nyquist velocity, only included if defined
    nyquist = filemetadata('nyquist_velocity')
    nyquist['data'] = ufile.get_nyquists()
    if nyquist['data'] is not None:
        params['nyquist_velocity'] = nyquist

    return params