"""
Utilities for reading CSU-CHILL CHL files.
"""
from datetime import datetime
import struct
import numpy as np
from ..config import FileMetadata, get_fillvalue
from ..core.radar import Radar
from .common import make_time_unit_str, _test_arguments, prepare_for_read
[docs]def read_chl(filename, field_names=None, additional_metadata=None,
file_field_names=None, exclude_fields=None,
include_fields=None, use_file_field_attributes=True, **kwargs):
"""
Read a CSU-CHILL CHL file.
Parameters
----------
filename : str
Name of CHL file.
field_names : dict, optional
Dictionary mapping CHL field names to radar field names. If a
data type found in the file does not appear in this dictionary or has
a value of None it will not be placed in the radar.fields dictionary.
A value of None, the default, will use the mapping defined in the
Py-ART configuration file.
additional_metadata : dict of dicts, optional
Dictionary of dictionaries to retrieve metadata from during this read.
This metadata is not used during any successive file reads unless
explicitly included. A value of None, the default, will not
introduct any addition metadata and the file specific or default
metadata as specified by the Py-ART configuration file will be used.
file_field_names : bool, optional
True to use the CHL field names for the field names in the radar
object. If this case the field_names parameter is ignored.
The field dictionary will likely only have a 'data' key, unless
the fields are defined in `additional_metadata`.
exclude_fields : list or None, optional
List of fields to exclude from the radar object. This is applied
after the `file_field_names` and `field_names` parameters.
include_fields : list or None, optional
List of fields to include from the radar object. This is applied
after the `field_file_names` and `field_names` parameters. Set to
None to include all fields not in exclude_fields.
use_file_field_attributes : bool, optional
True to use information provided by in the file to set the field
attribute `long_name`, `units`, `valid_max`, and `valid_min`. False
will not set these unless they are defined in the configuration file
or in `additional_metadata`.
Returns
-------
radar : Radar
Radar object containing data from CHL file.
"""
# test for non empty kwargs
_test_arguments(kwargs)
# create metadata retrival object
filemetadata = FileMetadata('chl', field_names, additional_metadata,
file_field_names, exclude_fields,
include_fields)
# read data
chl_file = ChlFile(prepare_for_read(filename))
# time
time = filemetadata('time')
tdata = np.array(chl_file.time)
min_time = np.floor(tdata.min())
time['data'] = (tdata - min_time).astype('float64')
time['units'] = make_time_unit_str(datetime.utcfromtimestamp(min_time))
# range
_range = filemetadata('range')
_range['data'] = (np.array(range(chl_file.ngates)) *
chl_file.gate_spacing + chl_file.first_gate_offset)
_range['meters_between_gates'] = np.array(chl_file.gate_spacing)
_range['meters_to_center_of_first_gate'] = 0.0
# scan_type
scan_type = SCAN_MODE_NAMES[chl_file.scan_types[-1]]
# fields
fields = {}
for i, fdata in chl_file.fields.items():
field_info = chl_file.field_info[i]
field_name = filemetadata.get_field_name(field_info['name'])
if field_name is None:
continue
field_dic = filemetadata(field_name)
np.ma.set_fill_value(fdata, get_fillvalue())
field_dic['data'] = fdata
field_dic['_FillValue'] = get_fillvalue()
if use_file_field_attributes:
field_dic['long_name'] = field_info['descr']
field_dic['units'] = field_info['units']
field_dic['valid_max'] = field_info['max_val']
field_dic['valid_min'] = field_info['min_val']
fields[field_name] = field_dic
# metadata
metadata = filemetadata('metadata')
metadata['instrument_name'] = chl_file.radar_info['radar_name']
metadata['original_container'] = 'CHL'
# longitude, latitude, altitude
latitude = filemetadata('latitude')
longitude = filemetadata('longitude')
altitude = filemetadata('altitude')
latitude['data'] = np.array([chl_file.radar_info['latitude']], 'f8')
longitude['data'] = np.array([chl_file.radar_info['longitude']], 'f8')
altitude['data'] = np.array([chl_file.radar_info['altitude']], 'f8')
# sweep_number, sweep_mode, fixed_angle, sweep_start_ray_index,
# sweep_end_ray_index
sweep_number = filemetadata('sweep_number')
sweep_mode = filemetadata('sweep_mode')
fixed_angle = filemetadata('fixed_angle')
sweep_start_ray_index = filemetadata('sweep_start_ray_index')
sweep_end_ray_index = filemetadata('sweep_end_ray_index')
sweep_number['data'] = np.arange(chl_file.num_sweeps, dtype='int32')
sweep_mode['data'] = np.array(
[SCAN_MODE_NAMES[i] for i in chl_file.scan_types], dtype='S')
fixed_angle['data'] = np.array(chl_file.fixed_angle, dtype='float32')
ray_count = chl_file.rays_per_sweep
ssri = np.cumsum(np.append([0], ray_count[:-1])).astype('int32')
sweep_start_ray_index['data'] = ssri
sweep_end_ray_index['data'] = np.cumsum(ray_count).astype('int32') - 1
# azimuth, elevation
azimuth = filemetadata('azimuth')
elevation = filemetadata('elevation')
azimuth['data'] = np.array(chl_file.azimuth, dtype='float32')
elevation['data'] = np.array(chl_file.elevation, dtype='float32')
# instrument parameters
instrument_parameters = None
chl_file.close()
return Radar(
time, _range, fields, metadata, scan_type,
latitude, longitude, altitude,
sweep_number, sweep_mode, fixed_angle, sweep_start_ray_index,
sweep_end_ray_index,
azimuth, elevation,
instrument_parameters=instrument_parameters)
class ChlFile(object):
"""
A file object for CHL data.
Parameters
----------
filename : str or file-like.
Name of CHL file to read or a file-like object pointing to the
beginning of such a file.
ns_time : bool
True to determine ray collection times to the nano-second, False
will only determine times to the second.
debug : bool
True to keep packet data in the _packets attribute to aid in
debugging.
Attributes
----------
ngates : int
Number of gates per ray.
num_sweeps : int
Number of sweeps in the volume.
gate_spacing : float
Spacing in meters between gates.
first_gate_offset : float
Distance in meters to the first range gate.
time : list of ints
Time in seconds in epoch for each ray in the volume.
azimuth : list of floats
Azimuth angle for each ray in the volume in degrees.
elevation : list of floats
Elevation angle for each ray in the volume in degrees.
fixed_angle : list of floats
Fixed angles for each sweep.
sweep_number : list of ints
Sweep numbers reported in file.
scan_types : list of ints
Chill defined scan type for each sweep.
rays_per_sweep : list of ints
Number of rays in each sweep.
fields : dict
Dictionary of field data index by field number.
radar_info : dict
Radar information recorded in the file.
field_info : dict
Field information (limits, name, etc.) recorded in the file.
processor_info : dict
Processor information recorded in the file.
"""
def __init__(self, filename, ns_time=True, debug=False):
# initalize attributes
self.ngates = None
self.num_sweeps = None
self.gate_spacing = None
self.time = []
self.azimuth = []
self.elevation = []
self.fixed_angle = []
self.sweep_number = []
self.scan_types = []
self.rays_per_sweep = None
self.fields = {}
self.radar_info = None
self.field_info = {}
self.processor_info = None
self.first_gate_offset = None
# private attributes
self._dstring = b'' # string containing field data.
self._bit_mask = None # bit mask specifying fields present in file.
self._dtype = None # NumPy dtype for a single gate (all fields).
self._ray_bsize = None # size in bytes of a single ray (all fields).
self._packets = [] # List of packets, not set if debug is False.
self._field_nums = None # Field number available in file.
self._rays_in_current_sweep = None # accumulator for counting rays.
self._fh = None # file handler
self._include_ns_time = ns_time
# read all blocks from the file
if hasattr(filename, 'read'):
self._fh = filename
else:
self._fh = open(filename, "rb")
packet = 1
while packet is not None:
packet = self._read_block()
if debug:
self._packets.append(packet)
self._extract_fields()
self.rays_per_sweep.append(self._rays_in_current_sweep)
def close(self):
""" Close the file. """
self._fh.close()
def _read_block(self):
""" Read a block from an open CHL file. """
pld = self._fh.read(8)
if pld == b'':
return None
block_id, length = struct.unpack("<2i", pld)
payload = self._fh.read(length - 8)
# parse the block
if block_id == ARCH_ID_FILE_HDR:
packet = self._parse_file_hdr_block(payload)
elif block_id == ARCH_ID_FIELD_SCALE:
packet = self._parse_field_scale_block(payload)
elif block_id == ARCH_ID_RAY_HDR:
packet = self._parse_ray_hdr_block(payload)
elif block_id == HSK_ID_RADAR_INFO:
packet = self._parse_radar_info_block(payload)
elif block_id == HSK_ID_PROCESSOR_INFO:
packet = self._parse_processor_info_block(payload)
elif block_id == HSK_ID_SCAN_SEG:
packet = self._parse_scan_seg_block(payload)
elif block_id == ARCH_ID_SWEEP_BLOCK:
packet = self._parse_sweep_block(payload)
else:
packet = {}
packet['block_id'] = block_id
packet['length'] = length
return packet
# Block parsers
def _parse_file_hdr_block(self, payload):
""" Parse a field_hdr block. """
return _unpack_structure(payload, ARCH_FILE_HDR_T)
def _parse_field_scale_block(self, payload):
""" Parse a field_scale block. Add scale to field_info attr. """
packet = _unpack_structure(payload, FIELD_SCALE_T)
packet['name'] = packet['name'].decode('utf-8').rstrip('\x00')
packet['units'] = packet['units'].decode('utf-8').rstrip('\x00')
packet['descr'] = packet['descr'].decode('utf-8').rstrip('\x00')
self.field_info[packet['bit_mask_pos']] = packet
return packet
def _parse_radar_info_block(self, payload):
""" Parse a radar_info block. Update metadata attribute. """
packet = _unpack_structure(payload, RADAR_INFO_T)
packet['radar_name'] = (
packet['radar_name'].decode('utf-8').rstrip('\x00'))
self.radar_info = packet.copy()
return packet
def _parse_processor_info_block(self, payload):
""" Parse a processor_info block. Set dr attribute. """
packet = _unpack_structure(payload, PROCESSOR_INFO)
self.gate_spacing = packet['gate_spacing']
self.first_gate_offset = packet['range_offset']
self.processor_info = packet.copy()
return packet
def _parse_scan_seg_block(self, payload):
""" Parse a scan_seg_block. Update sweep attributes. """
packet = _unpack_structure(payload, SCAN_SEG)
self.sweep_number.append(packet['sweep_num'])
self.fixed_angle.append(packet['current_fixed_angle'])
self.scan_types.append(packet['scan_type'])
if self.rays_per_sweep is None:
self.rays_per_sweep = [] # first sweep block
self._rays_in_current_sweep = 0
else:
self.rays_per_sweep.append(self._rays_in_current_sweep)
self._rays_in_current_sweep = 0
return packet
def _parse_sweep_block(self, payload):
""" Parse a sweep block. Set num_sweeps attribute. """
packet = {}
packet['num_sweeps'] = struct.unpack('I', payload[0:4])[0]
packet['swp_offsets'] = struct.unpack(
str(packet['num_sweeps']) + 'Q', payload[4:])
self.num_sweeps = packet['num_sweeps']
return packet
def _parse_ray_hdr_block(self, payload):
""" Parse a ray_hdr block. Update associated attributes. """
packet = _unpack_structure(payload, ARCH_RAY_HEADER)
if self._bit_mask is None:
# this is the first ray_hdr block read
self.ngates = packet['gates']
self._bit_mask = packet['bit_mask']
self._field_nums = [b for b in range(38) if self._bit_mask & 2**b]
self._dtype = ','.join([DATA_FORMAT[self.field_info[i]['format']]
for i in self._field_nums])
self._ray_bsize = np.dtype(self._dtype).itemsize * packet['gates']
else:
# check that the bit_mask and number of gates are constant
if packet['bit_mask'] != self._bit_mask:
raise NotImplementedError('bit_mask is not consistent.')
if packet['gates'] != self.ngates:
raise NotImplementedError('number of gates vary.')
# store ray data and pointing data
self._dstring += self._fh.read(self._ray_bsize)
if self._include_ns_time:
self.time.append(packet['time'] + packet['ns_time']/1e9)
else:
self.time.append(packet['time'])
self.azimuth.append(packet['azimuth'])
self.elevation.append(packet['elevation'])
self._rays_in_current_sweep += 1
return packet
def _extract_fields(self):
""" Extract field data from _dstring attribute post read. """
all_data = np.frombuffer(self._dstring, dtype=self._dtype)
all_data = all_data.reshape(-1, self.ngates)
for i, field_num in enumerate(self._field_nums):
fdata = np.ma.masked_values(all_data[all_data.dtype.names[i]], 0)
# apply scale and offset factors to interger data types
if issubclass(fdata.dtype.type, np.integer):
dat_factor = float(self.field_info[field_num]['dat_factor'])
dat_bias = float(self.field_info[field_num]['dat_bias'])
fld_factor = float(self.field_info[field_num]['fld_factor'])
fdata = (fdata * dat_factor + dat_bias) / fld_factor
self.fields[field_num] = fdata
return
# CHL packet types
ARCH_FORMAT_VERSION = 0x00010000
ARCH_ID_CONTROL = 0x5aa80001
ARCH_ID_FIELD_SCALE = 0x5aa80002
ARCH_ID_RAY_HDR = 0x5aa80003
ARCH_ID_FILE_HDR = 0x5aa80004
ARCH_ID_SWEEP_BLOCK = 0x5aa80005
HSK_ID_PROCESSOR_INFO = 0x5aa50003
HSK_ID_RADAR_INFO = 0x5aa50001
HSK_ID_SCAN_SEG = 0x5aa50002
# Additional constants
SCAN_MODE_NAMES = ['ppi', 'rhi', 'fixed', 'manual ppi', 'manual rhi', 'idle']
DATA_FORMAT = ['uint8', 'uint64', 'float32', 'uint16']
##############
# Structures #
##############
def _unpack_structure(string, structure):
""" Unpack a structure. """
fmt = ''.join([i[1] for i in structure])
tpl = struct.unpack(fmt, string)
return dict(zip([i[0] for i in structure], tpl))
ARCH_FILE_HDR_T = (
('version', 'I'),
('creation_version', 'I'),
('creator_id', '32s'),
('sweep_table_offset', 'Q'),
)
ARCH_RAY_HEADER = (
('azimuth', 'f'),
('elevation', 'f'),
('azimuth_width', 'f'),
('elevation_width', 'f'),
('gates', 'H'),
('beam_index', 'H'),
('ns_time', 'I'),
('time', 'Q'),
('bit_mask', 'Q'),
('ray_number', 'I'),
('num_pulses', 'I'),
)
FIELD_SCALE_T = (
('format', 'i'),
('min_val', 'f'),
('max_val', 'f'),
('bit_mask_pos', 'i'),
('type_hint', 'i'),
('fld_factor', 'i'),
('dat_factor', 'i'),
('dat_bias', 'i'),
('name', '32s'),
('units', '32s'),
('descr', '128s')
)
RADAR_INFO_T = (
('radar_name', '32s'),
('latitude', 'f'),
('longitude', 'f'),
('altitude', 'f'),
('beamwidth', 'f'),
('wavelength_cm', 'f'),
('unused1', 'f'),
('unused2', 'f'),
('unused3', 'f'),
('unused4', 'f'),
('gain_ant_h', 'f'),
('gain_ant_v', 'f'),
('zdr_cal_base', 'f'),
('phidp_rot', 'f'),
('base_radar_constant', 'f'),
('unused5', 'f'),
('power_measurement_loss_h', 'f'),
('power_measurement_loss_v', 'f'),
('zdr_cal_base_vhs', 'f'),
('test_power_h', 'f'),
('test_power_v', 'f'),
('dc_loss_h', 'f'),
('dc_loss_v', 'f'),
)
PROCESSOR_INFO = (
('polarization_mode', 'i'),
('processing_mode', 'i'),
('pulse_type', 'i'),
('test_type', 'i'),
('integration_cycle_pulses', 'I'),
('clutter_filter_number', 'I'),
('range_gate_averaging', 'I'),
('indexed_beam_width', 'f'),
('gate_spacing', 'f'),
('prt_usec', 'f'),
('range_start', 'f'),
('range_stop', 'f'),
('max_gate', 'I'),
('test_power', 'f'),
('unused1', 'f'),
('unused2', 'f'),
('test_pulse_range', 'f'),
('test_pulse_length', 'f'),
('prt2', 'f'),
('range_offset', 'f'),
)
SCAN_SEG = (
('az_manual', 'f'),
('el_manual', 'f'),
('az_start', 'f'),
('el_start', 'f'),
('scan_rate', 'f'),
('segname', '24s'),
('opt', 'i'),
('follow_mode', 'i'),
('scan_type', 'i'),
('scan_flags', 'I'),
('volume_num', 'I'),
('sweep_num', 'I'),
('time_limit', 'I'),
('webtilt', 'I'),
('left_limit', 'f'),
('right_limit', 'f'),
('up_limit', 'f'),
('down_limit', 'f'),
('step', 'f'),
('max_sweeps', 'I'),
('filter_break_sweep', 'I'),
('clutter_filter1', 'I'),
('clutter_filter2', 'I'),
('project', '16s'),
('current_fixed_angle', 'f'),
)