Source code for pyart.aux_io.edge_netcdf

"""
Utilities for reading EDGE NetCDF files.

"""

import datetime

import netCDF4
import numpy as np

from ..config import FileMetadata, get_fillvalue
from ..core.radar import Radar
from ..io.common import _test_arguments, make_time_unit_str


[docs]def read_edge_netcdf(filename, **kwargs): """ Read a EDGE NetCDF file. Parameters ---------- filename : str Name of EDGE NetCDF file to read data from. Returns ------- radar : Radar Radar object. """ # test for non empty kwargs _test_arguments(kwargs) # create metadata retrieval object filemetadata = FileMetadata("edge_netcdf") # Open netCDF4 file dset = netCDF4.Dataset(filename) nrays = len(dset.dimensions["Azimuth"]) nbins = len(dset.dimensions["Gate"]) # latitude, longitude and altitude latitude = filemetadata("latitude") longitude = filemetadata("longitude") altitude = filemetadata("altitude") latitude["data"] = np.array([dset.Latitude], "float64") longitude["data"] = np.array([dset.Longitude], "float64") altitude["data"] = np.array([dset.Height], "float64") # metadata metadata = filemetadata("metadata") metadata_mapping = { "vcp-value": "vcp", "radarName-value": "radar_name", "ConversionPlugin": "conversion_software", } for netcdf_attr, metadata_key in metadata_mapping.items(): if netcdf_attr in dset.ncattrs(): metadata[metadata_key] = dset.getncattr(netcdf_attr) # sweep_start_ray_index, sweep_end_ray_index sweep_start_ray_index = filemetadata("sweep_start_ray_index") sweep_end_ray_index = filemetadata("sweep_end_ray_index") sweep_start_ray_index["data"] = np.array([0], dtype="int32") sweep_end_ray_index["data"] = np.array([nrays - 1], dtype="int32") # sweep number sweep_number = filemetadata("sweep_number") sweep_number["data"] = np.array([0], dtype="int32") # sweep_type scan_type = "ppi" # sweep_mode, fixed_angle sweep_mode = filemetadata("sweep_mode") fixed_angle = filemetadata("fixed_angle") sweep_mode["data"] = np.array(1 * ["azimuth_surveillance"]) fixed_angle["data"] = np.array([dset.Elevation], dtype="float32") # time time = filemetadata("time") start_time = datetime.datetime.utcfromtimestamp(dset.Time) time["units"] = make_time_unit_str(start_time) time["data"] = np.zeros((nrays,), dtype="float64") # range _range = filemetadata("range") step = float(dset.getncattr("MaximumRange-value")) / nbins * 1000.0 _range["data"] = np.arange(nbins, dtype="float32") * step + step / 2 _range["meters_to_center_of_first_gate"] = step / 2.0 _range["meters_between_gates"] = step # elevation elevation = filemetadata("elevation") elevation_angle = dset.Elevation elevation["data"] = np.ones((nrays,), dtype="float32") * elevation_angle # azimuth azimuth = filemetadata("azimuth") azimuth["data"] = dset.variables["Azimuth"][:] # fields field_name = dset.TypeName field_data = np.ma.array(dset.variables[field_name][:]) if "MissingData" in dset.ncattrs(): field_data[field_data == dset.MissingData] = np.ma.masked if "RangeFolded" in dset.ncattrs(): field_data[field_data == dset.RangeFolded] = np.ma.masked fields = {field_name: filemetadata(field_name)} fields[field_name]["data"] = field_data fields[field_name]["units"] = dset.variables[field_name].Units fields[field_name]["_FillValue"] = get_fillvalue() # instrument_parameters instrument_parameters = {} if "PRF-value" in dset.ncattrs(): dic = filemetadata("prt") prt = 1.0 / float(dset.getncattr("PRF-value")) dic["data"] = np.ones((nrays,), dtype="float32") * prt instrument_parameters["prt"] = dic if "PulseWidth-value" in dset.ncattrs(): dic = filemetadata("pulse_width") pulse_width = dset.getncattr("PulseWidth-value") * 1.0e-6 dic["data"] = np.ones((nrays,), dtype="float32") * pulse_width instrument_parameters["pulse_width"] = dic if "NyquistVelocity-value" in dset.ncattrs(): dic = filemetadata("nyquist_velocity") nyquist_velocity = float(dset.getncattr("NyquistVelocity-value")) dic["data"] = np.ones((nrays,), dtype="float32") * nyquist_velocity instrument_parameters["nyquist_velocity"] = dic if "Beamwidth" in dset.variables: dic = filemetadata("radar_beam_width_h") dic["data"] = dset.variables["Beamwidth"][:] instrument_parameters["radar_beam_width_h"] = dic dset.close() return Radar( time, _range, fields, metadata, scan_type, latitude, longitude, altitude, sweep_number, sweep_mode, fixed_angle, sweep_start_ray_index, sweep_end_ray_index, azimuth, elevation, instrument_parameters=instrument_parameters, )