"""
Routines for reading SINARAME_H5 files.
"""
from datetime import datetime
import glob
import os
try:
    # prefer cftime; its num2date supports the keyword arguments used below
    from cftime import num2date
except ImportError:
    from netcdftime import num2date
import numpy as np
try:
import h5py
_H5PY_AVAILABLE = True
except ImportError:
_H5PY_AVAILABLE = False
from ..config import FileMetadata, get_fillvalue
from ..io.common import make_time_unit_str, _test_arguments
from ..io import write_cfradial
from ..core.radar import Radar
from ..exceptions import MissingOptionalDependency
SINARAME_H5_FIELD_NAMES = {
'TH': 'total_power', # uncorrected reflectivity, horizontal
'TV': 'total_power_vertical', # uncorrected reflectivity, vertical
'DBZH': 'reflectivity', # corrected reflectivity, horizontal
    'DBZV': 'reflectivity_vertical',  # corrected reflectivity, vertical
    'CM': 'clutter_mask',
'ZDR': 'differential_reflectivity', # differential reflectivity
'RHOHV': 'cross_correlation_ratio',
'LDR': 'linear_polarization_ratio',
'PHIDP': 'differential_phase',
'KDP': 'specific_differential_phase',
'SQI': 'normalized_coherent_power',
'SNR': 'signal_to_noise_ratio',
'VRAD': 'velocity',
'WRAD': 'spectrum_width',
'QIND': 'quality_index',
}
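# Only a subset of this mapping needs to be supplied per call; a minimal
# sketch (the field selection below is illustrative):
#
#     names = {'DBZH': 'reflectivity', 'VRAD': 'velocity'}
#     radar = read_sinarame_h5('volume.H5', field_names=names)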


def read_sinarame_h5(filename, field_names=None, additional_metadata=None,
file_field_names=False, exclude_fields=None,
include_fields=None, **kwargs):
"""
    Read a SINARAME_H5 file.

    Parameters
    ----------
filename : str
Name of the SINARAME_H5 file to read.
    field_names : dict, optional
        Dictionary mapping SINARAME_H5 field names to radar field names. If
        a data type found in the file does not appear in this dictionary or
        has a value of None it will not be placed in the radar.fields
        dictionary. A value of None, the default, will use the
        SINARAME_H5_FIELD_NAMES mapping defined in this module.
    additional_metadata : dict of dicts, optional
        Dictionary of dictionaries to retrieve metadata from during this
        read. This metadata is not used during any successive file reads
        unless explicitly included. A value of None, the default, will not
        introduce any additional metadata and the file specific or default
        metadata as specified by the Py-ART configuration file will be used.
    file_field_names : bool, optional
        True to use the SINARAME_H5 data type names for the field names. In
        this case the field_names parameter is ignored. The field dictionary
        will likely only have a 'data' key, unless the fields are defined in
        `additional_metadata`.
exclude_fields : list or None, optional
List of fields to exclude from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields specified by include_fields.
include_fields : list or None, optional
List of fields to include from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields not specified by exclude_fields.

    Returns
    -------
radar : Radar
Radar object containing data from SINARAME_H5 file.
"""
# TODO before moving to pyart.io
# * unit test
# * add default field mapping, etc to default config
# * auto-detect file type with pyart.io.read function
# * instrument parameters
# * add additional checks for HOW attributes
# * support for other objects (SCAN, XSEC)
# check that h5py is available
if not _H5PY_AVAILABLE:
raise MissingOptionalDependency(
"h5py is required to use read_sinarame_h5 but is not installed")
# test for non empty kwargs
_test_arguments(kwargs)
# create metadata retrieval object
if field_names is None:
field_names = SINARAME_H5_FIELD_NAMES
filemetadata = FileMetadata('SINARAME_h5', field_names,
additional_metadata,
file_field_names, exclude_fields,
include_fields)
# open the file
hfile = h5py.File(filename, 'r')
SINARAME_object = _to_str(hfile['what'].attrs['object'])
if SINARAME_object not in ['PVOL', 'SCAN', 'ELEV', 'AZIM']:
raise NotImplementedError(
'object: %s not implemented.' % (SINARAME_object))
# determine the number of sweeps by the number of groups which
# begin with dataset
datasets = [k for k in hfile if k.startswith('dataset')]
datasets.sort(key=lambda x: int(x[7:]))
nsweeps = len(datasets)
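    # e.g. groups dataset1 ... dataset10 give nsweeps = 10; the numeric sort
    # key places dataset10 after dataset9 instead of lexicographically after
    # dataset1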
# latitude, longitude and altitude
latitude = filemetadata('latitude')
longitude = filemetadata('longitude')
altitude = filemetadata('altitude')
h_where = hfile['where'].attrs
latitude['data'] = np.array([h_where['lat']], dtype='float64')
longitude['data'] = np.array([h_where['lon']], dtype='float64')
altitude['data'] = np.array([h_where['height']], dtype='float64')
# metadata
metadata = filemetadata('metadata')
metadata['original_container'] = 'SINARAME_h5'
metadata['SINARAME_conventions'] = _to_str(hfile.attrs['Conventions'])
h_what = hfile['what'].attrs
metadata['version'] = _to_str(h_what['version'])
metadata['source'] = _to_str(h_what['source'])
try:
ds1_how = hfile[datasets[0]]['how'].attrs
except KeyError:
# if no how group exists mock it with an empty dictionary
ds1_how = {}
if 'system' in ds1_how:
metadata['system'] = ds1_how['system']
if 'software' in ds1_how:
metadata['software'] = ds1_how['software']
if 'sw_version' in ds1_how:
metadata['sw_version'] = ds1_how['sw_version']
# sweep_start_ray_index, sweep_end_ray_index
sweep_start_ray_index = filemetadata('sweep_start_ray_index')
sweep_end_ray_index = filemetadata('sweep_end_ray_index')
if SINARAME_object in ['AZIM', 'SCAN', 'PVOL']:
rays_per_sweep = [
int(hfile[d]['where'].attrs['nrays']) for d in datasets]
elif SINARAME_object == 'ELEV':
rays_per_sweep = [
int(hfile[d]['where'].attrs['angles'].size) for d in datasets]
total_rays = sum(rays_per_sweep)
ssri = np.cumsum(np.append([0], rays_per_sweep[:-1])).astype('int32')
seri = np.cumsum(rays_per_sweep).astype('int32') - 1
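    # worked example: rays_per_sweep [360, 360, 360] gives start indices
    # [0, 360, 720] and end indices [359, 719, 1079]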
sweep_start_ray_index['data'] = ssri
sweep_end_ray_index['data'] = seri
# sweep_number
sweep_number = filemetadata('sweep_number')
sweep_number['data'] = np.arange(nsweeps, dtype='int32')
    # sweep_mode
    sweep_mode = filemetadata('sweep_mode')
    if SINARAME_object == 'ELEV':
        sweep_mode['data'] = np.array(nsweeps * ['rhi'])
    else:
        sweep_mode['data'] = np.array(nsweeps * ['azimuth_surveillance'])
# scan_type
if SINARAME_object == 'ELEV':
scan_type = 'rhi'
else:
scan_type = 'ppi'
# fixed_angle
fixed_angle = filemetadata('fixed_angle')
if SINARAME_object == 'ELEV':
sweep_el = [hfile[d]['where'].attrs['az_angle'] for d in datasets]
else:
sweep_el = [hfile[d]['where'].attrs['elangle'] for d in datasets]
fixed_angle['data'] = np.array(sweep_el, dtype='float32')
# elevation
elevation = filemetadata('elevation')
if 'elangles' in ds1_how:
edata = np.empty(total_rays, dtype='float32')
for d, start, stop in zip(datasets, ssri, seri):
edata[start:stop+1] = hfile[d]['how'].attrs['elangles'][:]
elevation['data'] = edata
elif SINARAME_object == 'ELEV':
edata = np.empty(total_rays, dtype='float32')
for d, start, stop in zip(datasets, ssri, seri):
edata[start:stop+1] = hfile[d]['where'].attrs['angles'][:]
elevation['data'] = edata
else:
elevation['data'] = np.repeat(sweep_el, rays_per_sweep)
# range
_range = filemetadata('range')
if 'rstart' in hfile['dataset1/where'].attrs:
# derive range from rstart and rscale attributes if available
# check that the gate spacing is constant between sweeps
rstart = [hfile[d]['where'].attrs['rstart'] for d in datasets]
        if any(r != rstart[0] for r in rstart):
raise ValueError('range start changes between sweeps')
rscale = [hfile[d]['where'].attrs['rscale'] for d in datasets]
        if any(r != rscale[0] for r in rscale):
raise ValueError('range scale changes between sweeps')
nbins = int(hfile['dataset1']['where'].attrs['nbins'])
_range['data'] = (np.arange(nbins, dtype='float32') * rscale[0] +
rstart[0])
_range['meters_to_center_of_first_gate'] = rstart[0]
_range['meters_between_gates'] = float(rscale[0])
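        # e.g. rstart 0 and rscale 300 place gate centers at 0, 300, 600, ...
        # meters from the radar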
else:
# if not defined use range attribute which defines the maximum range in
# km. There is no information on the starting location of the range
# bins so we assume this to be 0.
# This most often occurs in RHI files, which technically do not meet
# the ODIM 2.2 specs. Section 7.4 requires that these files include
# the where/rstart, where/rscale and where/nbins attributes.
max_range = [hfile[d]['where'].attrs['range'] for d in datasets]
        if any(r != max_range[0] for r in max_range):
raise ValueError('maximum range changes between sweeps')
        # where/nbins is also missing here, so infer the gate count from the
        # shape of the data
nbins = hfile['dataset1/data1/data'].shape[1]
_range['data'] = np.linspace(
0, max_range[0] * 1000., nbins, dtype='float32')
_range['meters_to_center_of_first_gate'] = 0
_range['meters_between_gates'] = max_range[0] * 1000. / nbins
# azimuth
azimuth = filemetadata('azimuth')
az_data = np.ones((total_rays, ), dtype='float32')
for dset, start, stop in zip(datasets, ssri, seri):
if SINARAME_object == 'ELEV':
# all azimuth angles are the sweep azimuth angle
sweep_az = hfile[dset]['where'].attrs['az_angle']
elif SINARAME_object == 'AZIM':
# Sector azimuths are specified in the startaz and stopaz
# attribute of dataset/where.
# Assume that the azimuth angles do not pass through 0/360 degrees
startaz = hfile[dset]['where'].attrs['startaz']
stopaz = hfile[dset]['where'].attrs['stopaz']
nrays = stop - start + 1
sweep_az = np.linspace(startaz, stopaz, nrays, endpoint=True)
elif ('startazA' in ds1_how) and ('stopazA' in ds1_how):
# average between start and stop azimuth angles
startaz = hfile[dset]['how'].attrs['startazA']
stopaz = hfile[dset]['how'].attrs['stopazA']
sweep_az = np.angle(
(np.exp(1.j*np.deg2rad(startaz)) +
np.exp(1.j*np.deg2rad(stopaz))) / 2., deg=True)
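            # averaging as unit vectors handles the 0/360 wrap correctly:
            # e.g. a ray with startazA 359 and stopazA 1 averages to an
            # azimuth of 0 degrees rather than 180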
else:
            # according to section 5.1 of the ODIM specification the first
            # ray points north (0 degrees) and the rays proceed clockwise
            # through a complete 360 degree rotation
nrays = stop - start + 1
sweep_az = np.linspace(0, 360, nrays, endpoint=False)
az_data[start:stop+1] = sweep_az
azimuth['data'] = az_data
# time
_time = filemetadata('time')
if ('startazT' in ds1_how) and ('stopazT' in ds1_how):
# average between startazT and stopazT
        # float64: epoch times do not fit in a float32 without rounding
        t_data = np.empty((total_rays, ), dtype='float64')
for dset, start, stop in zip(datasets, ssri, seri):
t_start = hfile[dset]['how'].attrs['startazT']
t_stop = hfile[dset]['how'].attrs['stopazT']
t_data[start:stop+1] = (t_start + t_stop) / 2
start_epoch = t_data.min()
start_time = datetime.utcfromtimestamp(start_epoch)
_time['units'] = make_time_unit_str(start_time)
_time['data'] = t_data - start_epoch
else:
        # float64 so the interpolated ray times keep sub-second precision
        t_data = np.empty((total_rays, ), dtype='float64')
# interpolate between each sweep starting and ending time
for dset, start, stop in zip(datasets, ssri, seri):
dset_what = hfile[dset]['what'].attrs
start_str = _to_str(
dset_what['startdate'] + dset_what['starttime'])
end_str = _to_str(dset_what['enddate'] + dset_what['endtime'])
start_dt = datetime.strptime(start_str, '%Y%m%d%H%M%S')
end_dt = datetime.strptime(end_str, '%Y%m%d%H%M%S')
            time_delta = end_dt - start_dt
            delta_seconds = time_delta.total_seconds()
rays = stop - start + 1
sweep_start_epoch = (
start_dt - datetime(1970, 1, 1)).total_seconds()
t_data[start:stop+1] = (sweep_start_epoch +
np.linspace(0, delta_seconds, rays))
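            # e.g. a sweep running 20 seconds with 360 rays assigns ray
            # times about 0.056 s (20 / 359) apart from the sweep start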
start_epoch = t_data.min()
start_time = datetime.utcfromtimestamp(start_epoch)
_time['units'] = make_time_unit_str(start_time)
_time['data'] = (t_data - start_epoch).astype('float32')
# fields
fields = {}
h_field_keys = [k for k in hfile['dataset1'] if k.startswith('data')]
SINARAME_fields = [hfile['dataset1'][d]['what'].attrs['quantity'] for d in
h_field_keys]
for SINARAME_field, h_field_key in zip(SINARAME_fields, h_field_keys):
field_name = filemetadata.get_field_name(_to_str(SINARAME_field))
if field_name is None:
continue
fdata = np.ma.zeros((total_rays, nbins), dtype='float32')
start = 0
# loop over the sweeps, copy data into correct location in data array
for dset, rays_in_sweep in zip(datasets, rays_per_sweep):
sweep_data = _get_SINARAME_h5_sweep_data(hfile[dset][h_field_key])
sweep_nbins = sweep_data.shape[1]
fdata[start:start + rays_in_sweep, :sweep_nbins] = sweep_data[:]
start += rays_in_sweep
# create field dictionary
field_dic = filemetadata(field_name)
field_dic['data'] = fdata
field_dic['_FillValue'] = get_fillvalue()
fields[field_name] = field_dic
# Add missing metadata
if file_field_names:
fields[field_name].update(
filemetadata.get_metadata(field_names[field_name]))
# instrument_parameters
instrument_parameters = None
return Radar(
_time, _range, fields, metadata, scan_type,
latitude, longitude, altitude,
sweep_number, sweep_mode, fixed_angle, sweep_start_ray_index,
sweep_end_ray_index,
azimuth, elevation,
instrument_parameters=instrument_parameters)


def write_sinarame_cfradial(path):
"""
    This function takes SINARAME_H5 files, where every file holds a single
    field of a single volume, from a folder and writes one CfRadial file per
    volume containing all of the fields.

    Parameters
    ----------
    path : str
        Directory containing the SINARAME_H5 files.
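
    Examples
    --------
    A sketch of a typical call; the path below is illustrative:

    >>> import pyart
    >>> pyart.aux_io.write_sinarame_cfradial('/data/sinarame/2018_01_01')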
"""
path_user = os.path.expanduser(path)
TH_list = glob.glob(path_user + '/*_TH_*.H5')
file_date = [i.split('_')[-1][9:-4] for i in TH_list]
file_date.sort()
for i in file_date:
files = glob.glob(path_user + '/*' + i + 'Z.H5')
        # sort so a TV or TH file is read first; the first file read sets
        # the reference range shape used to pad the remaining fields
files.sort(reverse=True)
files.sort(key=lambda x: len(x.split('_')[-2]))
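        # both sorts are stable: the second pass brings the shortest field
        # tokens (TH, TV) to the front, and the earlier reverse sort leaves
        # TV ahead of TH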
for j in np.arange(len(files)):
basename = os.path.basename(files[j])
bs = basename.split('_')
base1 = '{b1}_{b2}_{b3}_{fn}_{b4}'.format(
b1=bs[0], b2=bs[1], b3=bs[2], fn=bs[3], b4=bs[4])
file = '{path}/{base1}'.format(path=path_user, base1=base1)
if j == 0:
try:
radar = read_sinarame_h5(file, file_field_names=True)
azi_shape, range_shape = radar.fields['TV']['data'].shape
except ValueError:
print("x - this Radar wasn't created",
base1, sep='\t')
# In case the H5 file was badly created with bbufr it
# throws a KeyError
except KeyError:
print("x - Wrong BUFR conversion", base1, sep='\t')
else:
try:
radar_tmp = read_sinarame_h5(file, file_field_names=True)
                    field = list(radar_tmp.fields.keys())[0]
# Add missing gates
if radar_tmp.fields[field]['data'].shape[1] != range_shape:
n_missing_gates = \
(range_shape
- radar_tmp.fields[field]['data'].shape[1])
fill = np.ma.masked_all((azi_shape, n_missing_gates))
data = np.ma.concatenate(
[radar_tmp.fields[field]['data'], fill], 1)
radar_tmp.fields[field]['data'] = data
radar.fields.update(radar_tmp.fields)
# Same as above, bbufr conversion errors
except KeyError:
print("x - Wrong BUFR conversion", base1, sep='\t')
except ValueError:
print("x - this Radar wasn't created",
base1, sep='\t')
# In case the first file didn't create a Radar object
except NameError:
print("Radar didn't exist, creating")
radar = read_sinarame_h5(file, file_field_names=True)
cal_temps = u"gregorian"
time1 = num2date(radar.time['data'][0], radar.time['units'],
calendar='standard', only_use_cftime_datetimes=True,
only_use_python_datetimes=False).strftime(
'%Y%m%d_%H%M%S')
time2 = num2date(radar.time['data'][-1], radar.time['units'],
calendar='standard', only_use_cftime_datetimes=True,
only_use_python_datetimes=False).strftime(
'%Y%m%d_%H%M%S')
radar._DeflateLevel = 5
# cfrad.TIME1_to_TIME2_NAME_VCP_RANGE.nc
cffile = 'cfrad.{time1}.0000_to_{time2}.0000'\
'_{b1}_{est}_{ran}'.format(time1=time1, time2=time2,
b1=bs[0], est=bs[1], ran=bs[2])
        print('Writing to {path}/{filename}.nc'.format(path=path_user,
                                                       filename=cffile))
write_cfradial(path_user + '/' + cffile + '.nc',
radar, format='NETCDF4_CLASSIC')


def _to_str(text):
""" Convert bytes to str if necessary. """
if hasattr(text, 'decode'):
return text.decode('utf-8')
else:
return text


def _get_SINARAME_h5_sweep_data(group):
""" Get SINARAME_H5 sweet data from an HDF5 group. """
# mask raw data
what = group['what']
raw_data = group['data'][:]
if 'nodata' in what.attrs:
nodata = what.attrs.get('nodata')
data = np.ma.masked_equal(raw_data, nodata)
else:
data = np.ma.masked_array(raw_data)
if 'undetect' in what.attrs:
undetect = what.attrs.get('undetect')
data[data == undetect] = np.ma.masked
offset = 0.0
gain = 1.0
if 'offset' in what.attrs:
offset = what.attrs.get('offset')
if 'gain' in what.attrs:
gain = what.attrs.get('gain')
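    # decode raw counts to physical units following the ODIM convention,
    # value = raw * gain + offset; e.g. gain 0.5, offset -32 and a raw
    # count of 100 decode to 100 * 0.5 - 32 = 18.0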
return data * gain + offset