Source code for pyart.retrieve.cappi

"""
Constant Altitude Plan Position Indicator
"""

import numpy as np
from netCDF4 import num2date
from pandas import to_datetime
from scipy.interpolate import RectBivariateSpline

from pyart.core import Radar


[docs]def create_cappi( radar, fields=None, height=2000, gatefilter=None, vel_field="velocity", same_nyquist=True, nyquist_vector_idx=0, ): """ Create a Constant Altitude Plan Position Indicator (CAPPI) from radar data. Parameters ---------- radar : Radar Py-ART Radar object containing the radar data. fields : list of str, optional List of radar fields to be used for creating the CAPPI. If None, all available fields will be used. Default is None. height : float, optional The altitude at which to create the CAPPI. Default is 2000 meters. gatefilter : GateFilter, optional A GateFilter object to apply masking/filtering to the radar data. Default is None. vel_field : str, optional The name of the velocity field to be used for determining the Nyquist velocity. Default is 'velocity'. same_nyquist : bool, optional Whether to only stack sweeps with the same Nyquist velocity. Default is True. nyquist_vector_idx : int, optional Index for the Nyquist velocity vector if `same_nyquist` is True. Default is 0. Returns ------- Radar A Py-ART Radar object containing the CAPPI at the specified height. Notes ----- CAPPI (Constant Altitude Plan Position Indicator) is a radar visualization technique that provides a horizontal view of meteorological data at a fixed altitude. Reference: https://glossary.ametsoc.org/wiki/Cappi Author ------ Hamid Ali Syed (@syedhamidali) """ if fields is None: fields = list(radar.fields.keys()) # Initialize the first sweep as the reference first_sweep = 0 # Initialize containers for the stacked data and nyquist velocities data_stack = [] nyquist_stack = [] # Process each sweep individually for sweep in range(radar.nsweeps): sweep_slice = radar.get_slice(sweep) try: nyquist = radar.get_nyquist_vel(sweep=sweep) nyquist = np.round(nyquist) except LookupError: print( f"Nyquist velocity unavailable for sweep {sweep}. Estimating using maximum velocity." ) nyquist = radar.fields[vel_field]["data"][sweep_slice].max() sweep_data = {} for field in fields: data = radar.get_field(sweep, field) # Apply gatefilter if provided if gatefilter is not None: data = np.ma.masked_array( data, gatefilter.gate_excluded[sweep_slice, :] ) time = radar.time["data"][sweep_slice] # Extract and sort azimuth angles azimuth = radar.azimuth["data"][sweep_slice] azimuth_sorted_idx = np.argsort(azimuth) azimuth = azimuth[azimuth_sorted_idx] data = data[azimuth_sorted_idx] # Store initial lat/lon for reordering if sweep == first_sweep: azimuth_final = azimuth time_final = time else: # Interpolate data for consistent azimuth ordering across sweeps interpolator = RectBivariateSpline(azimuth, radar.range["data"], data) data = interpolator(azimuth_final, radar.range["data"]) sweep_data[field] = data[np.newaxis, :, :] data_stack.append(sweep_data) nyquist_stack.append(nyquist) nyquist_stack = np.array(nyquist_stack) # Filter for sweeps with similar Nyquist velocities if same_nyquist: nyquist_range = nyquist_stack[nyquist_vector_idx] nyquist_mask = np.abs(nyquist_stack - nyquist_range) <= 1 data_stack = [ sweep_data for i, sweep_data in enumerate(data_stack) if nyquist_mask[i] ] # Generate CAPPI for each field using data_stack fields_data = {} for field in fields: data_3d = np.concatenate( [sweep_data[field] for sweep_data in data_stack], axis=0 ) # Sort azimuth for all sweeps dim0 = data_3d.shape[1:] azimuths = np.linspace(0, 359, dim0[0]) elevation_angles = radar.fixed_angle["data"][: data_3d.shape[0]] ranges = radar.range["data"] theta = (450 - azimuths) % 360 THETA, PHI, R = np.meshgrid(theta, elevation_angles, ranges) Z = R * np.sin(PHI * np.pi / 180) # Extract the data slice corresponding to the requested height height_idx = np.argmin(np.abs(Z - height), axis=0) CAPPI = np.array( [ data_3d[height_idx[j, i], j, i] for j in range(dim0[0]) for i in range(dim0[1]) ] ).reshape(dim0) # Retrieve units and handle case where units might be missing units = radar.fields[field].get("units", "").lower() # Determine valid_min and valid_max based on units if units == "dbz": valid_min, valid_max = -10, 80 elif units in ["m/s", "meters per second"]: valid_min, valid_max = -100, 100 elif units == "db": valid_min, valid_max = -7.9, 7.9 else: # If units are not found or don't match known types, set default values or skip masking valid_min, valid_max = None, None # If valid_min or valid_max are still None, set them to conservative defaults or skip if valid_min is None: print(f"Warning: valid_min not set for {field}, using default of -1e10") valid_min = -1e10 # Conservative default if valid_max is None: print(f"Warning: valid_max not set for {field}, using default of 1e10") valid_max = 1e10 # Conservative default # Apply valid_min and valid_max masking if valid_min is not None: CAPPI = np.ma.masked_less(CAPPI, valid_min) if valid_max is not None: CAPPI = np.ma.masked_greater(CAPPI, valid_max) # Convert to masked array with the specified fill value CAPPI.set_fill_value(radar.fields[field].get("_FillValue", np.nan)) CAPPI = np.ma.masked_invalid(CAPPI) CAPPI = np.ma.masked_outside(CAPPI, valid_min, valid_max) fields_data[field] = { "data": CAPPI, "units": radar.fields[field]["units"], "long_name": f"CAPPI {field} at {height} meters", "comment": f"CAPPI {field} calculated at a height of {height} meters", "_FillValue": radar.fields[field].get("_FillValue", np.nan), } # Set the elevation to zeros for CAPPI elevation_final = np.zeros(dim0[0], dtype="float32") # Since we are using the whole volume scan, report mean time try: dtime = to_datetime( num2date(radar.time["data"], radar.time["units"]).astype(str), format="ISO8601", ) except ValueError: dtime = to_datetime( num2date(radar.time["data"], radar.time["units"]).astype(str) ) dtime = dtime.mean() time = radar.time.copy() time["data"] = time_final time["mean"] = dtime # Create the Radar object with the new CAPPI data return Radar( time=radar.time.copy(), _range=radar.range.copy(), fields=fields_data, metadata=radar.metadata.copy(), scan_type=radar.scan_type, latitude=radar.latitude.copy(), longitude=radar.longitude.copy(), altitude=radar.altitude.copy(), sweep_number=radar.sweep_number.copy(), sweep_mode=radar.sweep_mode.copy(), fixed_angle=radar.fixed_angle.copy(), sweep_start_ray_index=radar.sweep_start_ray_index.copy(), sweep_end_ray_index=radar.sweep_end_ray_index.copy(), azimuth=radar.azimuth.copy(), elevation={"data": elevation_final}, instrument_parameters=radar.instrument_parameters, )