"""
Functions for Doppler lidar specific retrievals
"""
import warnings
import dask
import numpy as np
import xarray as xr


def compute_winds_from_ppi(
ds,
elevation_name='elevation',
azimuth_name='azimuth',
radial_velocity_name='radial_velocity',
snr_name='signal_to_noise_ratio',
intensity_name=None,
snr_threshold=0.008,
remove_all_missing=False,
condition_limit=1.0e4,
return_ds=None,
):
"""
    This function will convert a Doppler lidar PPI scan into a vertical
    distribution of horizontal wind direction and speed.

    Code was adapted by Kenneth E Kehoe from code developed by Rob K
    Newsom. Please see the reference noted below and cite accordingly.

    Parameters
    ----------
    ds : xarray.Dataset
        The xarray dataset containing the PPI scan to be converted into winds.
elevation_name : str
The name of the elevation variable in the dataset.
azimuth_name : str
The name of the azimuth variable in the dataset.
radial_velocity_name : str
The name of the radial velocity variable in the dataset.
snr_name : str
The name of the signal to noise variable in the dataset.
intensity_name : str
        The name of the intensity variable in the dataset. If this is set,
        intensity will be used instead of the signal to noise ratio variable.
snr_threshold : float
The signal to noise lower threshold used to decide which values to use.
    remove_all_missing : bool
        Option to not add a time step to the returned dataset when all of its
        values are NaN.
condition_limit : float
        Upper limit on the condition number of the least-squares matrix used to
        decide whether a level of the scan should be converted to wind speed
        and direction.
return_ds : None or xarray.Dataset
        If set to an xarray Dataset, the calculated winds dataset will be
        concatenated onto this dataset. This allows looping over this function
        for many scans and returning a single dataset.

    Returns
-------
    ds : xarray.Dataset or None
        The winds converted from the PPI scan to horizontal wind speeds and
        wind directions, along with wind speed error and wind direction error.
        If there is a problem determining the breaks between PPI scans, the
        unmodified return_ds value (None by default) is returned.

    References
----------
    Rob K Newsom, Alan W Brewer, James M Wilczak, Daniel E Wolfe,
    Steven P Oncley and Julie K Lundquist: Validating Precision Estimates in
    Horizontal Wind Measurements from a Doppler Lidar, Atmospheric Measurement
    Techniques Discussions, 2016, 10, 1-30.
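
    Examples
    --------
    A minimal usage sketch; the file names below are hypothetical placeholders
    for files containing Doppler lidar PPI scans, each read into an xarray
    Dataset before calling this function.

    >>> import xarray as xr
    >>> winds_ds = None
    >>> for filename in ['ppi_scan_a.nc', 'ppi_scan_b.nc']:
    ...     scan_ds = xr.open_dataset(filename)
    ...     winds_ds = compute_winds_from_ppi(scan_ds, return_ds=winds_ds)
    >>> winds_ds['wind_speed']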
"""
new_ds = None
azimuth = ds[azimuth_name].values
azimuth_rounded = np.round(azimuth).astype(int)
    # Determine where the azimuth values repeat to get the index range for each PPI scan.
index = np.where(azimuth_rounded == azimuth_rounded[0])[0]
if index.size == 0:
print(
'\nERROR: Having trouble determining the PPI scan breaks '
'in compute_winds_from_ppi().\n'
)
return return_ds
if index.size == 1:
num_scans = azimuth.size
else:
num_scans = index[1] - index[0]
elevation = np.radians(ds[elevation_name].values)
azimuth = np.radians(ds[azimuth_name].values)
doppler = ds[radial_velocity_name].values
if intensity_name is not None:
intensity = ds[intensity_name].values
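        # Intensity is typically recorded as SNR + 1, so subtract 1 to recover
        # the signal to noise ratio.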
snr = intensity - 1
del intensity
var_name = intensity_name
else:
try:
snr = ds[snr_name].values
var_name = snr_name
except KeyError:
intensity = ds['intensity'].values
snr = intensity - 1
del intensity
var_name = 'intensity'
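    # Use the non-time dimension of the SNR/intensity variable as the range dimension.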
height_name = list(set(ds[var_name].dims) - {'time'})[0]
rng = ds[height_name].values
try:
height_units = ds[height_name].attrs['units']
except KeyError:
if rng[0] > 0:
height_units = 'm'
else:
height_units = 'km'
time = ds['time'].values
# Loop over each PPI scan
task = []
for start_index in index:
scan_index = range(start_index, start_index + num_scans)
        # Since this can run while the instrument is making measurements,
        # the number of time steps in the last PPI scan may not match exactly.
        # Truncate the scan index range so it does not extend past the data.
        if scan_index[-1] >= np.size(elevation):
            scan_index = range(start_index, np.size(elevation))
task.append(
dask.delayed(process_ppi_winds)(
time[scan_index],
elevation[scan_index],
azimuth[scan_index],
snr[scan_index, :],
doppler[scan_index, :],
rng,
condition_limit,
snr_threshold,
remove_all_missing,
height_units,
)
)
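    # Compute all of the delayed per-scan retrievals in parallel with dask.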
results = dask.compute(*task)
    # Keep only the results that returned a Dataset (scans with usable data).
    is_dataset = [isinstance(result, xr.Dataset) for result in results]
    if any(is_dataset):
        results = [result for result, keep in zip(results, is_dataset) if keep]
        new_ds = xr.concat(results, 'time')
    if isinstance(return_ds, xr.Dataset) and isinstance(new_ds, xr.Dataset):
        return_ds = xr.concat([return_ds, new_ds], dim='time')
    elif isinstance(new_ds, xr.Dataset):
        # Preserve any previously accumulated dataset if this call produced no winds.
        return_ds = new_ds
    return return_ds


def process_ppi_winds(
time,
elevation,
azimuth,
snr,
doppler,
rng,
condition_limit,
snr_threshold,
remove_all_missing,
height_units,
):
"""
    This function retrieves winds from a single PPI scan. It is called through
    dask by compute_winds_from_ppi() and should not be used standalone.
"""
height = rng * np.median(np.sin(elevation))
xhat = np.sin(azimuth) * np.cos(elevation)
yhat = np.cos(azimuth) * np.cos(elevation)
zhat = np.sin(elevation)
dims = np.shape(snr)
# mean_snr = np.nanmean(snr, axis=1)
u_wind = np.full(dims[1], np.nan)
v_wind = np.full(dims[1], np.nan)
w_wind = np.full(dims[1], np.nan)
u_err = np.full(dims[1], np.nan)
v_err = np.full(dims[1], np.nan)
w_err = np.full(dims[1], np.nan)
residual = np.full(dims[1], np.nan)
chisq = np.full(dims[1], np.nan)
corr = np.full(dims[1], np.nan)
# Loop over each level
for ii in range(dims[1]):
ur1 = doppler[:, ii]
snr1 = snr[:, ii]
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=RuntimeWarning)
index = np.where((snr1 >= snr_threshold) & np.isfinite(ur1))[0]
count = index.size
if count >= 4:
ur1 = ur1[index]
xhat1 = xhat[index]
yhat1 = yhat[index]
zhat1 = zhat[index]
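            # Build the 3x3 normal-equations matrix a = sum(r r^T) and vector
            # b = sum(v_r * r), where r = (xhat, yhat, zhat) is the unit
            # pointing vector of each beam and v_r is the radial velocity.
            # Solving a @ c = b gives the least-squares wind components
            # c = (u, v, w) that best reproduce the measured radial velocities.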
a = np.full((3, 3), np.nan)
b = np.full(3, np.nan)
a[0, 0] = np.sum(xhat1**2)
a[1, 0] = np.sum(xhat1 * yhat1)
a[2, 0] = np.sum(xhat1 * zhat1)
a[0, 1] = a[1, 0]
a[1, 1] = np.sum(yhat1**2)
a[2, 1] = np.sum(yhat1 * zhat1)
a[0, 2] = a[2, 0]
a[1, 2] = a[2, 1]
a[2, 2] = np.sum(zhat1**2)
b[0] = np.sum(ur1 * xhat1)
b[1] = np.sum(ur1 * yhat1)
b[2] = np.sum(ur1 * zhat1)
ainv = np.linalg.inv(a)
            # Condition number of a (Frobenius norm); large values indicate a
            # poorly conditioned, unreliable fit.
            condition = np.linalg.norm(a) * np.linalg.norm(ainv)
if condition < condition_limit:
c = b @ ainv
u_wind[ii] = c[0]
v_wind[ii] = c[1]
w_wind[ii] = c[2]
ur_fit = xhat1 * u_wind[ii] + yhat1 * v_wind[ii] + zhat1 * w_wind[ii]
chisq[ii] = np.sum((ur_fit - ur1) ** 2)
residual[ii] = np.sqrt(chisq[ii] / count)
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=RuntimeWarning)
corr[ii] = np.corrcoef(ur_fit, ur1)[0, 1]
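                # Component error estimates come from the diagonal of the
                # inverse normal matrix scaled by the reduced chi-square of the fit.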
u_err[ii] = np.sqrt((chisq[ii] / (count - 3)) * ainv[0, 0])
v_err[ii] = np.sqrt((chisq[ii] / (count - 3)) * ainv[1, 1])
w_err[ii] = np.sqrt((chisq[ii] / (count - 3)) * ainv[2, 2])
    # Compute wind speed and meteorological wind direction from the u and v components.
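    # The speed and direction errors are first-order propagation of the u and v
    # component errors through the speed and arctangent formulas.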
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=RuntimeWarning)
wspd = np.sqrt(u_wind**2 + v_wind**2)
wdir = np.degrees(np.arctan2(u_wind, v_wind) + np.pi)
wspd_err = np.sqrt((u_wind * u_err) ** 2 + (v_wind * v_err) ** 2) / wspd
wdir_err = np.degrees(np.sqrt((u_wind * v_err) ** 2 + (v_wind * u_err) ** 2) / wspd**2)
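    # When requested, skip profiles with no valid retrievals. The non-Dataset
    # return value is filtered out in compute_winds_from_ppi().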
if remove_all_missing and np.isnan(wspd).all():
return np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan
time = time[0] + (time[-1] - time[0]) / 2
time = time.reshape(
1,
)
wspd = wspd.reshape(1, rng.size)
wdir = wdir.reshape(1, rng.size)
wspd_err = wspd_err.reshape(1, rng.size)
wdir_err = wdir_err.reshape(1, rng.size)
corr = corr.reshape(1, rng.size)
residual = residual.reshape(1, rng.size)
with warnings.catch_warnings():
warnings.filterwarnings('ignore', category=RuntimeWarning)
snr_mean = np.nanmean(snr, axis=0)
snr_mean = snr_mean.reshape(1, rng.size)
new_ds = xr.Dataset(
{
'wind_speed': (
('time', 'height'),
wspd,
{'long_name': 'Wind speed', 'units': 'm/s'},
),
'wind_direction': (
('time', 'height'),
wdir,
{'long_name': 'Wind direction', 'units': 'degree'},
),
'wind_speed_error': (
('time', 'height'),
wspd_err,
                {'long_name': 'Wind speed error', 'units': 'm/s'},
),
'wind_direction_error': (
('time', 'height'),
wdir_err,
{'long_name': 'Wind direction error', 'units': 'degree'},
),
'signal_to_noise_ratio': (
('time', 'height'),
snr_mean,
{
'long_name': 'Signal to noise ratio mean over PPI scan',
'units': '1',
},
),
'residual': (
('time', 'height'),
residual,
{
'long_name': 'Residual values (Square Root of Chi Square)',
'units': 'm/s',
},
),
'correlation': (
('time', 'height'),
corr,
{'long_name': 'Correlation coefficient', 'units': '1'},
),
},
{
'time': ('time', time, {'long_name': 'Time in UTC'}),
'height': (
'height',
height,
{'long_name': 'Height to center of bin', 'units': height_units},
),
},
)
return new_ds