"""
Functions for radiosonde related calculations.
"""
import numpy as np
import xarray as xr
from operator import itemgetter
from itertools import groupby
import metpy.calc as mpcalc
from metpy.units import units
from act.utils.data_utils import convert_to_potential_temp
[docs]def calculate_precipitable_water(ds, temp_name='tdry', rh_name='rh', pres_name='pres'):
"""
Function to calculate precipitable water vapor from ARM sondewnpn b1 data.
Will first calculate saturation vapor pressure of all data using Arden-Buck
equations, then calculate specific humidity and integrate over all pressure
levels to give us a precipitable water value in centimeters.
ds : xarray.Dataset
Xarray dataset as read in by the ACT netCDF reader.
temp_name : str
Name of temperature field to use. Defaults to 'tdry' for sondewnpn b1
level data.
rh_name : str
Name of relative humidity field to use. Defaults to 'rh' for sondewnpn
b1 level data.
pres_name : str
Name of atmospheric pressure field to use. Defaults to 'pres' for
sondewnpn b1 level data.
"""
temp = ds[temp_name].values
rh = ds[rh_name].values
pres = ds[pres_name].values
# Get list of temperature values for saturation vapor pressure calc
temperature = []
for t in np.nditer(temp):
temperature.append(t)
# Apply Arden-Buck equation to get saturation vapor pressure
sat_vap_pres = []
for t in temperature:
# Over liquid water, above freezing
if t >= 0:
sat_vap_pres.append(0.61121 * np.exp((18.678 - (t / 234.5)) * (t / (257.14 + t))))
# Over ice, below freezing
else:
sat_vap_pres.append(0.61115 * np.exp((23.036 - (t / 333.7)) * (t / (279.82 + t))))
# convert rh from % to decimal
rel_hum = []
for r in np.nditer(rh):
rel_hum.append(r / 100.0)
# get vapor pressure from rh and saturation vapor pressure
vap_pres = []
for i in range(0, len(sat_vap_pres)):
es = rel_hum[i] * sat_vap_pres[i]
vap_pres.append(es)
# Get list of pressure values for mixing ratio calc
pressure = []
for p in np.nditer(pres):
pressure.append(p)
# Mixing ratio calc
mix_rat = []
for i in range(0, len(vap_pres)):
mix_rat.append(0.622 * vap_pres[i] / (pressure[i] - vap_pres[i]))
# Specific humidity
spec_hum = []
for rat in mix_rat:
spec_hum.append(rat / (1 + rat))
# Integrate specific humidity
pwv = 0.0
for i in range(1, len(pressure) - 1):
pwv = pwv + 0.5 * (spec_hum[i] + spec_hum[i - 1]) * (pressure[i - 1] - pressure[i])
pwv = pwv / 0.098
return pwv
[docs]def calculate_stability_indicies(
ds,
temp_name='temperature',
td_name='dewpoint_temperature',
p_name='pressure',
moving_ave_window=0,
):
"""
Function for calculating stability indices from sounding data.
Parameters
----------
ds : ACT dataset
The dataset to compute the stability indicies of. Must have
temperature, dewpoint, and pressure in vertical coordinates.
temp_name : str
The name of the temperature field.
td_name : str
The name of the dewpoint field.
p_name : str
The name of the pressure field.
moving_ave_window : int
Number of points to do a moving average on sounding data to reduce
noise. This is useful if noise in the sounding is preventing parcel
ascent.
Returns
-------
ds : ACT dataset
An ACT dataset with additional stability indicies added.
"""
t = ds[temp_name]
td = ds[td_name]
p = ds[p_name]
if not hasattr(t, 'units'):
raise AttributeError('Temperature field must have units' + ' for ACT to discern!')
if not hasattr(td, 'units'):
raise AttributeError('Dewpoint field must have units' + ' for ACT to discern!')
if not hasattr(p, 'units'):
raise AttributeError('Pressure field must have units' + ' for ACT to discern!')
if t.units == 'C':
t_units = units.degC
else:
t_units = getattr(units, t.units)
if td.units == 'C':
td_units = units.degC
else:
td_units = getattr(units, td.units)
p_units = getattr(units, p.units)
# Sort all values by decreasing pressure
t_sorted = np.array(t.values)
td_sorted = np.array(td.values)
p_sorted = np.array(p.values)
ind_sort = np.argsort(p_sorted)
t_sorted = t_sorted[ind_sort[-1:0:-1]]
td_sorted = td_sorted[ind_sort[-1:0:-1]]
p_sorted = p_sorted[ind_sort[-1:0:-1]]
if moving_ave_window > 0:
t_sorted = np.convolve(t_sorted, np.ones((moving_ave_window,)) / moving_ave_window)
td_sorted = np.convolve(td_sorted, np.ones((moving_ave_window,)) / moving_ave_window)
p_sorted = np.convolve(p_sorted, np.ones((moving_ave_window,)) / moving_ave_window)
t_sorted = t_sorted * t_units
td_sorted = td_sorted * td_units
p_sorted = p_sorted * p_units
t_profile = mpcalc.parcel_profile(p_sorted, t_sorted[0], td_sorted[0])
# Calculate parcel trajectory
ds['parcel_temperature'] = t_profile.magnitude
ds['parcel_temperature'].attrs['units'] = t_profile.units
# Calculate CAPE, CIN, LCL
sbcape, sbcin = mpcalc.surface_based_cape_cin(p_sorted, t_sorted, td_sorted)
lcl = mpcalc.lcl(p_sorted[0], t_sorted[0], td_sorted[0])
try:
lfc = mpcalc.lfc(p_sorted[0], t_sorted[0], td_sorted[0])
except IndexError:
lfc = np.nan * p_sorted.units
mucape, mucin = mpcalc.most_unstable_cape_cin(p_sorted, t_sorted, td_sorted)
where_500 = np.argmin(np.abs(p_sorted - 500 * units.hPa))
li = t_sorted[where_500] - t_profile[where_500]
ds['surface_based_cape'] = sbcape.magnitude
ds['surface_based_cape'].attrs['units'] = 'J/kg'
ds['surface_based_cape'].attrs['long_name'] = 'Surface-based CAPE'
ds['surface_based_cin'] = sbcin.magnitude
ds['surface_based_cin'].attrs['units'] = 'J/kg'
ds['surface_based_cin'].attrs['long_name'] = 'Surface-based CIN'
ds['most_unstable_cape'] = mucape.magnitude
ds['most_unstable_cape'].attrs['units'] = 'J/kg'
ds['most_unstable_cape'].attrs['long_name'] = 'Most unstable CAPE'
ds['most_unstable_cin'] = mucin.magnitude
ds['most_unstable_cin'].attrs['units'] = 'J/kg'
ds['most_unstable_cin'].attrs['long_name'] = 'Most unstable CIN'
ds['lifted_index'] = li.magnitude
ds['lifted_index'].attrs['units'] = t_profile.units
ds['lifted_index'].attrs['long_name'] = 'Lifted index'
ds['level_of_free_convection'] = lfc.magnitude
ds['level_of_free_convection'].attrs['units'] = lfc.units
ds['level_of_free_convection'].attrs['long_name'] = 'Level of free convection'
ds['lifted_condensation_level_temperature'] = lcl[1].magnitude
ds['lifted_condensation_level_temperature'].attrs['units'] = lcl[1].units
ds['lifted_condensation_level_temperature'].attrs[
'long_name'
] = 'Lifted condensation level temperature'
ds['lifted_condensation_level_pressure'] = lcl[0].magnitude
ds['lifted_condensation_level_pressure'].attrs['units'] = lcl[0].units
ds['lifted_condensation_level_pressure'].attrs[
'long_name'
] = 'Lifted condensation level pressure'
return ds
[docs]def calculate_pbl_liu_liang(
ds,
temperature='tdry',
pressure='pres',
windspeed='wspd',
height='alt',
smooth_height=3,
land_parameter=True,
llj_max_alt=1500.0,
llj_max_wspd=2.0,
):
"""
Function for calculating the PBL height from a radiosonde profile
using the Liu-Liang 2010 technique. There are some slight descrepencies
in the function from the ARM implementation 1.) it imposes a 1500m (keyword)
height on the definition of the LLJ and 2.) the interpolation is slightly different
using python functions
Parameters
----------
ds : xarray Dataset
Dataset housing radiosonde profile for calculations
temperature : str
The name of the temperature field.
pressure : str
The name of the pressure field.
windspeed : str
The name of the wind speed field.
height : str
The name of the height field
smooth_height : int
Number of points to do a moving average on sounding height data to reduce noise
land_parameter : boolean
Set to True if retrievals over land or false to retrievals over water
llj_max_alt : float
Maximum altitude the LLJ 2 m/s difference should be checked against
llj_max_wspd : float
Maximum wind speed threshold to use to define LLJ
Returns
-------
ds : xarray Dataset
xarray dataset with results stored in pblht_liu_liang variable
References
----------
Liu, Shuyan, and Xin-Zhong Liang. "Observed diurnal cycle climatology of planetary
boundary layer height." Journal of Climate 23, no. 21 (2010): 5790-5809.
Sivaraman, C., S. McFarlane, E. Chapman, M. Jensen, T. Toto, S. Liu, and M. Fischer.
"Planetary boundary layer (PBL) height value added product (VAP): Radiosonde retrievals."
Department of Energy Office of Science Atmospheric Radiation Measurement (ARM) Program
(United States) (2013).
"""
# Preprocess the sonde data to ensure the same methods across all retrievals
ds2 = preprocess_sonde_data(
ds,
temperature=temperature,
pressure=pressure,
height=height,
smooth_height=smooth_height,
base=5.0,
)
pres = ds2[pressure].values
wspd = ds2[windspeed].values
alt = ds2[height].values
theta = ds2['potential_temperature'].values
# Calculate the lapse rate
theta_gradient = np.diff(theta) / np.diff(alt)
# Calculate AGL
if np.isnan(alt[0]):
idx = np.where(~np.isnan(alt))[0]
agl = alt - alt[idx[0]]
else:
agl = alt - alt[0]
theta_diff = theta[4] - theta[1]
theta_gradient = np.diff(theta) / np.diff(alt / 1000.0)
# Set up threshold values
if land_parameter:
stability_thresh = 1.0 # K
inst_thresh = 0.5 # K
overshoot_thresh = 4.0 # K/km
else:
stability_thresh = 0.2 # K
inst_thresh = 0.1 # K
overshoot_thresh = 0.5 # K/km
# Check Regimes
if theta_diff < 0 - stability_thresh:
regime = 'CBL'
if theta_diff > abs(stability_thresh):
regime = 'SBL'
if (0 - stability_thresh) <= theta_diff <= abs(stability_thresh):
regime = 'NRL'
# Calculate for CBL/NRL regimes
pbl_stable = np.nan
pbl_shear = np.nan
if regime == 'CBL' or regime == 'NRL':
# Calculate gradient from first level
theta_gradient_0 = theta - theta[0]
# Only process data above 150m ARM
idx = np.where(agl > 150)[0][0]
theta_gradient_0[0:idx] = np.nan
# Scan upward to find lowest level that meets condition
idx = np.where(theta_gradient_0 >= inst_thresh)[0]
theta_gradient[0 : idx[0]] = np.nan
# Scan upward from previous level to search for overlying inversion layer
idx = np.where(theta_gradient >= overshoot_thresh)[0]
pbl = alt[idx[0]]
else:
idx = np.array(
[
i
for i, t in enumerate(theta_gradient[1:-1])
if theta_gradient[i] < theta_gradient[i - 1]
and theta_gradient[i] < theta_gradient[i + 1]
]
)
for i in idx:
cond1 = (theta_gradient[i] - theta_gradient[i - 1]) < -40.0
cond2 = (theta_gradient[i + 1] < overshoot_thresh) or (
theta_gradient[i + 2] < overshoot_thresh
)
if cond1 or cond2:
# This gets the ARM answer
pbl_stable = (alt[i + 1] + alt[i]) / 2.0
# pbl_stable = alt[i]
break
# Check for low-level jet
# Find the height of the maximum windspeed and look up to find layer 2m/s lower
# Stull 1988 indicates LLJ is defined as where there is a relative wind speed
# maximum that is more than 2 m/s faster than the wind speeds above it within
# the lowest 1500m of the atmosphere. Keywords to adjust are provided
idh = np.where(alt <= llj_max_alt)[0]
max_wspd_ind = [i for i, w in enumerate(wspd[:-1]) if wspd[i] > wspd[i + 1]][0]
diff = wspd[max_wspd_ind] - wspd[max_wspd_ind : idh[-1]]
idx = np.where(diff > llj_max_wspd)[0]
if len(idx) > 0:
wspd_to_surf = np.diff(np.flip(wspd[0:max_wspd_ind]))
wspd_monotonic = np.all(wspd_to_surf <= 0.0)
if wspd_monotonic:
pbl_shear = alt[max_wspd_ind]
if ~np.all(np.isnan([pbl_stable, pbl_shear])):
pbl = np.nanmin([pbl_stable, pbl_shear])
else:
pbl = -9999.0
atts = {'units': 'm', 'long_name': 'Planteary Boundary Layer Height Liu-Liang'}
da = xr.DataArray(pbl, attrs=atts)
ds['pblht_liu_liang'] = da
atts = {
'units': '',
'long_name': 'Planteary Boundary Layer Regime Classification Liu-Liang',
}
da = xr.DataArray(regime, attrs=atts)
ds['pblht_regime_liu_liang'] = da
atts = {'units': 'mb', 'long_name': 'Gridded pressure'}
da = xr.DataArray(pres, coords={'atm_pres_ss': pres}, dims=['atm_pres_ss'], attrs=atts)
ds['atm_pres_ss'] = da
atts = {'units': 'K', 'long_name': 'Gridded potential temperature'}
da = xr.DataArray(theta, coords={'atm_pres_ss': pres}, dims=['atm_pres_ss'], attrs=atts)
ds['potential_temperature_ss'] = da
atts = {'units': 'm', 'long_name': 'Gridded altitude'}
da = xr.DataArray(alt, coords={'atm_pres_ss': pres}, dims=['atm_pres_ss'], attrs=atts)
ds['alt_ss'] = da
atts = {'units': 'm', 'long_name': 'PBL Stable Condition 1'}
da = xr.DataArray(pbl_stable, attrs=atts)
ds['pblht_liu_liang_stable_cond'] = da
atts = {'units': 'm', 'long_name': 'PBL Shear Condition 2'}
da = xr.DataArray(pbl_shear, attrs=atts)
ds['pblht_liu_liang_shear_cond'] = da
return ds
[docs]def calculate_pbl_heffter(
ds,
temperature='tdry',
pressure='pres',
height='alt',
smooth_height=3,
base=5.0,
):
"""
Function for calculating the PBL height from a radiosonde profile
using the Heffter technique. There are differences from the ARM
VAP at times due to different averaging schemes. Larger differences
do occur at times and are unknown as to the cause but it is being
investigated and is potential a code issue with the VAP.
Parameters
----------
ds : xarray Dataset
Dataset housing radiosonde profile for calculations
temperature : str
The name of the temperature field.
pressure : str
The name of the pressure field.
height : str
The name of the height field
smooth_height : int
Number of points to do a moving average on sounding height data to reduce noise
base : int
Interval for pressure gridding. In testing, 5 mb was found to produce results with
the lowest RMS
Returns
-------
ds : xarray Dataset
xarray dataset with results stored in pblht_liu_liang variable
References
----------
Heffter JL. 1980. “Transport Layer Depth Calculations.” Second Joint Conference on
Applications of Air Pollution Meteorology, New Orleans, Louisiana.
Sivaraman, C., S. McFarlane, E. Chapman, M. Jensen, T. Toto, S. Liu, and M. Fischer.
"Planetary boundary layer (PBL) height value added product (VAP): Radiosonde retrievals."
Department of Energy Office of Science Atmospheric Radiation Measurement (ARM) Program
(United States) (2013).
"""
# Preprocess the sonde data to ensure the same methods across all retrievals
ds2 = preprocess_sonde_data(
ds,
temperature=temperature,
pressure=pressure,
height=height,
smooth_height=smooth_height,
base=base,
)
# Get data
pres = ds2[pressure].values
alt = ds2[height].values
theta = ds2['potential_temperature'].values
# Calculate the lapse rate
theta_gradient = np.diff(theta) / np.diff(alt)
# Calculate AGL
if np.isnan(alt[0]):
idx = np.where(~np.isnan(alt))[0]
agl = alt - alt[idx[0]]
else:
agl = alt - alt[0]
# Find where the lapse rate is greater than 0.005 K/m
idx = np.where(theta_gradient >= 0.005)[0]
# Find the consistent layers by grouping the indices together
# Does not include a single height as a layer
ranges = []
for key, group in groupby(enumerate(idx), lambda i: i[0] - i[1]):
group = list(map(itemgetter(1), group))
if group[-1] - group[0] > 0:
ranges.append((group[0], group[-1]))
# Subset ranges to lowest 5
if len(ranges) > 5:
ranges = ranges[0:5]
# For each layer, calculate the difference in theta from
# top and bottom of the layer. The lowest layer where the
# difference is > 2 K is set as the PBL.
pbl = 0.0
theta_diff_layer = []
bottom_inversion = []
top_inversion = []
for r in ranges:
if agl[r[1]] > 4000.0:
continue
theta_diff = theta[r[1]] - theta[r[0]]
theta_diff_layer.append(theta_diff)
bottom_inversion.append(alt[r[0]])
top_inversion.append(alt[r[1]])
if pbl == 0.0 and theta_diff > 2.0:
pbl = alt[r[0]]
if len(theta_diff_layer) == 0:
pbl = -9999.0
# If PBL is not set, set it to the layer with the max theta diff
if pbl == 0.0:
idx = np.argmax(theta_diff_layer)
pbl = bottom_inversion[idx]
# Add variables to the dataset
atts = {'units': 'm', 'long_name': 'Planteary Boundary Layer Height Heffter'}
da = xr.DataArray(pbl, attrs=atts)
ds['pblht_heffter'] = da
atts = {'units': 'mb', 'long_name': 'Gridded pressure'}
da = xr.DataArray(pres, coords={'atm_pres_ss': pres}, dims=['atm_pres_ss'], attrs=atts)
ds['atm_pres_ss'] = da
atts = {'units': 'K', 'long_name': 'Gridded potential temperature'}
da = xr.DataArray(theta, coords={'atm_pres_ss': pres}, dims=['atm_pres_ss'], attrs=atts)
ds['potential_temperature_ss'] = da
atts = {'units': 'm', 'long_name': 'Gridded altitude'}
da = xr.DataArray(alt, coords={'atm_pres_ss': pres}, dims=['atm_pres_ss'], attrs=atts)
ds['alt_ss'] = da
atts = {'units': 'm', 'long_name': 'Bottom height of inversion layers'}
da = xr.DataArray(
bottom_inversion,
coords={'layers': list(range(len(bottom_inversion)))},
dims=['layers'],
attrs=atts,
)
ds['bottom_inversion'] = da
atts = {'units': 'm', 'long_name': 'Top height of inversion layers'}
da = xr.DataArray(
top_inversion,
coords={'layers': list(range(len(top_inversion)))},
dims=['layers'],
attrs=atts,
)
ds['top_inversion'] = da
return ds
def preprocess_sonde_data(
ds,
temperature='tdry',
pressure='pres',
height='alt',
smooth_height=3,
base=5.0,
):
"""
Function for processing the SONDE data for the PBL calculations.
This is to ensure consistency and also applies some QC to the
processing.
Parameters
----------
ds : xarray Dataset
Dataset housing radiosonde profile for calculations
temperature : str
The name of the temperature field.
pressure : str
The name of the pressure field.
height : str
The name of the height field
smooth_height : int
Number of points to do a moving average on sounding height data to reduce noise
base : int
Interval for pressure gridding. In testing, 5 mb was found to produce results with
the lowest RMS
Returns
-------
ds : xarray.Dataset
Xarray dataset containing processed sonde data.
References
----------
Sivaraman, C., S. McFarlane, E. Chapman, M. Jensen, T. Toto, S. Liu, and M. Fischer.
"Planetary boundary layer (PBL) height value added product (VAP): Radiosonde retrievals."
Department of Energy Office of Science Atmospheric Radiation Measurement (ARM) Program
(United States) (2013).
"""
# Get the initial time and temp values
time_0 = ds['time'].values
temp_0 = ds[temperature].values
# Apply a rolling average to smooth the pressure out
ds[pressure] = ds[pressure].rolling(time=smooth_height, min_periods=1, center=True).mean()
# Swap time and pressure for doing the appropriate gridding
ds2 = ds.swap_dims(dims_dict={'time': pressure})
for var in ds2:
ds2[var].attrs = ds2[var].attrs
# Set up the pressure grid
starting_pres = base * np.ceil(float(ds2[pressure].values[2]) / base)
p_grid = np.flip(np.arange(100.0, starting_pres + base, base))
# Pull out the data that's nearest to the pressure grid. If it errors
# it will smooth the data more. This tends to happen if there are multiple
# values of pressure that are the same
try:
ds2 = ds2.sel({pressure: p_grid}, method='nearest')
except Exception:
ds[pressure] = (
ds[pressure].rolling(time=smooth_height + 4, min_periods=2, center=True).mean()
)
ds2 = ds.swap_dims(dims_dict={'time': pressure})
for var in ds2:
ds2[var].attrs = ds[var].attrs
try:
ds2 = ds2.sel({pressure: p_grid}, method='nearest')
except Exception:
raise ValueError('Sonde profile does not have unique pressures after smoothing')
# Get data
alt = ds2[height].values
pres = ds2[pressure].values
temp = ds2[temperature].values
# Perform Pre-processing checks
if len(temp) == 0.0:
raise ValueError('No data in profile')
if np.nanmax(alt) < 1000.0:
raise ValueError('Max altitude < 1000m')
if np.nanmax(pres) <= 200.0:
raise ValueError('Max pressure <= 200 hPa')
# Check temperature delta
t1 = time_0[0]
t2 = t1 + np.timedelta64(10, 's')
idx = np.where((time_0 >= t1) & (time_0 <= t2))[0]
t_delta = abs(temp_0[idx[-1]] - temp_0[idx[0]])
if t_delta > 30.0:
raise ValueError('Temperature changes by >30º in first 10 seconds')
# Check min/max
if np.nanmax(temp) > 50.0 or np.nanmin(temp) < -90:
raise ValueError('Temperature outside acceptable range (-90, 50)')
if np.isnan(pres[0]) or np.isnan(pres[1]):
raise ValueError('First two pressure values bad')
# Calculate potential temperature and subsequent gradients
theta = (
convert_to_potential_temp(ds=ds2, temp_var_name=temperature, press_var_name=pressure)
+ 273.15
)
# Set variables to return
atts = {'units': 'K', 'long_name': 'Potential temperature'}
da = xr.DataArray(theta, coords=ds2['tdry'].coords, dims=ds2[temperature].dims, attrs=atts)
ds2['potential_temperature'] = da
return ds2