Source code for pyart.retrieve.cappi

"""
Constant Altitude Plan Position Indicator
"""

import numpy as np
from netCDF4 import num2date
from pandas import to_datetime
from scipy.interpolate import RectBivariateSpline

from pyart.core import Radar


[docs] def create_cappi( radar, fields=None, height=2000, gatefilter=None, vel_field="velocity", same_nyquist=True, nyquist_vector_idx=0, max_height_diff=None, ): """ Create a Constant Altitude Plan Position Indicator (CAPPI) from radar data. Parameters ---------- radar : Radar Py-ART Radar object containing the radar data. fields : list of str, optional List of radar fields to be used for creating the CAPPI. If None, all available fields will be used. Default is None. height : float, optional The altitude at which to create the CAPPI. Default is 2000 meters. gatefilter : GateFilter, optional A GateFilter object to apply masking/filtering to the radar data. Default is None. vel_field : str, optional The name of the velocity field to be used for determining the Nyquist velocity. Default is 'velocity'. same_nyquist : bool, optional Whether to only stack sweeps with the same Nyquist velocity. Default is True. nyquist_vector_idx : int, optional Index for the Nyquist velocity vector if `same_nyquist` is True. Default is 0. max_height_diff : float, optional Maximum allowed difference (meters) between the requested CAPPI height and the closest available gate height. Gates farther than this threshold are masked out. If None, a data-driven tolerance equal to twice the median vertical gate spacing is used. Default is None. Returns ------- Radar A Py-ART Radar object containing the CAPPI at the specified height. Notes ----- CAPPI (Constant Altitude Plan Position Indicator) is a radar visualization technique that provides a horizontal view of meteorological data at a fixed altitude. Reference: https://glossary.ametsoc.org/wiki/Cappi Author ------ Hamid Ali Syed (@syedhamidali) """ if fields is None: fields = list(radar.fields.keys()) # Initialize the first sweep as the reference first_sweep = 0 # Initialize containers for the stacked data and nyquist velocities data_stack = [] nyquist_stack = [] gate_z_stack = [] # Process each sweep individually for sweep in range(radar.nsweeps): sweep_slice = radar.get_slice(sweep) try: nyquist = radar.get_nyquist_vel(sweep=sweep) nyquist = np.round(nyquist) except LookupError: print( f"Nyquist velocity unavailable for sweep {sweep}. Estimating using maximum velocity." ) nyquist = radar.fields[vel_field]["data"][sweep_slice].max() # Extract and sort azimuth angles (used across all fields) azimuth = radar.azimuth["data"][sweep_slice] azimuth_sorted_idx = np.argsort(azimuth) azimuth_sorted = azimuth[azimuth_sorted_idx] time = radar.time["data"][sweep_slice][azimuth_sorted_idx] # Gate heights from radar geometry already include the 4/3 Earth-radius refractivity model sweep_gate_z = radar.gate_z["data"][sweep_slice][azimuth_sorted_idx] if sweep == first_sweep: azimuth_final = azimuth_sorted time_final = time else: # Interpolate gate heights to the reference azimuth grid (keeps refractivity-aware heights aligned) gate_interpolator = RectBivariateSpline( azimuth_sorted, radar.range["data"], sweep_gate_z ) sweep_gate_z = gate_interpolator(azimuth_final, radar.range["data"]) sweep_data = {} for field in fields: data = radar.get_field(sweep, field) # Apply gatefilter if provided if gatefilter is not None: data = np.ma.masked_array( data, gatefilter.gate_excluded[sweep_slice, :] ) data = data[azimuth_sorted_idx] if sweep != first_sweep: # Interpolate data for consistent azimuth ordering across sweeps interpolator = RectBivariateSpline( azimuth_sorted, radar.range["data"], data ) data = interpolator(azimuth_final, radar.range["data"]) sweep_data[field] = data[np.newaxis, :, :] data_stack.append(sweep_data) nyquist_stack.append(nyquist) gate_z_stack.append(sweep_gate_z[np.newaxis, :, :]) nyquist_stack = np.array(nyquist_stack) # Filter for sweeps with similar Nyquist velocities if same_nyquist: nyquist_range = nyquist_stack[nyquist_vector_idx] nyquist_mask = np.abs(nyquist_stack - nyquist_range) <= 1 data_stack = [ sweep_data for i, sweep_data in enumerate(data_stack) if nyquist_mask[i] ] gate_z_stack = [ sweep_gate_z for i, sweep_gate_z in enumerate(gate_z_stack) if nyquist_mask[i] ] z_3d = np.concatenate(gate_z_stack, axis=0) # Find nearest gates to requested height using refractivity/curvature-adjusted geometry height_idx = np.argmin(np.abs(z_3d - height), axis=0) selected_gate_z = np.take_along_axis( z_3d, height_idx[np.newaxis, :, :], axis=0 ).squeeze(axis=0) # Derive a height tolerance if none provided. Use the median positive gate-to-gate # spacing across the volume to approximate vertical resolution. if max_height_diff is None: z_sorted = np.sort(z_3d.ravel()) dz = np.diff(z_sorted) dz = dz[dz > 0] if dz.size == 0: derived_tol = 1000.0 # fallback to 1 km else: derived_tol = 2.0 * np.median(dz) height_tol = max(500.0, derived_tol) # enforce a sensible floor else: height_tol = float(max_height_diff) # Mask gates that are too far from the requested height (e.g., asking for 20 km # when the volume tops out near 14 km). Also warn when the request is outside # the available volume. max_available = z_3d.max() min_available = z_3d.min() if height > max_available + height_tol or height < min_available - height_tol: print( f"Warning: requested CAPPI height {height} m is outside available gates " f"({min_available:.0f}-{max_available:.0f} m). Output will be fully masked." ) height_mask = np.abs(selected_gate_z - height) > height_tol # Generate CAPPI for each field using data_stack fields_data = {} for field in fields: data_3d = np.concatenate( [sweep_data[field] for sweep_data in data_stack], axis=0 ) # Sort azimuth for all sweeps dim0 = data_3d.shape[1:] # Extract the data slice corresponding to the requested height using gate geometry (curved Earth) CAPPI = np.take_along_axis( data_3d, height_idx[np.newaxis, :, :], axis=0 ).squeeze(axis=0) CAPPI = np.ma.array(CAPPI, mask=height_mask) # Retrieve units and handle case where units might be missing units = radar.fields[field].get("units", "").lower() # Determine valid_min and valid_max based on units if units == "dbz": valid_min, valid_max = -10, 80 elif units in ["m/s", "meters per second"]: valid_min, valid_max = -100, 100 elif units == "db": valid_min, valid_max = -7.9, 7.9 else: # If units are not found or don't match known types, set default values or skip masking valid_min, valid_max = None, None # If valid_min or valid_max are still None, set them to conservative defaults or skip if valid_min is None: print(f"Warning: valid_min not set for {field}, using default of -1e10") valid_min = -1e10 # Conservative default if valid_max is None: print(f"Warning: valid_max not set for {field}, using default of 1e10") valid_max = 1e10 # Conservative default # Apply valid_min and valid_max masking if valid_min is not None: CAPPI = np.ma.masked_less(CAPPI, valid_min) if valid_max is not None: CAPPI = np.ma.masked_greater(CAPPI, valid_max) # Convert to masked array with the specified fill value CAPPI.set_fill_value(radar.fields[field].get("_FillValue", np.nan)) CAPPI = np.ma.masked_invalid(CAPPI) CAPPI = np.ma.masked_outside(CAPPI, valid_min, valid_max) fields_data[field] = { "data": CAPPI, "units": radar.fields[field]["units"], "long_name": f"CAPPI {field} at {height} meters", "comment": f"CAPPI {field} calculated at a height of {height} meters", "_FillValue": radar.fields[field].get("_FillValue", np.nan), } # Set the elevation to zeros for CAPPI elevation_final = np.zeros(dim0[0], dtype="float32") # Since we are using the whole volume scan, report mean time try: dtime = to_datetime( num2date(radar.time["data"], radar.time["units"]).astype(str), format="ISO8601", ) except ValueError: dtime = to_datetime( num2date(radar.time["data"], radar.time["units"]).astype(str) ) dtime = dtime.mean() time = radar.time.copy() time["data"] = time_final time["mean"] = dtime # CAPPI represents a single synthesized sweep on the reference azimuth grid n_rays = azimuth_final.size sweep_number = {"data": np.array([0], dtype="int32")} sweep_mode = {"data": np.array(["ppi"], dtype="object")} sweep_start_ray_index = {"data": np.array([0], dtype="int32")} sweep_end_ray_index = {"data": np.array([n_rays - 1], dtype="int32")} fixed_angle = {"data": np.array([0.0], dtype="float32")} # Create the Radar object with the new CAPPI data. Keep site metadata but # reset sweep descriptors and azimuth to match the CAPPI grid so display # libraries do not mis-align fields (which showed up as a 90° rotation). return Radar( time=time, _range=radar.range.copy(), fields=fields_data, metadata=radar.metadata.copy(), scan_type="ppi", latitude=radar.latitude.copy(), longitude=radar.longitude.copy(), altitude=radar.altitude.copy(), sweep_number=sweep_number, sweep_mode=sweep_mode, fixed_angle=fixed_angle, sweep_start_ray_index=sweep_start_ray_index, sweep_end_ray_index=sweep_end_ray_index, azimuth={"data": azimuth_final}, elevation={"data": elevation_final}, instrument_parameters=radar.instrument_parameters, )