"""
Routines for reading sinarame_H5 files.
"""
import glob
import os
from datetime import datetime
try:
from netcdftime import num2date
except ImportError:
from cftime import num2date
import numpy as np
try:
import h5py
_H5PY_AVAILABLE = True
except ImportError:
_H5PY_AVAILABLE = False
from ..config import FileMetadata, get_fillvalue
from ..core.radar import Radar
from ..exceptions import MissingOptionalDependency
from ..io import write_cfradial
from ..io.common import _test_arguments, make_time_unit_str
SINARAME_H5_FIELD_NAMES = {
"TH": "total_power", # uncorrected reflectivity, horizontal
"TV": "total_power_vertical", # uncorrected reflectivity, vertical
"DBZH": "reflectivity", # corrected reflectivity, horizontal
"DBZV": "reflectivity", # corrected reflectivity, vertical
"CM": "clutter mask",
"ZDR": "differential_reflectivity", # differential reflectivity
"RHOHV": "cross_correlation_ratio",
"LDR": "linear_polarization_ratio",
"PHIDP": "differential_phase",
"KDP": "specific_differential_phase",
"SQI": "normalized_coherent_power",
"SNR": "signal_to_noise_ratio",
"VRAD": "velocity",
"WRAD": "spectrum_width",
"QIND": "quality_index",
}
def read_sinarame_h5(
filename,
field_names=None,
additional_metadata=None,
file_field_names=False,
exclude_fields=None,
include_fields=None,
**kwargs,
):
"""
    Read a SINARAME_H5 file.

    Parameters
----------
filename : str
Name of the SINARAME_H5 file to read.
field_names : dict, optional
Dictionary mapping SINARAME_H5 field names to radar field names. If a
data type found in the file does not appear in this dictionary or has
a value of None it will not be placed in the radar.fields dictionary.
A value of None, the default, will use the mapping defined in the
Py-ART configuration file.
additional_metadata : dict of dicts, optional
Dictionary of dictionaries to retrieve metadata from during this read.
This metadata is not used during any successive file reads unless
explicitly included. A value of None, the default, will not
        introduce any additional metadata and the file specific or default
metadata as specified by the Py-ART configuration file will be used.
file_field_names : bool, optional
        True to use the SINARAME_H5 data type names for the field names. In
        this case the field_names parameter is ignored. The field dictionary will
likely only have a 'data' key, unless the fields are defined in
`additional_metadata`.
exclude_fields : list or None, optional
List of fields to exclude from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields specified by include_fields.
include_fields : list or None, optional
List of fields to include from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
        to None to include all fields not specified by exclude_fields.

    Returns
-------
radar : Radar
Radar object containing data from SINARAME_H5 file.
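
    Examples
    --------
    A minimal sketch; the filename is a hypothetical placeholder for a real
    SINARAME_H5 volume on disk:

    >>> import pyart
    >>> radar = pyart.aux_io.read_sinarame_h5("RMA1_0117_01_TH_20170313T162110Z.H5")
    >>> list(radar.fields)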
"""
# TODO before moving to pyart.io
# * unit test
# * add default field mapping, etc to default config
# * auto-detect file type with pyart.io.read function
# * instrument parameters
# * add additional checks for HOW attributes
# * support for other objects (SCAN, XSEC)
# check that h5py is available
if not _H5PY_AVAILABLE:
raise MissingOptionalDependency(
"h5py is required to use read_sinarame_h5 but is not installed"
)
    # test for non-empty kwargs
_test_arguments(kwargs)
# create metadata retrieval object
if field_names is None:
field_names = SINARAME_H5_FIELD_NAMES
filemetadata = FileMetadata(
"SINARAME_h5",
field_names,
additional_metadata,
file_field_names,
exclude_fields,
include_fields,
)
# open the file
hfile = h5py.File(filename, "r")
SINARAME_object = _to_str(hfile["what"].attrs["object"])
if SINARAME_object not in ["PVOL", "SCAN", "ELEV", "AZIM"]:
raise NotImplementedError(f"object: {SINARAME_object} not implemented.")
# determine the number of sweeps by the number of groups which
# begin with dataset
datasets = [k for k in hfile if k.startswith("dataset")]
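    # sort numerically so that e.g. 'dataset10' follows 'dataset9' rather
    # than 'dataset1', as a plain lexicographic sort would place it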
datasets.sort(key=lambda x: int(x[7:]))
nsweeps = len(datasets)
# latitude, longitude and altitude
latitude = filemetadata("latitude")
longitude = filemetadata("longitude")
altitude = filemetadata("altitude")
h_where = hfile["where"].attrs
latitude["data"] = np.array([h_where["lat"]], dtype="float64")
longitude["data"] = np.array([h_where["lon"]], dtype="float64")
altitude["data"] = np.array([h_where["height"]], dtype="float64")
# metadata
metadata = filemetadata("metadata")
metadata["source"] = _to_str(hfile["what"].attrs["source"])
metadata["original_container"] = "SINARAME_h5"
metadata["SINARAME_conventions"] = _to_str(hfile.attrs["Conventions"])
h_what = hfile["what"].attrs
metadata["version"] = _to_str(h_what["version"])
metadata["source"] = _to_str(h_what["source"])
try:
ds1_how = hfile[datasets[0]]["how"].attrs
except KeyError:
# if no how group exists mock it with an empty dictionary
ds1_how = {}
if "system" in ds1_how:
metadata["system"] = ds1_how["system"]
if "software" in ds1_how:
metadata["software"] = ds1_how["software"]
if "sw_version" in ds1_how:
metadata["sw_version"] = ds1_how["sw_version"]
# sweep_start_ray_index, sweep_end_ray_index
sweep_start_ray_index = filemetadata("sweep_start_ray_index")
sweep_end_ray_index = filemetadata("sweep_end_ray_index")
if SINARAME_object in ["AZIM", "SCAN", "PVOL"]:
rays_per_sweep = [int(hfile[d]["where"].attrs["nrays"]) for d in datasets]
elif SINARAME_object == "ELEV":
rays_per_sweep = [int(hfile[d]["where"].attrs["angles"].size) for d in datasets]
total_rays = sum(rays_per_sweep)
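    # first and last ray index of each sweep, e.g. rays_per_sweep = [360, 360]
    # yields start indices [0, 360] and end indices [359, 719]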
ssri = np.cumsum(np.append([0], rays_per_sweep[:-1])).astype("int32")
seri = np.cumsum(rays_per_sweep).astype("int32") - 1
sweep_start_ray_index["data"] = ssri
sweep_end_ray_index["data"] = seri
# sweep_number
sweep_number = filemetadata("sweep_number")
sweep_number["data"] = np.arange(nsweeps, dtype="int32")
# sweep_mode
sweep_mode = filemetadata("sweep_mode")
sweep_mode["data"] = np.array(nsweeps * ["azimuth_surveillance"])
# scan_type
if SINARAME_object == "ELEV":
scan_type = "rhi"
else:
scan_type = "ppi"
# fixed_angle
fixed_angle = filemetadata("fixed_angle")
if SINARAME_object == "ELEV":
sweep_el = [hfile[d]["where"].attrs["az_angle"] for d in datasets]
else:
sweep_el = [hfile[d]["where"].attrs["elangle"] for d in datasets]
fixed_angle["data"] = np.array(sweep_el, dtype="float32")
# elevation
elevation = filemetadata("elevation")
if "elangles" in ds1_how:
edata = np.empty(total_rays, dtype="float32")
for d, start, stop in zip(datasets, ssri, seri):
edata[start : stop + 1] = hfile[d]["how"].attrs["elangles"][:]
elevation["data"] = edata
elif SINARAME_object == "ELEV":
edata = np.empty(total_rays, dtype="float32")
for d, start, stop in zip(datasets, ssri, seri):
edata[start : stop + 1] = hfile[d]["where"].attrs["angles"][:]
elevation["data"] = edata
else:
elevation["data"] = np.repeat(sweep_el, rays_per_sweep)
# range
_range = filemetadata("range")
if "rstart" in hfile["dataset1/where"].attrs:
# derive range from rstart and rscale attributes if available
# check that the gate spacing is constant between sweeps
rstart = [hfile[d]["where"].attrs["rstart"] for d in datasets]
if any(rstart != rstart[0]):
raise ValueError("range start changes between sweeps")
rscale = [hfile[d]["where"].attrs["rscale"] for d in datasets]
if any(rscale != rscale[0]):
raise ValueError("range scale changes between sweeps")
nbins = int(hfile["dataset1"]["where"].attrs["nbins"])
_range["data"] = np.arange(nbins, dtype="float32") * rscale[0] + rstart[0]
_range["meters_to_center_of_first_gate"] = rstart[0]
_range["meters_between_gates"] = float(rscale[0])
else:
# if not defined use range attribute which defines the maximum range in
# km. There is no information on the starting location of the range
# bins so we assume this to be 0.
# This most often occurs in RHI files, which technically do not meet
# the ODIM 2.2 specs. Section 7.4 requires that these files include
# the where/rstart, where/rscale and where/nbins attributes.
max_range = [hfile[d]["where"].attrs["range"] for d in datasets]
if any(max_range != max_range[0]):
raise ValueError("maximum range changes between sweeps")
        # the where/nbins attribute is also missing in these files, so
        # derive the number of bins from the shape of the data itself
        nbins = hfile["dataset1/data1/data"].shape[1]
_range["data"] = np.linspace(0, max_range[0] * 1000.0, nbins, dtype="float32")
_range["meters_to_center_of_first_gate"] = 0
_range["meters_between_gates"] = max_range[0] * 1000.0 / nbins
# azimuth
azimuth = filemetadata("azimuth")
az_data = np.ones((total_rays,), dtype="float32")
for dset, start, stop in zip(datasets, ssri, seri):
if SINARAME_object == "ELEV":
# all azimuth angles are the sweep azimuth angle
sweep_az = hfile[dset]["where"].attrs["az_angle"]
elif SINARAME_object == "AZIM":
# Sector azimuths are specified in the startaz and stopaz
# attribute of dataset/where.
# Assume that the azimuth angles do not pass through 0/360 degrees
startaz = hfile[dset]["where"].attrs["startaz"]
stopaz = hfile[dset]["where"].attrs["stopaz"]
nrays = stop - start + 1
sweep_az = np.linspace(startaz, stopaz, nrays, endpoint=True)
elif ("startazA" in ds1_how) and ("stopazA" in ds1_how):
# average between start and stop azimuth angles
startaz = hfile[dset]["how"].attrs["startazA"]
stopaz = hfile[dset]["how"].attrs["stopazA"]
sweep_az = np.angle(
(np.exp(1.0j * np.deg2rad(startaz)) + np.exp(1.0j * np.deg2rad(stopaz)))
/ 2.0,
deg=True,
)
else:
            # according to section 5.1 of the ODIM specification the first
            # ray points north (0 degrees) and the rays proceed clockwise
            # through a complete 360 degree rotation
nrays = stop - start + 1
sweep_az = np.linspace(0, 360, nrays, endpoint=False)
az_data[start : stop + 1] = sweep_az
azimuth["data"] = az_data
# time
_time = filemetadata("time")
if ("startazT" in ds1_how) and ("stopazT" in ds1_how):
# average between startazT and stopazT
t_data = np.empty((total_rays,), dtype="float32")
for dset, start, stop in zip(datasets, ssri, seri):
t_start = hfile[dset]["how"].attrs["startazT"]
t_stop = hfile[dset]["how"].attrs["stopazT"]
t_data[start : stop + 1] = (t_start + t_stop) / 2
start_epoch = t_data.min()
start_time = datetime.utcfromtimestamp(start_epoch)
_time["units"] = make_time_unit_str(start_time)
_time["data"] = t_data - start_epoch
else:
t_data = np.empty((total_rays,), dtype="int32")
# interpolate between each sweep starting and ending time
for dset, start, stop in zip(datasets, ssri, seri):
dset_what = hfile[dset]["what"].attrs
start_str = _to_str(dset_what["startdate"] + dset_what["starttime"])
end_str = _to_str(dset_what["enddate"] + dset_what["endtime"])
start_dt = datetime.strptime(start_str, "%Y%m%d%H%M%S")
end_dt = datetime.strptime(end_str, "%Y%m%d%H%M%S")
time_delta = end_dt - start_dt
delta_seconds = time_delta.seconds + time_delta.days * 3600 * 24
rays = stop - start + 1
sweep_start_epoch = (start_dt - datetime(1970, 1, 1)).total_seconds()
t_data[start : stop + 1] = sweep_start_epoch + np.linspace(
0, delta_seconds, rays
)
start_epoch = t_data.min()
start_time = datetime.utcfromtimestamp(start_epoch)
_time["units"] = make_time_unit_str(start_time)
_time["data"] = (t_data - start_epoch).astype("float32")
# fields
fields = {}
h_field_keys = [k for k in hfile["dataset1"] if k.startswith("data")]
SINARAME_fields = [
hfile["dataset1"][d]["what"].attrs["quantity"] for d in h_field_keys
]
for SINARAME_field, h_field_key in zip(SINARAME_fields, h_field_keys):
field_name = filemetadata.get_field_name(_to_str(SINARAME_field))
if field_name is None:
continue
fdata = np.ma.zeros((total_rays, nbins), dtype="float32")
start = 0
# loop over the sweeps, copy data into correct location in data array
for dset, rays_in_sweep in zip(datasets, rays_per_sweep):
sweep_data = _get_SINARAME_h5_sweep_data(hfile[dset][h_field_key])
sweep_nbins = sweep_data.shape[1]
fdata[start : start + rays_in_sweep, :sweep_nbins] = sweep_data[:]
start += rays_in_sweep
# create field dictionary
field_dic = filemetadata(field_name)
field_dic["data"] = fdata
field_dic["_FillValue"] = get_fillvalue()
fields[field_name] = field_dic
# Add missing metadata
if file_field_names:
fields[field_name].update(
filemetadata.get_metadata(field_names[field_name])
)
# instrument_parameters
    # if the top-level 'how' group has no attributes there are no
    # instrument parameters to record
    if len(hfile["how"].attrs) == 0:
        instrument_parameters = None
    else:
        # copy each attribute of the top-level 'how' group
        instrument_parameters = {}
        for key in hfile["how"].attrs.keys():
            instrument_parameters[key] = hfile["how"].attrs[key]
return Radar(
_time,
_range,
fields,
metadata,
scan_type,
latitude,
longitude,
altitude,
sweep_number,
sweep_mode,
fixed_angle,
sweep_start_ray_index,
sweep_end_ray_index,
azimuth,
elevation,
instrument_parameters=instrument_parameters,
)
def write_sinarame_cfradial(path):
"""
This function takes SINARAME_H5 files (where every file has only one field
and one volume) from a folder and writes a CfRadial file for each volume
    including all fields.

    Parameters
----------
path : str
Where the SINARAME_H5 files are.
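
    Examples
    --------
    A minimal sketch, assuming a directory of single-field SINARAME_H5
    volumes (the path below is hypothetical):

    >>> import pyart
    >>> pyart.aux_io.write_sinarame_cfradial("~/sinarame_data")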
"""
path_user = os.path.expanduser(path)
TH_list = glob.glob(path_user + "/*_TH_*.H5")
file_date = [i.split("_")[-1][9:-4] for i in TH_list]
file_date.sort()
for i in file_date:
files = glob.glob(path_user + "/*" + i + "Z.H5")
        # sort so that the first file read is a TV or TH file; these span
        # the full range, and the remaining fields are padded to match below
files.sort(reverse=True)
files.sort(key=lambda x: len(x.split("_")[-2]))
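        # Python's sort is stable: after the reverse alphabetical pass, the
        # second sort orders only by field-name length, so the two-letter
        # fields (TV, TH) come first, with TV ahead of TH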
for j in np.arange(len(files)):
basename = os.path.basename(files[j])
bs = basename.split("_")
base1 = f"{bs[0]}_{bs[1]}_{bs[2]}_{bs[3]}_{bs[4]}"
file = f"{path_user}/{base1}"
if j == 0:
try:
radar = read_sinarame_h5(file, file_field_names=True)
azi_shape, range_shape = radar.fields["TV"]["data"].shape
except ValueError:
print("x - this Radar wasn't created", base1, sep="\t")
# In case the H5 file was badly created with bbufr it
# throws a KeyError
except KeyError:
print("x - Wrong BUFR conversion", base1, sep="\t")
else:
try:
radar_tmp = read_sinarame_h5(file, file_field_names=True)
                    field = list(radar_tmp.fields.keys())[0]
# Add missing gates
if radar_tmp.fields[field]["data"].shape[1] != range_shape:
n_missing_gates = (
range_shape - radar_tmp.fields[field]["data"].shape[1]
)
fill = np.ma.masked_all((azi_shape, n_missing_gates))
data = np.ma.concatenate(
[radar_tmp.fields[field]["data"], fill], 1
)
radar_tmp.fields[field]["data"] = data
radar.fields.update(radar_tmp.fields)
# Same as above, bbufr conversion errors
except KeyError:
print("x - Wrong BUFR conversion", base1, sep="\t")
except ValueError:
print("x - this Radar wasn't created", base1, sep="\t")
# In case the first file didn't create a Radar object
except NameError:
print("Radar didn't exist, creating")
radar = read_sinarame_h5(file, file_field_names=True)
time1 = num2date(
radar.time["data"][0],
radar.time["units"],
calendar="standard",
only_use_cftime_datetimes=True,
only_use_python_datetimes=False,
).strftime("%Y%m%d_%H%M%S")
time2 = num2date(
radar.time["data"][-1],
radar.time["units"],
calendar="standard",
only_use_cftime_datetimes=True,
only_use_python_datetimes=False,
).strftime("%Y%m%d_%H%M%S")
radar._DeflateLevel = 5
# cfrad.TIME1_to_TIME2_NAME_VCP_RANGE.nc
cffile = f"cfrad.{time1}.0000_to_{time2}.0000" f"_{bs[0]}_{bs[1]}_{bs[2]}"
print(f"Writing to {path_user}{cffile}.nc")
write_cfradial(
path_user + "/" + cffile + ".nc", radar, format="NETCDF4_CLASSIC"
)
def _to_str(text):
"""Convert bytes to str if necessary."""
if hasattr(text, "decode"):
return text.decode("utf-8")
else:
return text
def _get_SINARAME_h5_sweep_data(group):
"""Get SINARAME_H5 sweet data from an HDF5 group."""
# mask raw data
what = group["what"]
raw_data = group["data"][:]
if "nodata" in what.attrs:
nodata = what.attrs.get("nodata")
data = np.ma.masked_equal(raw_data, nodata)
else:
data = np.ma.masked_array(raw_data)
if "undetect" in what.attrs:
undetect = what.attrs.get("undetect")
data[data == undetect] = np.ma.masked
offset = 0.0
gain = 1.0
if "offset" in what.attrs:
offset = what.attrs.get("offset")
if "gain" in what.attrs:
gain = what.attrs.get("gain")
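    # convert raw counts to physical units using the ODIM linear scaling
    # convention: value = raw * gain + offset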
return data * gain + offset