Source code for pyart.util.radar_utils

"""
Functions for working radar instances.

"""

import copy

import numpy as np
from netCDF4 import date2num

from ..config import get_fillvalue
from . import datetime_utils


[docs]def is_vpt(radar, offset=0.5): """ Determine if a Radar appears to be a vertical pointing scan. This function only verifies that the object is a vertical pointing scan, use the :py:func:`to_vpt` function to convert the radar to a vpt scan if this function returns True. Parameters ---------- radar : Radar Radar object to determine if. offset : float, optional Maximum offset of the elevation from 90 degrees to still consider to be vertically pointing. Returns ------- flag : bool True if the radar appear to be verticle pointing, False if not. """ # check that the elevation is within offset of 90 degrees. elev = radar.elevation['data'] return np.all((elev < 90.0 + offset) & (elev > 90.0 - offset))
[docs]def to_vpt(radar, single_scan=True): """ Convert an existing Radar object to represent a vertical pointing scan. This function does not verify that the Radar object contains a vertical pointing scan. To perform such a check use :py:func:`is_vpt`. Parameters ---------- radar : Radar Mislabeled vertical pointing scan Radar object to convert to be properly labeled. This object is converted in place, no copy of the existing data is made. single_scan : bool, optional True to convert the volume to a single scan, any azimuth angle data is lost. False will convert the scan to contain the same number of scans as rays, azimuth angles are retained. """ if single_scan: nsweeps = 1 radar.azimuth['data'][:] = 0.0 seri = np.array([radar.nrays - 1], dtype='int32') radar.sweep_end_ray_index['data'] = seri else: nsweeps = radar.nrays # radar.azimuth not adjusted radar.sweep_end_ray_index['data'] = np.arange(nsweeps, dtype='int32') radar.scan_type = 'vpt' radar.nsweeps = nsweeps radar.target_scan_rate = None # no scanning radar.elevation['data'][:] = 90.0 radar.sweep_number['data'] = np.arange(nsweeps, dtype='int32') radar.sweep_mode['data'] = np.array(['vertical_pointing'] * nsweeps) radar.fixed_angle['data'] = np.ones(nsweeps, dtype='float32') * 90.0 radar.sweep_start_ray_index['data'] = np.arange(nsweeps, dtype='int32') if radar.instrument_parameters is not None: for key in ['prt_mode', 'follow_mode', 'polarization_mode']: if key in radar.instrument_parameters: ip_dic = radar.instrument_parameters[key] ip_dic['data'] = np.array([ip_dic['data'][0]] * nsweeps) # Attributes that do not need any changes # radar.altitude # radar.altitude_agl # radar.latitude # radar.longitude # radar.range # radar.ngates # radar.nrays # radar.metadata # radar.radar_calibration # radar.time # radar.fields # radar.antenna_transition # radar.scan_rate return
[docs]def join_radar(radar1, radar2): """ Combine two radar instances into one. Parameters ---------- radar1 : Radar Radar object. radar2 : Radar Radar object. """ # must have same gate spacing new_radar = copy.deepcopy(radar1) new_radar.azimuth['data'] = np.append( radar1.azimuth['data'], radar2.azimuth['data']) new_radar.elevation['data'] = np.append( radar1.elevation['data'], radar2.elevation['data']) new_radar.fixed_angle['data'] = np.append( radar1.fixed_angle['data'], radar2.fixed_angle['data']) new_radar.sweep_number['data'] = np.append( radar1.sweep_number['data'], radar2.sweep_number['data']) new_radar.sweep_start_ray_index['data'] = np.append( radar1.sweep_start_ray_index['data'], radar2.sweep_start_ray_index['data'] + radar1.nrays) new_radar.sweep_end_ray_index['data'] = np.append( radar1.sweep_end_ray_index['data'], radar2.sweep_end_ray_index['data'] + radar1.nrays) new_radar.nsweeps += radar2.nsweeps new_radar.sweep_mode['data'] = np.append( radar1.sweep_mode['data'], radar2.sweep_mode['data']) if len(radar1.range['data']) >= len(radar2.range['data']): new_radar.range['data'] = radar1.range['data'] else: new_radar.range['data'] = radar2.range['data'] new_radar.ngates = len(new_radar.range['data']) # to combine times we need to reference them to a standard # for this we'll use epoch time r1num = datetime_utils.datetimes_from_radar(radar1, epoch=True) r2num = datetime_utils.datetimes_from_radar(radar2, epoch=True) new_radar.time['data'] = date2num( np.append(r1num, r2num), datetime_utils.EPOCH_UNITS) new_radar.time['units'] = datetime_utils.EPOCH_UNITS new_radar.nrays = len(new_radar.time['data']) for var in new_radar.fields.keys(): sh1 = radar1.fields[var]['data'].shape sh2 = radar2.fields[var]['data'].shape new_field_shape = (sh1[0] + sh2[0], max(sh1[1], sh2[1])) new_field = np.ma.zeros(new_field_shape) new_field[:] = np.ma.masked new_field.set_fill_value(get_fillvalue()) new_field[0:sh1[0], 0:sh1[1]] = radar1.fields[var]['data'] new_field[sh1[0]:, 0:sh2[1]] = radar2.fields[var]['data'] new_radar.fields[var]['data'] = new_field # radar locations # TODO moving platforms - any more? if (len(radar1.latitude['data']) == 1 & len(radar2.latitude['data']) == 1 & len(radar1.longitude['data']) == 1 & len(radar2.longitude['data']) == 1 & len(radar1.altitude['data']) == 1 & len(radar2.altitude['data']) == 1): lat1 = float(radar1.latitude['data']) lon1 = float(radar1.longitude['data']) alt1 = float(radar1.altitude['data']) lat2 = float(radar2.latitude['data']) lon2 = float(radar2.longitude['data']) alt2 = float(radar2.altitude['data']) if (lat1 != lat2) or (lon1 != lon2) or (alt1 != alt2): ones1 = np.ones(len(radar1.time['data']), dtype='float32') ones2 = np.ones(len(radar2.time['data']), dtype='float32') new_radar.latitude['data'] = np.append(ones1 * lat1, ones2 * lat2) new_radar.longitude['data'] = np.append(ones1 * lon1, ones2 * lon2) new_radar.latitude['data'] = np.append(ones1 * alt1, ones2 * alt2) else: new_radar.latitude['data'] = radar1.latitude['data'] new_radar.longitude['data'] = radar1.longitude['data'] new_radar.altitude['data'] = radar1.altitude['data'] else: new_radar.latitude['data'] = np.append(radar1.latitude['data'], radar2.latitude['data']) new_radar.longitude['data'] = np.append(radar1.longitude['data'], radar2.longitude['data']) new_radar.altitude['data'] = np.append(radar1.altitude['data'], radar2.altitude['data']) return new_radar
[docs]def image_mute_radar( radar, field, mute_field, mute_threshold, field_threshold=None): """ This function will split a field based on thresholds from another field. Specifically, it was designed to separate areas of reflectivity where the correlation coefficient is less than a certain threshold to discern melting precipitation. Parameters ---------- radar : Radar Radar instance which provides the fields for muting. field : str Name of field to image mute. mute_field : str Name of field to image mute by. mute_threshold : float Threshold value to mute by. field_threshold : float Additional threshold to mask. Returns ------- radar : Radar Radar object with 2 new fields from input field, one muted and one not muted. """ # add checks for field availability if field not in radar.fields.keys(): raise KeyError( 'Failed - ', field, ' field to mute not found in Radar object.') if mute_field not in radar.fields.keys(): raise KeyError( 'Failed - ', mute_field, ' field to mute by not found in Radar object.') # get data from fields data_to_mute = radar.fields[field]['data'] data_mute_by = radar.fields[mute_field]['data'] # create filters # field_filter is used if user wants to use additional criteria in the # original field if field_threshold is not None: field_filter = data_to_mute >= field_threshold else: field_filter = None # mute_filter will be the primary filter for determining muted regions mute_filter = data_mute_by <= mute_threshold # mute_mask is the combined filter if field_filter is None: mute_mask = mute_filter else: mute_mask = mute_filter & field_filter # break up the field into muted regions and non muted regions non_muted_field = np.ma.masked_where(mute_mask, data_to_mute) non_muted_field = np.ma.masked_invalid(non_muted_field) muted_field = np.ma.masked_where(~mute_mask, data_to_mute) muted_field = np.ma.masked_invalid(muted_field) # add fields to a dictionary and save to radar object non_muted_dict = radar.fields[field].copy() non_muted_dict['data'] = non_muted_field non_muted_dict['long_name'] = 'Non-muted ' + field radar.add_field('nonmuted_'+field, non_muted_dict) muted_dict = radar.fields[field].copy() muted_dict['data'] = muted_field muted_dict['long_name'] = 'Muted ' + field radar.add_field('muted_'+field, muted_dict) return radar