"""
Routines for reading ODIM_H5 files.
"""
import datetime
import numpy as np
try:
import h5py
_H5PY_AVAILABLE = True
except ImportError:
_H5PY_AVAILABLE = False
from ..config import FileMetadata, get_fillvalue
from ..io.common import make_time_unit_str, _test_arguments
from ..core.radar import Radar
from ..exceptions import MissingOptionalDependency
ODIM_H5_FIELD_NAMES = {
'TH': 'total_power_horizontal', # uncorrected reflectivity, horizontal
'TV': 'total_power_vertical', # uncorrected reflectivity, vertical
'DBZH': 'reflectivity_horizontal', # corrected reflectivity, horizontal
'DBZV': 'reflectivity_vertical', # corrected reflectivity, vertical
'ZDR': 'differential_reflectivity', # differential reflectivity
'RHOHV': 'cross_correlation_ratio',
'LDR': 'linear_polarization_ratio',
'PHIDP': 'differential_phase',
'KDP': 'specific_differential_phase',
'SQI': 'normalized_coherent_power',
'SNR': 'signal_to_noise_ratio',
'VRAD': 'velocity', # radial velocity, marked for deprecation in ODIM HDF5 2.2
'VRADH': 'velocity_horizontal', # radial velocity, horizontal polarisation
'VRADV': 'velocity_vertical', # radial velocity, vertical polarisation
'WRAD': 'spectrum_width',
'QIND': 'quality_index',
}
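# A custom mapping can be passed to read_odim_h5 via the `field_names`
# parameter to override the table above, e.g. (hypothetical mapping):
#     read_odim_h5('example_pvol.h5', field_names={'DBZH': 'reflectivity'})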
def read_odim_h5(filename, field_names=None, additional_metadata=None,
file_field_names=False, exclude_fields=None,
include_fields=None, include_datasets=None,
exclude_datasets=None, **kwargs):
"""
    Read an ODIM_H5 file.

Parameters
----------
filename : str
Name of the ODIM_H5 file to read.
field_names : dict, optional
Dictionary mapping ODIM_H5 field names to radar field names. If a
data type found in the file does not appear in this dictionary or has
a value of None it will not be placed in the radar.fields dictionary.
        A value of None, the default, will use the ODIM_H5_FIELD_NAMES
        mapping defined in this module.
additional_metadata : dict of dicts, optional
Dictionary of dictionaries to retrieve metadata from during this read.
This metadata is not used during any successive file reads unless
explicitly included. A value of None, the default, will not
        introduce any additional metadata and the file specific or default
metadata as specified by the Py-ART configuration file will be used.
file_field_names : bool, optional
        True to use the ODIM_H5 quantity names (e.g. 'DBZH') for the
        field names. In this case the field_names parameter is ignored.
        The field dictionary will
likely only have a 'data' key, unless the fields are defined in
`additional_metadata`.
exclude_fields : list or None, optional
List of fields to exclude from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields specified by include_fields.
include_fields : list or None, optional
List of fields to include from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields not specified by exclude_fields.
include_datasets : list or None, optional
List of datasets to include from the HDF5 file, given
as ["dataset1", "dataset2", ...]. Set to None to include all datasets
not specified by exclude_datasets.
exclude_datasets : list or None, optional
List of datasets to exclude from the HDF5 file, given
as ["dataset1", "dataset2", ...]. Set to None to include all datasets
        specified by include_datasets.

Returns
-------
radar : Radar
Radar object containing data from ODIM_H5 file.
"""
# TODO before moving to pyart.io
# * unit test
# * add default field mapping, etc to default config
# * auto-detect file type with pyart.io.read function
# * instrument parameters
# * add additional checks for HOW attributes
# * support for other objects (SCAN, XSEC)
# check that h5py is available
if not _H5PY_AVAILABLE:
raise MissingOptionalDependency(
"h5py is required to use read_odim_h5 but is not installed")
# test for non empty kwargs
_test_arguments(kwargs)
# create metadata retrieval object
if field_names is None:
field_names = ODIM_H5_FIELD_NAMES
filemetadata = FileMetadata('odim_h5', field_names, additional_metadata,
file_field_names, exclude_fields,
include_fields)
# open the file
with h5py.File(filename, 'r') as hfile:
odim_object = _to_str(hfile['what'].attrs['object'])
if odim_object not in ['PVOL', 'SCAN', 'ELEV', 'AZIM']:
raise NotImplementedError(
'object: %s not implemented.' % (odim_object))
# determine the number of sweeps by the number of groups which
# begin with dataset
if include_datasets is not None:
datasets = [k for k in hfile if k.startswith('dataset')
and k in include_datasets]
elif exclude_datasets is not None:
datasets = [k for k in hfile if k.startswith('dataset')
and k not in exclude_datasets]
else:
datasets = [k for k in hfile if k.startswith('dataset')]
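        # sort numerically: a lexicographic sort would place 'dataset10'
        # before 'dataset2'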
datasets.sort(key=lambda x: int(x[7:]))
nsweeps = len(datasets)
# latitude, longitude and altitude
latitude = filemetadata('latitude')
longitude = filemetadata('longitude')
altitude = filemetadata('altitude')
h_where = hfile['where'].attrs
latitude['data'] = np.array([h_where['lat']], dtype='float64')
longitude['data'] = np.array([h_where['lon']], dtype='float64')
altitude['data'] = np.array([h_where['height']], dtype='float64')
# metadata
metadata = filemetadata('metadata')
metadata['source'] = _to_str(hfile['what'].attrs['source'])
metadata['original_container'] = 'odim_h5'
metadata['odim_conventions'] = _to_str(hfile.attrs['Conventions'])
h_what = hfile['what'].attrs
metadata['version'] = _to_str(h_what['version'])
metadata['source'] = _to_str(h_what['source'])
try:
ds1_how = hfile[datasets[0]]['how'].attrs
except KeyError:
# if no how group exists mock it with an empty dictionary
ds1_how = {}
if 'system' in ds1_how:
metadata['system'] = ds1_how['system']
if 'software' in ds1_how:
metadata['software'] = ds1_how['software']
if 'sw_version' in ds1_how:
metadata['sw_version'] = ds1_how['sw_version']
# sweep_start_ray_index, sweep_end_ray_index
sweep_start_ray_index = filemetadata('sweep_start_ray_index')
sweep_end_ray_index = filemetadata('sweep_end_ray_index')
if odim_object in ['AZIM', 'SCAN', 'PVOL']:
rays_per_sweep = [
int(hfile[d]['where'].attrs['nrays']) for d in datasets]
elif odim_object == 'ELEV':
rays_per_sweep = [
int(hfile[d]['where'].attrs['angles'].size) for d in datasets]
total_rays = sum(rays_per_sweep)
ssri = np.cumsum(np.append([0], rays_per_sweep[:-1])).astype('int32')
seri = np.cumsum(rays_per_sweep).astype('int32') - 1
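        # e.g. rays_per_sweep = [360, 360] gives ssri = [0, 360] and
        # seri = [359, 719]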
sweep_start_ray_index['data'] = ssri
sweep_end_ray_index['data'] = seri
# sweep_number
sweep_number = filemetadata('sweep_number')
sweep_number['data'] = np.arange(nsweeps, dtype='int32')
# sweep_mode
sweep_mode = filemetadata('sweep_mode')
        if odim_object == 'ELEV':
            sweep_mode['data'] = np.array(nsweeps * ['rhi'])
        else:
            sweep_mode['data'] = np.array(nsweeps * ['azimuth_surveillance'])
# scan_type
if odim_object == 'ELEV':
scan_type = 'rhi'
else:
scan_type = 'ppi'
# fixed_angle
fixed_angle = filemetadata('fixed_angle')
if odim_object == 'ELEV':
sweep_el = [hfile[d]['where'].attrs['az_angle'] for d in datasets]
else:
sweep_el = [hfile[d]['where'].attrs['elangle'] for d in datasets]
fixed_angle['data'] = np.array(sweep_el, dtype='float32')
# elevation
elevation = filemetadata('elevation')
if 'elangles' in ds1_how:
edata = np.empty(total_rays, dtype='float32')
for d, start, stop in zip(datasets, ssri, seri):
edata[start:stop+1] = hfile[d]['how'].attrs['elangles'][:]
elevation['data'] = edata
elif odim_object == 'ELEV':
edata = np.empty(total_rays, dtype='float32')
for d, start, stop in zip(datasets, ssri, seri):
edata[start:stop+1] = hfile[d]['where'].attrs['angles'][:]
elevation['data'] = edata
else:
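            # constant elevation per sweep: expand each fixed angle to
            # per-ray values, e.g. np.repeat([0.5, 1.5], [360, 360])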
elevation['data'] = np.repeat(sweep_el, rays_per_sweep)
# range
_range = filemetadata('range')
if 'rstart' in hfile['dataset1/where'].attrs:
# derive range from rstart and rscale attributes if available
# check that the gate spacing is constant between sweeps
rstart = [hfile[d]['where'].attrs['rstart'] for d in datasets]
if any(rstart != rstart[0]):
raise ValueError('range start changes between sweeps')
rscale = [hfile[d]['where'].attrs['rscale'] for d in datasets]
if any(rscale != rscale[0]):
raise ValueError('range scale changes between sweeps')
            all_sweeps_nbins = [
                hfile[d]['where'].attrs['nbins'] for d in datasets]
            # find the maximum number of bins over all sweeps
            max_nbins = max(all_sweeps_nbins)
            if isinstance(max_nbins, np.ndarray):
                max_nbins = max_nbins[0]
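            # rstart is given in km in ODIM_H5, hence the factor of 1e3;
            # gate centers sit half a gate spacing beyond the range start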
rscenter = 1e3 * rstart[0] + rscale[0] / 2
_range['data'] = np.arange(rscenter,
rscenter + max_nbins * rscale[0],
rscale[0], dtype='float32')
_range['meters_to_center_of_first_gate'] = rstart[0] * 1000.
_range['meters_between_gates'] = float(rscale[0])
else:
# if not defined use range attribute which defines the maximum range
# in km. There is no information on the starting location of the
# range bins so we assume this to be 0.
# This most often occurs in RHI files, which technically do not meet
# the ODIM 2.2 specs. Section 7.4 requires that these files include
# the where/rstart, where/rscale and where/nbins attributes.
max_range = [hfile[d]['where'].attrs['range'] for d in datasets]
if any(max_range != max_range[0]):
raise ValueError('maximum range changes between sweeps')
            # nbins is required; name it max_nbins since the field data
            # arrays below are sized with this value
            max_nbins = hfile['dataset1/data1/data'].shape[1]
            _range['data'] = np.linspace(
                0, max_range[0] * 1000., max_nbins).astype('float32')
            _range['meters_to_center_of_first_gate'] = 0
            _range['meters_between_gates'] = max_range[0] * 1000. / max_nbins
# azimuth
azimuth = filemetadata('azimuth')
az_data = np.ones((total_rays, ), dtype='float32')
for dset, start, stop in zip(datasets, ssri, seri):
if odim_object == 'ELEV':
# all azimuth angles are the sweep azimuth angle
sweep_az = hfile[dset]['where'].attrs['az_angle']
elif odim_object == 'AZIM':
# Sector azimuths are specified in the startaz and stopaz
# attribute of dataset/where.
# Assume that the azimuth angles do not pass through 0/360 deg.
startaz = hfile[dset]['where'].attrs['startaz']
stopaz = hfile[dset]['where'].attrs['stopaz']
nrays = stop - start + 1
sweep_az = np.linspace(startaz, stopaz, nrays, endpoint=True)
elif ('startazA' in ds1_how) and ('stopazA' in ds1_how):
# average between start and stop azimuth angles
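            # a plain arithmetic mean fails across the 0/360 wrap, e.g.
            # (359.5 + 0.5) / 2 = 180 instead of 0; averaging unit vectors
            # in the complex plane gives the correct circular mean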
startaz = hfile[dset]['how'].attrs['startazA']
stopaz = hfile[dset]['how'].attrs['stopazA']
sweep_az = np.angle(
(np.exp(1.j*np.deg2rad(startaz)) +
np.exp(1.j*np.deg2rad(stopaz))) / 2., deg=True)
else:
# according to section 5.1 the first ray points to the
# northernmost direction and proceeds clockwise for a complete
# 360 rotation.
try:
astart = hfile[dset]['how'].attrs['astart']
except KeyError:
astart = 0.0
nrays = hfile[dset]['where'].attrs['nrays']
da = 360.0 / nrays
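            # e.g. nrays = 360 and astart = 0.0 give ray centers at
            # 0.5, 1.5, ..., 359.5 degrees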
sweep_az = np.arange(astart + da / 2., 360., da, dtype='float32')
az_data[start:stop+1] = sweep_az
azimuth['data'] = az_data
# time
_time = filemetadata('time')
if ('startazT' in ds1_how) and ('stopazT' in ds1_how):
# average between startazT and stopazT
t_data = np.empty((total_rays, ), dtype='float32')
for dset, start, stop in zip(datasets, ssri, seri):
t_start = hfile[dset]['how'].attrs['startazT']
t_stop = hfile[dset]['how'].attrs['stopazT']
t_data[start:stop+1] = (t_start + t_stop) / 2
start_epoch = t_data.min()
start_time = datetime.datetime.utcfromtimestamp(start_epoch)
_time['units'] = make_time_unit_str(start_time)
_time['data'] = t_data - start_epoch
else:
t_data = np.empty((total_rays, ), dtype='int32')
# interpolate between each sweep starting and ending time
for dset, start, stop in zip(datasets, ssri, seri):
dset_what = hfile[dset]['what'].attrs
start_str = _to_str(
dset_what['startdate'] + dset_what['starttime'])
end_str = _to_str(dset_what['enddate'] + dset_what['endtime'])
start_dt = datetime.datetime.strptime(start_str, '%Y%m%d%H%M%S')
end_dt = datetime.datetime.strptime(end_str, '%Y%m%d%H%M%S')
time_delta = end_dt - start_dt
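                # compute the sweep duration in whole seconds (equivalent
                # to time_delta.total_seconds() with microseconds dropped)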
delta_seconds = time_delta.seconds + time_delta.days * 3600 * 24
rays = stop - start + 1
sweep_start_epoch = (
start_dt - datetime.datetime(1970, 1, 1)).total_seconds()
t_data[start:stop+1] = (sweep_start_epoch +
np.linspace(0, delta_seconds, rays))
start_epoch = t_data.min()
start_time = datetime.datetime.utcfromtimestamp(start_epoch)
_time['units'] = make_time_unit_str(start_time)
_time['data'] = (t_data - start_epoch).astype('float32')
# fields
fields = {}
h_field_keys = [k for k in hfile['dataset1'] if k.startswith('data')]
odim_fields = [hfile['dataset1'][d]['what'].attrs['quantity'] for d in
h_field_keys]
for odim_field, h_field_key in zip(odim_fields, h_field_keys):
field_name = filemetadata.get_field_name(_to_str(odim_field))
if field_name is None:
continue
fdata = np.ma.zeros((total_rays, max_nbins), dtype='float32')
start = 0
# loop on the sweeps, copy data into correct location in data array
for dset, rays_in_sweep in zip(datasets, rays_per_sweep):
try:
sweep_data = _get_odim_h5_sweep_data(hfile[dset][h_field_key])
except KeyError:
                    sweep_data = np.zeros((rays_in_sweep, max_nbins)) + np.nan
sweep_nbins = sweep_data.shape[1]
fdata[start:start + rays_in_sweep, :sweep_nbins] = sweep_data[:]
                # set data to NaN if it is beyond the range of this sweep
fdata[start:start + rays_in_sweep,
sweep_nbins:max_nbins] = np.nan
start += rays_in_sweep
# create field dictionary
field_dic = filemetadata(field_name)
field_dic['data'] = fdata
field_dic['_FillValue'] = get_fillvalue()
fields[field_name] = field_dic
# instrument_parameters
instrument_parameters = None
return Radar(
_time, _range, fields, metadata, scan_type,
latitude, longitude, altitude,
sweep_number, sweep_mode, fixed_angle, sweep_start_ray_index,
sweep_end_ray_index,
azimuth, elevation,
instrument_parameters=instrument_parameters)


def _to_str(text):
""" Convert bytes to str if necessary. """
if hasattr(text, 'decode'):
return text.decode('utf-8')
else:
return text


def _get_odim_h5_sweep_data(group):
""" Get ODIM_H5 sweet data from an HDF5 group. """
# mask raw data
what = group['what']
raw_data = group['data'][:]
if 'nodata' in what.attrs:
nodata = what.attrs.get('nodata')
data = np.ma.masked_equal(raw_data, nodata)
else:
data = np.ma.masked_array(raw_data)
if 'undetect' in what.attrs:
undetect = what.attrs.get('undetect')
data[data == undetect] = np.ma.masked
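    # per the ODIM spec, 'nodata' flags gates where no data are available
    # and 'undetect' flags gates below the detection threshold; both are
    # masked here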
offset = 0.0
gain = 1.0
if 'offset' in what.attrs:
offset = what.attrs.get('offset')
if 'gain' in what.attrs:
gain = what.attrs.get('gain')
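    # ODIM data are stored as packed integers; convert to physical units
    # with the linear relation: value = raw * gain + offset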
return data * gain + offset