"""
Utilities for reading gamic hdf5 files.
"""
# TODO to move out of aux_io namespace:
# * unit tests - need small sample files.
# * auto-detect file type with pyart.io.read function
# * move to pyart.io namespace
import datetime
import warnings
import numpy as np
from ..config import FileMetadata, get_fillvalue
from ..core.radar import Radar
from ..io.common import _test_arguments, make_time_unit_str
try:
from .gamicfile import GAMICFile
_H5PY_AVAILABLE = True
except ImportError:
_H5PY_AVAILABLE = False
from ..exceptions import MissingOptionalDependency
LIGHT_SPEED = 2.99792458e8 # speed of light in meters per second
[docs]def read_gamic(
filename,
field_names=None,
additional_metadata=None,
file_field_names=False,
exclude_fields=None,
include_fields=None,
valid_range_from_file=True,
units_from_file=True,
pulse_width=None,
**kwargs
):
"""
Read a GAMIC hdf5 file.
Parameters
----------
filename : str
Name of GAMIC HDF5 file to read data from.
field_names : dict, optional
Dictionary mapping field names in the file names to radar field names.
Unlike other read functions, fields not in this dictionary or having a
value of None are still included in the radar.fields dictionary, to
exclude them use the `exclude_fields` parameter. Fields which are
mapped by this dictionary will be renamed from key to value.
additional_metadata : dict of dicts, optional
This parameter is not used, it is included for uniformity.
file_field_names : bool, optional
True to force the use of the field names from the file in which
case the `field_names` parameter is ignored. False will use to
`field_names` parameter to rename fields.
exclude_fields : list or None, optional
List of fields to exclude from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields specified by include_fields.
include_fields : list or None, optional
List of fields to include from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields not specified by exclude_fields.
valid_range_from_file : bool, optional
True to extract valid range (valid_min and valid_max) for all
field from the file when they are present. False will not extract
these parameters.
units_from_file : bool, optional
True to extract the units for all fields from the file when available.
False will not extract units using the default units for the fields.
pulse_width : list or None,
Mandatory for gamic radar processors which have pulsewidth enums.
pulse_width should contain the pulsewidth' in us.
Returns
-------
radar : Radar
Radar object.
"""
# check that h5py is available
if not _H5PY_AVAILABLE:
raise MissingOptionalDependency(
"h5py is required to use read_gamic but is not installed"
)
# test for non empty kwargs
_test_arguments(kwargs)
# create metadata retrieval object
filemetadata = FileMetadata(
"gamic",
field_names,
additional_metadata,
file_field_names,
exclude_fields,
include_fields,
)
# Open HDF5 file and get handle
gfile = GAMICFile(filename)
# verify that all scans are present in file
assert gfile.is_file_complete()
# latitude, longitude and altitude
latitude = filemetadata("latitude")
longitude = filemetadata("longitude")
altitude = filemetadata("altitude")
latitude["data"] = gfile.where_attr("lat", "float64")
longitude["data"] = gfile.where_attr("lon", "float64")
altitude["data"] = gfile.where_attr("height", "float64")
# metadata
metadata = filemetadata("metadata")
metadata["original_container"] = "GAMIC-HDF5"
what_mapping = {
"version": "gamic_version",
"sets_scheduled": "sets_scheduled",
"object": "gamic_object",
"date": "gamic_date",
}
for gamic_key, metadata_key in what_mapping.items():
if gfile.is_attr_in_group("what", gamic_key):
metadata[metadata_key] = gfile.raw_group_attr("what", gamic_key)
how_keys = [
"software",
"template_name",
"site_name",
"host_name",
"azimuth_beam",
"elevation_beam",
"sdp_name",
"sw_version",
"sdp_version",
"simulated",
]
for key in how_keys:
if gfile.is_attr_in_group("how", key):
metadata[key] = gfile.raw_group_attr("how", key)
# sweep_start_ray_index, sweep_end_ray_index
sweep_start_ray_index = filemetadata("sweep_start_ray_index")
sweep_end_ray_index = filemetadata("sweep_end_ray_index")
sweep_start_ray_index["data"] = gfile.start_ray.astype("int32")
sweep_end_ray_index["data"] = gfile.end_ray.astype("int32")
# sweep number
sweep_number = filemetadata("sweep_number")
try:
sweep_number["data"] = gfile.what_attrs("set_idx", "int32")
except KeyError:
sweep_number["data"] = np.arange(gfile.nsweeps, dtype="int32")
# sweep_type
scan_type = gfile.raw_scan0_group_attr("what", "scan_type").lower()
if hasattr(scan_type, "decode"):
scan_type = scan_type.decode("utf-8")
# check that all scans in the volume are the same type
if not gfile.is_file_single_scan_type():
raise NotImplementedError("Mixed scan_type volume.")
if scan_type not in ["ppi", "rhi"]:
message = "Unknown scan type: %s, reading as RHI scans." % (scan_type)
warnings.warn(message)
scan_type = "rhi"
# sweep_mode, fixed_angle
sweep_mode = filemetadata("sweep_mode")
fixed_angle = filemetadata("fixed_angle")
if scan_type == "rhi":
sweep_mode["data"] = np.array(gfile.nsweeps * ["rhi"])
fixed_angle["data"] = gfile.how_attrs("azimuth", "float32")
elif scan_type == "ppi":
sweep_mode["data"] = np.array(gfile.nsweeps * ["azimuth_surveillance"])
fixed_angle["data"] = gfile.how_attrs("elevation", "float32")
# time
time = filemetadata("time")
t_data = gfile.ray_header("timestamp", "int64")
start_epoch = t_data[0] // 1.0e6 # truncate to second resolution
start_time = datetime.datetime.utcfromtimestamp(start_epoch)
time["units"] = make_time_unit_str(start_time)
time["data"] = ((t_data - start_epoch * 1.0e6) / 1.0e6).astype("float64")
# range
_range = filemetadata("range")
ngates = gfile.max_num_gates
range_start = float(gfile.raw_scan0_group_attr("how", "range_start"))
# n_samples insertion
range_samples = int(gfile.raw_scan0_group_attr("how", "range_samples"))
range_step = float(gfile.raw_scan0_group_attr("how", "range_step")) * range_samples
# range_step may need to be scaled by range_samples
# XXX This gives distances to start of gates not center, this matches
# Radx but may be incorrect, add range_step / 2. for center
_range["data"] = np.arange(ngates, dtype="float32") * range_step + range_start
_range["meters_to_center_of_first_gate"] = range_start
_range["meters_between_gates"] = range_step
# elevation
elevation = filemetadata("elevation")
start_angle = gfile.ray_header("elevation_start", "float32")
stop_angle = gfile.ray_header("elevation_stop", "float32")
elevation["data"] = _avg_radial_angles(start_angle, stop_angle)
# azimuth
azimuth = filemetadata("azimuth")
start_angle = gfile.ray_header("azimuth_start", "float32")
stop_angle = gfile.ray_header("azimuth_stop", "float32")
azimuth["data"] = _avg_radial_angles(start_angle, stop_angle) % 360.0
# fields
fields = {}
moment_groups = gfile.moment_groups()
moment_names = gfile.moment_names(moment_groups)
for moment_name, group in zip(moment_names, moment_groups):
field_name = filemetadata.get_field_name(moment_name)
if field_name is None:
continue
field_dic = filemetadata(field_name)
field_dic["data"] = gfile.moment_data(group, "float32")
field_dic["_FillValue"] = get_fillvalue()
if valid_range_from_file:
try:
valid_min = gfile.raw_scan0_group_attr(group, "dyn_range_min")
valid_max = gfile.raw_scan0_group_attr(group, "dyn_range_max")
field_dic["valid_min"] = valid_min.decode("utf-8")
field_dic["valid_max"] = valid_max.decode("utf-8")
except:
pass
if units_from_file:
try:
units = gfile.raw_scan0_group_attr(group, "unit")
field_dic["units"] = units.decode("utf-8")
except:
pass
fields[field_name] = field_dic
# ray_angle_res
ray_angle_res = filemetadata("ray_angle_res")
ray_angle_res["data"] = gfile.how_attrs("angle_step", "float32")
# rays_are_indexed
rays_are_indexed = filemetadata("rays_are_indexed")
rays_are_indexed["data"] = np.array(
[["false", "true"][i] for i in gfile.how_attrs("angle_sync", "uint8")]
)
# target_scan_rate
target_scan_rate = filemetadata("target_scan_rate")
target_scan_rate["data"] = gfile.how_attrs("scan_speed", "float32")
# scan_rate
scan_rate = filemetadata("scan_rate")
scan_rate["data"] = target_scan_rate["data"]
if scan_type == "ppi":
azs_names = ["az_speed", "azimuth_speed"]
azs_name = azs_names[0]
for azs_name in azs_names:
if gfile.is_field_in_ray_header(azs_name):
scan_rate["data"] = gfile.ray_header(azs_name, "float32")
break
elif scan_type == "rhi":
els_names = ["el_speed", "elevation_speed"]
els_name = els_names[0]
for els_name in els_names:
if gfile.is_field_in_ray_header(els_name):
scan_rate["data"] = gfile.ray_header(els_name, "float32")
break
else:
scan_rate = None
# instrument_parameters
instrument_parameters = _get_instrument_params(gfile, filemetadata, pulse_width)
gfile.close()
return Radar(
time,
_range,
fields,
metadata,
scan_type,
latitude,
longitude,
altitude,
sweep_number,
sweep_mode,
fixed_angle,
sweep_start_ray_index,
sweep_end_ray_index,
azimuth,
elevation,
instrument_parameters=instrument_parameters,
ray_angle_res=ray_angle_res,
rays_are_indexed=rays_are_indexed,
scan_rate=scan_rate,
target_scan_rate=target_scan_rate,
)
def _get_instrument_params(gfile, filemetadata, pulse_width):
"""Return a dictionary containing instrument parameters."""
instrument_params = {}
dic = filemetadata("frequency")
dic["data"] = np.array(
[LIGHT_SPEED / gfile.raw_scan0_group_attr("how", "radar_wave_length")],
dtype="float32",
)
instrument_params["frequency"] = dic
dic = filemetadata("radar_beam_width_h")
dic["data"] = gfile.how_attr("azimuth_beam", "float32")
instrument_params["radar_beam_width_h"] = dic
dic = filemetadata("radar_beam_width_v")
dic["data"] = gfile.how_attr("elevation_beam", "float32")
instrument_params["radar_beam_width_v"] = dic
dic = filemetadata("pulse_width")
pw_names = ["pulse_width_us", "pulse_width_mks", "pulse_width"]
pw_name = "pulse_width_us"
dic["data"] = None
for pw_name in pw_names:
if gfile.is_attr_in_group("/scan0/how", pw_name):
if pw_name == "pulse_width":
dic["data"] = gfile.sweep_expand(
pulse_width[gfile.how_attrs(pw_name, "int")[0]] * 1e-6
)
else:
dic["data"] = gfile.sweep_expand(
gfile.how_attrs(pw_name, "float32") * 1e-6
)
break
instrument_params["pulse_width"] = dic
dic = filemetadata("prt")
dic["data"] = gfile.sweep_expand(1.0 / gfile.how_attrs("PRF", "float32"))
instrument_params["prt"] = dic
unfolding = gfile.how_attrs("unfolding", "int32")
dic = filemetadata("prt_mode")
dic["data"] = np.array([_prt_mode_from_unfolding(i) for i in unfolding])
instrument_params["prt_mode"] = dic
dic = filemetadata("prt_ratio")
dic["data"] = gfile.sweep_expand(
[[1, 2.0 / 3.0, 3.0 / 4.0, 4.0 / 5.0][i] for i in unfolding]
)
instrument_params["prt_ratio"] = dic
dic = filemetadata("unambiguous_range")
dic["data"] = gfile.sweep_expand(gfile.how_attrs("range", "float32"))
instrument_params["unambiguous_range"] = dic
if gfile.is_attr_in_group("/scan0/how/extended", "nyquist_velocity"):
dic = filemetadata("nyquist_velocity")
dic["data"] = gfile.sweep_expand(gfile.how_ext_attrs("nyquist_velocity"))
instrument_params["nyquist_velocity"] = dic
dic = filemetadata("n_samples")
dic["data"] = gfile.sweep_expand(
gfile.how_attrs("range_samples", "int32")
* gfile.how_attrs("time_samples", "int32"),
dtype="int32",
)
instrument_params["n_samples"] = dic
return instrument_params
def _avg_radial_angles(angle1, angle2):
"""Return the average angle between two radial angles."""
return np.angle(
(np.exp(1.0j * np.deg2rad(angle1)) + np.exp(1.0j * np.deg2rad(angle2))) / 2.0,
deg=True,
)
def _prt_mode_from_unfolding(unfolding):
"""Return 'fixed' or 'staggered' depending on unfolding flag"""
if unfolding == 0:
return "fixed"
else:
return "staggered"