"""
Routines for reading RAINBOW files (Used by SELEX) using the wradlib library
"""
# specific modules for this function
import os
try:
import wradlib # noqa
_WRADLIB_AVAILABLE = True
# `read_rainbow` as of wradlib version 1.0.0
try:
from wradlib.io import read_Rainbow as read_rainbow
except ImportError:
from wradlib.io import read_rainbow
except:
_WRADLIB_AVAILABLE = False
import datetime
import numpy as np
from ..config import FileMetadata, get_fillvalue
from ..core.radar import Radar
from ..exceptions import MissingOptionalDependency
from ..io.common import _test_arguments, make_time_unit_str
RAINBOW_FIELD_NAMES = {
"W": "spectrum_width",
"Wv": "spectrum_width_vv", # non standard name
"Wu": "unfiltered_spectrum_width", # non standard name
"Wvu": "unfiltered_spectrum_width_vv", # non standard name
"V": "velocity",
"Vv": "velocity_vv", # non standard name
"Vu": "unfiltered_velocity", # non standard name
"Vvu": "unfiltered_velocity_vv", # non standard name
"dBZ": "reflectivity",
"dBZv": "reflectivity_vv", # non standard name
"dBuZ": "unfiltered_reflectivity", # non standard name
"dBuZv": "unfiltered_reflectivity_vv", # non standard name
"ZDR": "differential_reflectivity",
"ZDRu": "unfiltered_differential_reflectivity", # non standard name
"RhoHV": "cross_correlation_ratio",
"RhoHVu": "unfiltered_cross_correlation_ratio", # non standard name
"PhiDP": "differential_phase",
"uPhiDP": "uncorrected_differential_phase", # non standard name
"uPhiDPu": "uncorrected_unfiltered_differential_phase", # non standard name
"KDP": "specific_differential_phase",
"uKDP": "uncorrected_specific_differential_phase", # non standard name
"uKDPu": "uncorrected_unfiltered_specific_differential_phase", # non standard name
"SQI": "signal_quality_index", # non standard name
"SQIv": "signal_quality_index_vv", # non standard name
"SQIu": "unfiltered_signal_quality_index", # non standard name
"SQIvu": "unfiltered_signal_quality_index_vv", # non standard name
"TEMP": "temperature", # non standard name
"ISO0": "iso0", # non standard name
}
[docs]
def read_rainbow_wrl(
filename,
field_names=None,
additional_metadata=None,
file_field_names=False,
exclude_fields=None,
include_fields=None,
**kwargs
):
"""
Read a RAINBOW file.
This routine has been tested to read rainbow5 files version 5.22.3,
5.34.16 and 5.35.1.
Since the rainbow file format is evolving constantly there is no guaranty
that it can work with other versions.
If necessary, the user should adapt to code according to its own
file version and raise an issue upstream.
Data types read by this routine:
Reflectivity: dBZ, dBuZ, dBZv, dBuZv
Velocity: V, Vu, Vv, Vvu
Spectrum width: W, Wu, Wv, Wvu
Differential reflectivity: ZDR, ZDRu
Co-polar correlation coefficient: RhoHV, RhoHVu
Co-polar differential phase: PhiDP, uPhiDP, uPhiDPu
Specific differential phase: KDP, uKDP, uKDPu
Signal quality parameters: SQI, SQIu, SQIv, SQIvu
Temperature: TEMP
Position of the range bin respect to the ISO0: ISO0
Parameters
----------
filename : str
Name of the RAINBOW file to read.
field_names : dict, optional
Dictionary mapping RAINBOW field names to radar field names. If a
data type found in the file does not appear in this dictionary or has
a value of None it will not be placed in the radar.fields dictionary.
A value of None, the default, will use the mapping defined in the
Py-ART configuration file.
additional_metadata : dict of dicts, optional
Dictionary of dictionaries to retrieve metadata during this read.
This metadata is not used during any successive file reads unless
explicitly included. A value of None, the default, will not
introduct any addition metadata and the file specific or default
metadata as specified by the Py-ART configuration file will be used.
file_field_names : bool, optional
True to use the MDV data type names for the field names. If this
case the field_names parameter is ignored. The field dictionary will
likely only have a 'data' key, unless the fields are defined in
`additional_metadata`.
exclude_fields : list or None, optional
List of fields to exclude from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields specified by include_fields.
include_fields : list or None, optional
List of fields to include from the radar object. This is applied
after the `file_field_names` and `field_names` parameters. Set
to None to include all fields not specified by exclude_fields.
Returns
-------
radar : Radar
Radar object containing data from RAINBOW file.
"""
# check that wradlib is available
if not _WRADLIB_AVAILABLE:
raise MissingOptionalDependency(
"wradlib is required to use read_rainbow_wrl but is not installed"
)
# test for non empty kwargs
_test_arguments(kwargs)
# check if it is the right file. Open it and read it
bfile = os.path.basename(filename)
supported_file = (
bfile.endswith(".vol") or bfile.endswith(".azi") or bfile.endswith(".ele")
)
if not supported_file:
raise ValueError(
"Only data files with extension .vol, .azi or .ele are supported"
)
# create metadata retrieval object
if field_names is None:
field_names = RAINBOW_FIELD_NAMES
filemetadata = FileMetadata(
"RAINBOW",
field_names,
additional_metadata,
file_field_names,
exclude_fields,
include_fields,
)
rbf = read_rainbow(filename, loaddata=True)
# check the number of slices
nslices = int(rbf["volume"]["scan"]["pargroup"]["numele"])
if nslices > 1:
single_slice = False
common_slice_info = rbf["volume"]["scan"]["slice"][0]
else:
single_slice = True
common_slice_info = rbf["volume"]["scan"]["slice"]
# check the data type
# all slices should have the same data type
datatype = common_slice_info["slicedata"]["rawdata"]["@type"]
field_name = filemetadata.get_field_name(datatype)
if field_name is None:
raise ValueError("Field Name Unknown")
# get definitions from filemetadata class
latitude = filemetadata("latitude")
longitude = filemetadata("longitude")
altitude = filemetadata("altitude")
metadata = filemetadata("metadata")
sweep_start_ray_index = filemetadata("sweep_start_ray_index")
sweep_end_ray_index = filemetadata("sweep_end_ray_index")
sweep_number = filemetadata("sweep_number")
sweep_mode = filemetadata("sweep_mode")
fixed_angle = filemetadata("fixed_angle")
elevation = filemetadata("elevation")
_range = filemetadata("range")
azimuth = filemetadata("azimuth")
_time = filemetadata("time")
field_dic = filemetadata(field_name)
# other metadata
frequency = filemetadata("frequency")
# get general file information
# position and radar frequency
if "sensorinfo" in rbf["volume"].keys():
latitude["data"] = np.array(
[rbf["volume"]["sensorinfo"]["lat"]], dtype="float64"
)
longitude["data"] = np.array(
[rbf["volume"]["sensorinfo"]["lon"]], dtype="float64"
)
altitude["data"] = np.array(
[rbf["volume"]["sensorinfo"]["alt"]], dtype="float64"
)
frequency["data"] = np.array(
[3e8 / float(rbf["volume"]["sensorinfo"]["wavelen"])], dtype="float64"
)
elif "radarinfo" in rbf["volume"].keys():
latitude["data"] = np.array(
[rbf["volume"]["radarinfo"]["@lat"]], dtype="float64"
)
longitude["data"] = np.array(
[rbf["volume"]["radarinfo"]["@lon"]], dtype="float64"
)
altitude["data"] = np.array(
[rbf["volume"]["radarinfo"]["@alt"]], dtype="float64"
)
frequency["data"] = np.array(
[3e8 / float(rbf["volume"]["radarinfo"]["wavelen"])], dtype="float64"
)
# antenna speed
if "antspeed" in common_slice_info:
ant_speed = float(common_slice_info["antspeed"])
else:
ant_speed = 10.0
print(
"WARNING: Unable to read antenna speed. Default value of "
+ str(ant_speed)
+ " deg/s will be used"
)
# angle step
angle_step = float(common_slice_info["anglestep"])
# sweep_number (is the sweep index)
sweep_number["data"] = np.arange(nslices, dtype="int32")
# get number of rays and number of range bins per sweep
rays_per_sweep = np.empty(nslices, dtype="int32")
if single_slice:
rays_per_sweep[0] = int(common_slice_info["slicedata"]["rawdata"]["@rays"])
maxbin = int(common_slice_info["slicedata"]["rawdata"]["@bins"])
ssri = np.array([0], dtype="int32")
seri = np.array([rays_per_sweep[0] - 1], dtype="int32")
else:
# number of range bins per ray in sweep
nbins_sweep = np.empty(nslices, dtype="int32")
for i in range(nslices):
slice_info = rbf["volume"]["scan"]["slice"][i]
# number of rays per sweep
rays_per_sweep[i] = int(slice_info["slicedata"]["rawdata"]["@rays"])
# number of range bins per ray in sweep
nbins_sweep[i] = int(slice_info["slicedata"]["rawdata"]["@bins"])
# all sweeps have to have the same number of range bins
if any(nbins_sweep != nbins_sweep[0]):
# raise ValueError("number of range bins changes between sweeps")
print(
"WARNING: number of range bins changes between sweeps "
+ "max number of bins will be used"
)
maxbin = nbins_sweep.max()
ssri = np.cumsum(np.append([0], rays_per_sweep[:-1])).astype("int32")
seri = np.cumsum(rays_per_sweep).astype("int32") - 1
# total number of rays and sweep start ray index and end
total_rays = sum(rays_per_sweep)
sweep_start_ray_index["data"] = ssri
sweep_end_ray_index["data"] = seri
# range
if not single_slice:
# all sweeps have to have the same range resolution
for i in range(nslices):
slice_info = rbf["volume"]["scan"]["slice"][i]
if slice_info["rangestep"] != common_slice_info["rangestep"]:
raise ValueError("range resolution changes between sweeps")
r_res = float(common_slice_info["rangestep"]) * 1000.0
if "start_range" in common_slice_info.keys():
start_range = float(common_slice_info["start_range"]) * 1000.0
else:
start_range = 0.0
_range["data"] = np.linspace(
start_range + r_res / 2.0, float(maxbin - 1.0) * r_res + r_res / 2.0, maxbin
).astype("float32")
_range["meters_between_gates"] = r_res
_range["meters_to_center_of_first_gate"] = _range["data"][0]
# containers for data
t_fixed_angle = np.empty(nslices, dtype="float64")
moving_angle = np.empty(total_rays, dtype="float64")
static_angle = np.empty(total_rays, dtype="float64")
time_data = np.empty(total_rays, dtype="float64")
fdata = np.ma.zeros(
(total_rays, maxbin), dtype="float32", fill_value=get_fillvalue()
)
# read data from file
if bfile.endswith(".vol") or bfile.endswith(".azi"):
scan_type = "ppi"
sweep_mode["data"] = np.array(nslices * ["azimuth_surveillance"])
else:
scan_type = "rhi"
sweep_mode["data"] = np.array(["elevation_surveillance"])
# read data from file:
for i in range(nslices):
if single_slice:
slice_info = common_slice_info
else:
slice_info = rbf["volume"]["scan"]["slice"][i]
# fixed angle
t_fixed_angle[i] = float(slice_info["posangle"])
# fixed angle (repeated for each ray)
static_angle[ssri[i] : seri[i] + 1] = t_fixed_angle[i]
# moving angle
moving_angle[ssri[i] : seri[i] + 1], angle_start, angle_stop = _get_angle(
slice_info["slicedata"]["rayinfo"],
angle_step=angle_step,
scan_type=scan_type,
)
# time
time_data[ssri[i] : seri[i] + 1], sweep_start_epoch = _get_time(
slice_info["slicedata"]["@date"],
slice_info["slicedata"]["@time"],
angle_start[0],
angle_stop[-1],
angle_step,
rays_per_sweep[i],
ant_speed,
scan_type=scan_type,
)
if i == 0:
volume_start_epoch = sweep_start_epoch + 0.0
start_time = datetime.datetime.utcfromtimestamp(volume_start_epoch)
# data
fdata[ssri[i] : seri[i] + 1, :] = _get_data(
slice_info["slicedata"]["rawdata"],
rays_per_sweep[i],
nbins_sweep[i],
maxbin,
)
if bfile.endswith(".vol") or bfile.endswith(".azi"):
azimuth["data"] = moving_angle
elevation["data"] = static_angle
else:
azimuth["data"] = static_angle
elevation["data"] = moving_angle
fixed_angle["data"] = t_fixed_angle
_time["data"] = time_data - volume_start_epoch
_time["units"] = make_time_unit_str(start_time)
# fields
fields = {}
# create field dictionary
field_dic["_FillValue"] = get_fillvalue()
field_dic["data"] = fdata
fields[field_name] = field_dic
# metadata
# metadata['instrument_name'] = radar_id
# instrument_parameters
instrument_parameters = dict()
instrument_parameters.update({"frequency": frequency})
return Radar(
_time,
_range,
fields,
metadata,
scan_type,
latitude,
longitude,
altitude,
sweep_number,
sweep_mode,
fixed_angle,
sweep_start_ray_index,
sweep_end_ray_index,
azimuth,
elevation,
instrument_parameters=instrument_parameters,
)
def _get_angle(ray_info, angle_step=None, scan_type="ppi"):
"""
obtains the ray angle start, stop and center
Parameters
----------
ray_info : dictionary of dictionaries
contains the ray info
angle_step : float
Optional. The angle step. Used in case there is no information of
angle stop. Otherwise ignored.
scan_type : str
Default ppi. scan_type. Either ppi or rhi.
Returns
-------
moving_angle : numpy array
the central point of the angle [Deg]
angle_start :
the starting point of the angle [Deg]
angle_stop :
the end point of the angle [Deg]
"""
bin_to_deg = 360.0 / 65536.0
def _extract_angles(data):
angle = np.array(data * bin_to_deg, dtype="float64")
if scan_type == "rhi":
ind = (angle > 225.0).nonzero()
angle[ind] -= 360.0
return angle
try:
angle_start = _extract_angles(ray_info["data"])
if angle_step is None:
raise ValueError("Unknown angle step")
angle_stop = angle_start + angle_step
except TypeError:
angle_start = _extract_angles(ray_info[0]["data"])
angle_stop = _extract_angles(ray_info[1]["data"])
moving_angle = np.angle(
(np.exp(1.0j * np.deg2rad(angle_start)) + np.exp(1.0j * np.deg2rad(angle_stop)))
/ 2.0,
deg=True,
)
moving_angle[moving_angle < 0.0] += 360.0 # [0, 360]
return moving_angle, angle_start, angle_stop
def _get_data(rawdata, nrays, nbins, maxbin):
"""
Obtains the raw data
Parameters
----------
rawdata : dictionary of dictionaries
contains the raw data information
nrays : int
Number of rays in sweep
nbins : int
Number of bins in ray
Returns
-------
data : numpy array
the data
"""
databin = rawdata["data"]
datamin = float(rawdata["@min"])
datamax = float(rawdata["@max"])
datadepth = float(rawdata["@depth"])
datatype = rawdata["@type"]
data = np.array(
datamin + databin * (datamax - datamin) / 2**datadepth, dtype="float32"
)
# fill invalid data with fill value
mask = databin == 0
data[mask.nonzero()] = get_fillvalue()
# put phidp data in the range [-180, 180]
if (datatype == "PhiDP") or (datatype == "uPhiDP") or (datatype == "uPhiDPu"):
is_above_180 = data > 180.0
data[is_above_180.nonzero()] -= 360.0
data = np.reshape(data, [nrays, nbins])
mask = np.reshape(mask, [nrays, nbins])
data_tmp = np.full((nrays, maxbin), get_fillvalue())
data_tmp[:nrays, :nbins] = data
mask_tmp = np.full((nrays, maxbin), True)
mask_tmp[:nrays, :nbins] = mask
masked_data = np.ma.array(data_tmp, mask=mask_tmp, fill_value=get_fillvalue())
return masked_data
def _get_time(
date_sweep,
time_sweep,
first_angle_start,
last_angle_stop,
angle_step,
nrays,
ant_speed,
scan_type="ppi",
):
"""
Computes the time at the center of each ray
Parameters
----------
date_sweep, time_sweep : str
the date and time of the sweep
first_angle_start : float
The starting point of the first angle in the sweep
last_angle_stop : float
The end point of the last angle in the sweep
nrays : int
Number of rays in sweep
ant_speed : float
antenna speed [deg/s]
scan_type : str
Default ppi. scan_type. Either ppi or rhi.
Returns
-------
time_data : numpy array
the time of each ray
sweep_start_epoch : float
sweep start time in seconds since 1.1.1970
"""
datetime_sweep = datetime.datetime.strptime(
date_sweep + " " + time_sweep, "%Y-%m-%d %H:%M:%S"
)
sweep_start_epoch = (datetime_sweep - datetime.datetime(1970, 1, 1)).total_seconds()
if scan_type == "ppi":
if (last_angle_stop > first_angle_start) and (
(last_angle_stop - first_angle_start) / nrays > angle_step
):
sweep_duration = (last_angle_stop - first_angle_start) / ant_speed
else:
sweep_duration = (last_angle_stop + 360.0 - first_angle_start) / ant_speed
else:
if last_angle_stop > first_angle_start:
sweep_duration = (last_angle_stop - first_angle_start) / ant_speed
else:
sweep_duration = (first_angle_start - last_angle_stop) / ant_speed
time_angle = sweep_duration / nrays
sweep_end_epoch = sweep_start_epoch + sweep_duration
time_data = np.linspace(
sweep_start_epoch + time_angle / 2.0,
sweep_end_epoch - time_angle / 2.0,
num=nrays,
)
return time_data, sweep_start_epoch