"""
===============
emc2.core.Model
===============
This module contains the Model class and example Models for your use.
"""
import warnings
import xarray as xr
import numpy as np
from netCDF4 import Dataset
from scipy.special import gamma
from scipy.interpolate import RegularGridInterpolator
try:
from act.io.arm import read_arm_netcdf as read_netcdf
except:
print('Using act-atmos v1.5.3 or earlier. Please update to v2.0.0 or newer')
from act.io.armfiles import read_netcdf
from .instrument import ureg, quantity
from ..scattering import brandes
try:
from wrf import tk, getvar, ALL_TIMES
WRF_PYTHON_AVAILABLE = True
except ImportError:
WRF_PYTHON_AVAILABLE = False
[docs]
class Model():
"""
This class stores the model specific parameters for the radar simulator.
Attributes
----------
Rho_hyd: dict
A dictionary whose keys are the names of the model's hydrometeor classes and
whose values are the density of said hydrometeors in :math:`kg\ m^{-3}`
fluffy: dict or None
A dictionary whose keys are the names of the model's ice hydrometeor classes and
whose values are the ice fluffiness factor for the fwd calculations using r_e,
where values of 0 - equal volume sphere, 1 - fluffy sphere i.e., diameter = maximum dimension.
If None, then not applying this correction factor
lidar_ratio: dict
A dictionary whose keys are the names of the model's hydrometeor classes and
whose values are the lidar_ratio of said hydrometeors.
vel_param_a: dict
A dictionary whose keys are the names of the model's hydrometeor classes and
whose values are the :math:`a` parameters to the equation :math:`V = aD^b` used to
calculate terminal velocity corresponding to each hydrometeor.
vel_param_b: dict
A dictionary whose keys are the names of the model's hydrometeor classes and
whose values are the :math:`b` parameters to the equation :math:`V = aD^b` used to
calculate terminal velocity corresponding to each hydrometeor.
N_field: dict
A dictionary whose keys are the names of the model's hydrometeor classes and
whose values are the number concentrations in :math:`cm^{-3}` corresponding to
each hydrometeor class.
T_field: str
A string containing the name of the temperature field in the model.
q_field: str
A string containing the name of the water vapor mixing ratio field (in kg/kg) in the model.
p_field: str
A string containing the name of the pressure field (in mbar) in the model.
z_field: str
A string containing the name of the height field (in m) in the model.
conv_frac_names: dict
A dictionary containing the names of the convective fraction corresponding to each
hydrometeor class in the model.
strat_frac_names: dict
A dictionary containing the names of the stratiform fraction corresponding to each
hydrometeor class in the model.
conv_frac_names_for_rad: dict
A dictionary containing the names of the convective fraction corresponding to each
hydrometeor class in the model for the radiation scheme.
strat_frac_names_for_rad: dict
A dictionary containing the names of the stratiform fraction corresponding to each
hydrometeor class in the model for the radiation scheme.
conv_re_fields: dict
A dictionary containing the names of the effective radii of each convective
hydrometeor class
strat_re_fields: dict
A dictionary containing the names of the effective radii of each stratiform
hydrometeor class
strat_relvar_name: str or None
name of the (strat) relative variance field in the model output, which is used for cloud water
SGS variability. A higher relative variance translates to a smaller inverse relative variance,
which serves as the shape parameter of the SGS q_c; hence, higher relative variance --> higher
SGS q_c variance.
Using default value (1) if None.
asp_ratio_func: dict
A dictionary that returns hydrometeor aspect ratios as a function of maximum dimension in mm.
Vd_rhoa_scaling_ref: dict
dict keys are hydrometeor classes with dict values being the reference air density [kg m-3] used
for the scaling of reflectivity-weighted fall velocity (Mean Doppler velocity minus air motion)
by the ratio (rhoa_0 / rhoa) ** 0.54, where rho_a_0 is the reference air density and rhoa is
the air density. This correction, implemented in many microphysics schemes
(e.g., MG2, P3), followes Heymsfield et al. (JAS, 2007).
Set key values to None (or exclude hydrometeor class from keys) to skip this correction.
hyd_types: list
list of hydrometeor classes to include in calcuations. By default set to be consistent
with the model represented by the Model subclass.
time_dim: str
The name of the time dimension in the model.
height_dim: str
The name of the height dimension in the model.
lat_dim: str
Name of the latitude dimension in the model (relevant for regional output)
lon_dim: str
Name of the longitude dimension in the model (relevant for regional output)
mcphys_scheme: str
Name of the microphysics scheme used by the model. Current options are:
1. "MG2", "MG", "Morrison" - essentially treated the same.
2. "NSSL"
3. "P3"
rad_scheme_family: str
Radiation scheme family. Many models share the same radiation scheme or bulk scattering LUT
ancestor (inc. same PSD parameters, ice habit description, etc.). This parameters determines
the single particle or bulk LUT to use given the Model class.
stacked_time_dim: str or None
This attribute becomes a string of the original time dimension name only if
stacking was required to enable EMC2 to processes a domain output (time x lat x lon).
process_conv: bool
If True, then processing convective model output (can typically be set to False for
some models).
model_name: str
The name of the model.
variable_density: dict
If the model allows for particle density for vary (e.g. 2-moment NSSL), then
this is a dict pointing to the variable with the density for each hydrometeor class
"""
[docs]
def __init__(self):
self.Rho_hyd = {}
self.fluffy = {}
self.lidar_ratio = {}
self.LDR_per_hyd = {}
self.vel_param_a = {}
self.vel_param_b = {}
self.q_names_convective = {}
self.q_names_stratiform = {}
self.N_field = {}
self.T_field = ""
self.q_field = ""
self.p_field = ""
self.z_field = ""
self.qp_field = {}
self.conv_frac_names = {}
self.strat_frac_names = {}
self.conv_frac_names_for_rad = {}
self.strat_frac_names_for_rad = {}
self.variable_density = {}
self.conv_re_fields = {}
self.strat_re_fields = {}
self.strat_relvar_name = None
self.mu_field = None
self.lambda_field = None
self.hyd_types = []
self.ds = None
self.time_dim = "time"
self.height_dim = "height"
self.lat_dim = "lat"
self.lon_dim = "lon"
self.mcphys_scheme = None
self.rad_scheme_family = None
self.stacked_time_dim = None
self.process_conv = True
self.model_name = ""
self.consts = {"c": 299792458.0, # m/s
"R_d": 287.058, # J K^-1 Kg^-1
"g": 9.80665, # m/s^2
"Avogadro_c": 6.022140857e23,
"R": 8.3144598} # J K^-1 mol^-1
self.asp_ratio_func = {}
self.Vd_rhoa_scaling_ref = {}
self.ice_hyd_types = ["ci", "pi"]
def _add_vel_units(self):
for my_keys in self.vel_param_a.keys():
self.vel_param_a[my_keys] = self.vel_param_a[my_keys] * (
ureg.meter ** (1 - self.vel_param_b[my_keys].magnitude) / ureg.second)
def _prepare_variables(self):
for variable in self.ds.variables.keys():
attrs = self.ds[variable].attrs
try:
self.ds[variable] = self.ds[variable].astype('float64')
except TypeError:
continue
self.ds[variable].attrs = attrs
def _calc_air_density(self):
"""
Calculates the air density and stores it in the dataset under the key "rho_a".
The calculation is based on the ideal gas law using temperature [K] and pressure [hPa] fields.
"""
if np.all([self.T_field in self.ds.keys(), self.T_field in self.ds.keys()]):
self.ds["rho_a"] = self.ds[self.p_field] * 1e2 / (self.consts["R_d"] * self.ds[self.T_field])
self.ds["rho_a"].attrs["units"] = "kg / m ** 3"
else:
warnings.warn(f"Temperature and pressure field not provided in `Model` init; "
f"this could introduce errors if the microphysics approach is applied", RuntimeWarning)
def _crop_bounding_box(self, bounding_box):
"""
Crop the input region to a given bounding box for a regional model.
Parameters
----------
bounding_box: 4-tuple
A tuple in the format of (lat min, lon min, lat max, lon max).
"""
bounding_box_ind = np.zeros(4, dtype='int64')
XLAT_min = self.ds[self.lat_name].min(axis=2).mean(axis=0)
XLAT_max = self.ds[self.lat_name].max(axis=2).mean(axis=0)
XLONG_min = self.ds[self.lon_name].min(axis=1).mean(axis=0)
XLONG_max = self.ds[self.lon_name].max(axis=1).mean(axis=0)
bounding_box_ind[0] = np.argmin(
np.abs(bounding_box[0] - np.squeeze(XLAT_min.values)))
bounding_box_ind[2] = np.argmin(
np.abs(bounding_box[2] - np.squeeze(XLAT_max.values)))
bounding_box_ind[1] = np.argmin(
np.abs(bounding_box[1] - np.squeeze(XLONG_min.values)))
bounding_box_ind[3] = np.argmin(
np.abs(bounding_box[3] - np.squeeze(XLONG_max.values)))
input_dict = {}
input_dict[self.lat_dim] = slice(bounding_box_ind[0], bounding_box_ind[2])
input_dict[self.lon_dim] = slice(bounding_box_ind[1], bounding_box_ind[3])
self.ds = self.ds.isel(input_dict)
def _crop_bounding_box_x_y(self, bounding_box):
"""
Crop the input region to a given bounding box for a regional model with
x and y coordinates.
Parameters
----------
bounding_box: 4-tuple
A tuple in the format of (x min, y min, x max, y max)
"""
self.ds = self.ds.sel(
x=slice(bounding_box[0], bounding_box[2]), y=slice(bounding_box[1], bounding_box[3]))
def _crop_time_range(self, time_range, alter_coord=None):
"""
Crop model output time range (time coords must be of np.datetime64 datatype).
Can significantly cut subcolumn processing time.
Parameters
----------
time_range: tuple, list, or array, typically in datetime64 format
Two-element array with starting and ending of time range.
alter_coord: str or None
Alternative time coords to use for cropping.
"""
if alter_coord is None:
t_coords = self.time_dim
else:
t_coords = alter_coord
time_ind = np.logical_and(self.ds[t_coords] >= time_range[0],
self.ds[t_coords] < time_range[1])
if np.sum(time_ind) == 0:
self.ds.close()
print("The requested time range: {0} to {1} is out of the "
"model output range; Ignoring crop request.".format(time_range[0], time_range[1]))
else:
self.ds = self.ds.isel({self.time_dim: time_ind})
@property
def hydrometeor_classes(self):
"""
The list of hydrometeor classes.
"""
return self.hyd_types
@property
def num_hydrometeor_classes(self):
"""
The number of hydrometeor classes used
"""
return len(self.hyd_types)
@property
def num_subcolumns(self):
"""
Gets the number of subcolumns in the model. Will
return 0 if the number of subcolumns has not yet been set.
"""
if 'subcolumn' in self.ds.dims:
return self.ds.dims['subcolumn']
else:
return 0
@num_subcolumns.setter
def num_subcolumns(self, a):
"""
This will set the number of subcolumns in the simulated radar output.
This is a handy shortcut for setting the number of subcolumns if you
do not want to use any of the functions in the simulator module to
do so.
"""
subcolumn = xr.DataArray(np.arange(a), dims='subcolumn')
self.ds['subcolumn'] = subcolumn
[docs]
def remove_subcol_fields(self, cloud_class="conv"):
"""
Remove all subcolumn output fields for the given cloud class to save memory (mainly releveant
for CESM and E3SM).
"""
vars_to_drop = []
for key in self.ds.keys():
if np.logical_and(cloud_class in key,
np.any([fstr in key for fstr in ["sub_col", "subcol", "phase_mask"]])):
vars_to_drop.append(key)
self.ds = self.ds.drop_vars(vars_to_drop)
[docs]
def finalize_subcol_fields(self, more_fieldnames=[]):
"""
Remove all zero values from subcolumn output fields enabling better visualization. Can be applied
over additional fields using the more_fieldnames input parameter.
"""
for key in self.ds.keys():
if np.any([fstr in key for fstr in ["sub_col"] + more_fieldnames]):
self.ds[key].values = np.where(self.ds[key].values == 0., np.nan, self.ds[key].values)
[docs]
def remove_appended_str(self, all_appended_in_lat=False):
"""
Remove appended strings from xr.Dataset coords and fieldnames based on lat/lon coord names
(typically required when using post-processed output data files).
Note: in the case of E3SM-like machinery output, the method can handle output from multiple
sub-domains assuming there is only a single spatial dimension (typically `ncol`).
Parameters
----------
all_appended_in_lat: bool
If True using only the appended str portion to the lat_dim. Otherwise, combining
the appended str from both the lat and lon dims (relevant if appended_str is True).
"""
if all_appended_in_lat:
coordinates = [self.lat_dim]
out_coords = [x for x in self.ds.keys()] # assuming lat is not in coords since using ncol
else:
coordinates = [self.lat_dim, self.lon_dim]
out_coords = [x for x in self.ds.dims]
coord_locs = np.argwhere([
(x[: len(coordinates[0])] == coordinates[0]) & (len(x) > len(coordinates[0]))
for x in self.ds.dims]).flatten()
if all_appended_in_lat: # more computationally expensive treatment of 1 or multiple sub-domains
appended_dims = [var[len(self.lat_dim):] for var in self.ds.dims
if var[: len(self.lat_dim) + 1] == f"{self.lat_dim}_"] # matching E3SM machinery
appended_size = np.sum([self.ds[f"{self.lat_dim}{x}"].size for x in appended_dims])
if appended_size == 0:
print("no dimension name with an appended string found")
return None
self.ds = self.ds.assign_coords({self.lat_dim: (self.lat_dim, np.arange(appended_size))})
appended_var_list = []
for out_coord in out_coords: # loop over variables
for appended_dim in appended_dims:
if appended_dim in out_coord:
var = out_coord[: out_coord.find(appended_dim)]
if var not in appended_var_list:
appended_var_list.append(var)
for var in appended_var_list:
tmp_arr = None
for appended_dim in appended_dims:
if tmp_arr is None:
tmp_arr = self.ds[var + appended_dim].values
var_dims = list(self.ds[var + appended_dim].dims)
var_dims[-1] = self.lat_dim
var_attrs = self.ds[var + appended_dim].attrs
else:
tmp_arr = np.concatenate((tmp_arr, self.ds[var + appended_dim].values), axis=-1)
self.ds[var] = xr.DataArray(tmp_arr, dims=tuple(var_dims), attrs=var_attrs)
self.ds = self.ds.drop_vars([f"{var}{appended_dim}" for appended_dim in appended_dims])
else:
for coordinate in coordinates:
coord_loc = np.argwhere([x[:len(coordinate)] == coordinate for x in out_coords]).item()
if len(out_coords[coord_loc]) > len(coordinate):
appended_str = out_coords[coord_loc][len(coordinate):]
appended_coords = np.char.find(out_coords, appended_str)
to_rename = {}
for ind in range(len(out_coords)):
if appended_coords[ind] > -1:
to_rename[out_coords[ind]] = out_coords[ind][:appended_coords[ind]]
out_fields = [x for x in self.ds.data_vars]
appended_fields = np.char.find(out_fields, appended_str)
for ind in range(len(out_fields)):
if appended_fields[ind] > -1:
to_rename[out_fields[ind]] = out_fields[ind][:appended_fields[ind]]
self.ds = self.ds.rename(to_rename)
[docs]
def check_and_stack_time_lat_lon(self, out_coord_name="time_lat_lon", file_path=None, order_dim=True):
"""
Stack the time dim together with the lat and lon dims (if the lat and/or lon dims are longer than 1)
to enable EMC^2 processing of regional model output. Otherwise, squeezing the lat and lon dims (if they
exist in dataset). Finally, the method reorder dimensions to time x height for proper processing by
calling the "permute_dims_for_processing" class method.
NOTE: tmp variables for lat, lon, and time are produced as xr.Datasets still have many unresolved bugs
associated with pandas multi-indexing implemented in xarray for stacking (e.g., new GitHub issue #5692).
Practically, after the subcolumn processing the stacking information is lost so an alternative dedicated
method is used for unstacking
Parameters
----------
out_coord_name: str
Name of output stacked coordinate.
file_path: str
Path and filename of model simulation output.
order_dim: bool
When True, reorder dimensions to time x height for proper processing.
"""
do_process = 0 # 0 - do nothing, 1 - stack lat+lon, 2 - stack lat dim only
# Add time dimension for processing (remove later if length = 1)
if not self.time_dim in [x for x in self.ds.dims]:
self.ds = self.ds.expand_dims(self.time_dim)
# Check to make sure we are loading a single column
if self.lat_dim in [x for x in self.ds.dims]:
if self.ds.sizes[self.lat_dim] != 1:
do_process = 1
if self.lon_dim in [x for x in self.ds.dims]:
if self.ds.sizes[self.lon_dim] != 1:
do_process = 1
elif do_process == 1:
do_process = 2
if do_process > 0:
if file_path is None:
file_path = "The input filename"
print("%s is a regional output dataset; Stacking the time, lat, "
"and lon dims for processing with EMC^2." % file_path)
self.ds[self.lat_dim + "_tmp"] = (
xr.DataArray(self.ds[self.lat_dim].values,
coords={self.lat_dim + "_tmp": self.ds[self.lat_dim].values}))
if do_process == 1:
self.ds[self.lon_dim + "_tmp"] = (
xr.DataArray(self.ds[self.lon_dim].values,
coords={self.lon_dim + "_tmp": self.ds[self.lon_dim].values}))
self.ds[self.time_dim + "_tmp"] = (
xr.DataArray(self.ds[self.time_dim].values,
coords={self.time_dim + "_tmp": self.ds[self.time_dim].values}))
if do_process == 1:
self.ds = self.ds.stack({out_coord_name: (self.lat_dim, self.lon_dim, self.time_dim)})
else:
self.ds = self.ds.stack({out_coord_name: (self.lat_dim, self.time_dim)})
self.stacked_time_dim, self.time_dim = self.time_dim, out_coord_name
else:
if self.lon_dim in [x for x in self.ds.dims]:
# No need for lat and lon dimensions
self.ds = self.ds.squeeze(dim=(self.lat_dim, self.lon_dim))
else:
self.ds = self.ds.squeeze(dim=(self.lat_dim))
if order_dim:
self.permute_dims_for_processing() # Consistent dim order (time x height).
[docs]
def unstack_time_lat_lon(self, order_dim=True, squeeze_single_dims=True):
"""
Unstack the time, lat, and lon dims if they were previously stacked together
(self.stacked_time_dim is not None). Finally, the method reorder dimensions to time x height for proper
processing by calling the "permute_dims_for_processing" class method.
Note (relevant for squeeze_single_dims == True):
If the time dimension size is 1, that dimension is squeezed. Similarly, if the number of
subcolumns is 1 (subcolumn generator is turned off), the subcolumn dimension is squeezed as well.
NOTE: This is a dedicated method written because xr.Datasets still have many unresolved bugs
associated with pandas multi-indexing implemented in xarray for stacking (e.g., new GitHub issue #5692).
Practically, after the subcolumn processing the stacking information is lost so this is an alternative
dedicated method.
Parameters
----------
order_dim: bool
When True, reorder dimensions to subcolumn x time x height for proper processing.
squeeze_single_dims: bool
If True, squeezing the time and/or subcolumn dimension(s) if their lengh is 1.
"""
if self.stacked_time_dim is None:
raise TypeError("stacked_time_dim is None so dataset is apparently already unstacked!")
out_fields = [x for x in self.ds.keys()]
self.permute_dims_for_processing(base_order=[self.height_dim, self.time_dim], base_dim_first=False)
if self.lon_dim + "_tmp" in self.ds.coords:
more_dims = (self.ds[self.lon_dim + "_tmp"].dims[0],
self.ds[self.stacked_time_dim + "_tmp"].dims[0])
more_shapes = (self.ds[self.lon_dim + "_tmp"].size,
self.ds[self.stacked_time_dim + "_tmp"].size)
else:
more_dims = (self.ds[self.stacked_time_dim + "_tmp"].dims[0],)
more_shapes = (self.ds[self.stacked_time_dim + "_tmp"].size,)
for key in out_fields:
Attrs = self.ds[key].attrs
Dims = self.ds[key].dims
Shape = self.ds[key].shape
if len(Shape) == 0:
continue
if self.time_dim in Dims:
self.ds[key] = xr.DataArray(np.reshape(self.ds[key].values,
(*Shape[:-1], self.ds[self.lat_dim + "_tmp"].size,
*more_shapes)),
dims=(*Dims[:-1], self.ds[self.lat_dim + "_tmp"].dims[0],
*more_dims),
attrs=Attrs)
self.ds = self.ds.drop_dims(self.time_dim)
self.time_dim, self.stacked_time_dim = self.stacked_time_dim, None
self.ds = self.ds.rename({self.lat_dim + "_tmp": self.lat_dim,
self.time_dim + "_tmp": self.time_dim})
if self.lon_dim + "_tmp" in self.ds.coords:
self.ds = self.ds.rename({self.lon_dim + "_tmp": self.lon_dim})
if order_dim:
self.permute_dims_for_processing() # Consistent dim order (subcolumn x time x height).
if squeeze_single_dims:
if self.ds[self.time_dim].size == 1:
self.ds = self.ds.squeeze(self.time_dim)
if self.num_subcolumns == 1:
self.ds = self.ds.squeeze("subcolumn")
[docs]
def permute_dims_for_processing(self, base_order=None, base_dim_first=True):
"""
Reorder dims for consistent processing such that the order is:
subcolumn x time x height.
Note: lat/lon dims are assumed to already be stacked with the time dim.
Parameters
----------
base_order: list or None
List of preffered dimension order. Use default if None
base_dim_first: bool
Make the base dims (height and time) the first ones in the permutation if True.
"""
if base_order is None:
base_order = [self.time_dim, self.height_dim]
Dims_new_order = [x for x in self.ds.dims
if x not in ["subcolumn"] + base_order]
if "subcolumn" in self.ds.dims:
base_order = ["subcolumn"] + base_order
if base_dim_first:
Dims_new_order = tuple(base_order + Dims_new_order)
else:
Dims_new_order = tuple(Dims_new_order + base_order)
self.ds = self.ds.transpose(*Dims_new_order)
def set_hyd_types(self, hyd_types):
if hyd_types is None:
if self.num_hydrometeor_classes == 0:
raise ValueError("The '%s' Model subclass has 0 specified hydrometeor classes and "
"no other 'hyd_types' variable was specified. Please check the "
"Model class attributes setup." % self.model_name)
return self.hyd_types
else:
return hyd_types
[docs]
def subcolumns_to_netcdf(self, file_name):
"""
Saves all of the simulated subcolumn parameters to a netCDF file.
Parameters
----------
file_name: str
The name of the file to save to.
"""
# Set all relevant variables to save:
vars_to_keep = ["sub_col", "subcol", "strat_", "conv_", "_tot", "_ext", "_mask", "_min", "mpr", "fpr", self.time_dim, "Times"]
var_dict = {}
for my_var in self.ds.variables.keys():
if np.any([x in my_var for x in vars_to_keep]):
var_dict[my_var] = self.ds[my_var]
out_ds = xr.Dataset(var_dict)
out_ds.to_netcdf(file_name)
[docs]
def load_subcolumns_from_netcdf(self, file_name):
"""
Load all of the subcolumn data from a previously saved netCDF file.
The dataset being loaded must match the current number of subcolumns if there are any
generated.
Parameters
----------
file_name: str
Name of the file to save.
"""
my_file = xr.open_dataset(file_name)
self.ds = xr.merge([self.ds, my_file])
my_file.close()
[docs]
@staticmethod
def set_array_to_valid_range(arr_in, grid_in):
"""
Adjusts the values in the input array to ensure they fall within the valid range
defined by the minimum and maximum values of the grid array. Values below the
grid's minimum are set to the minimum, and values above the grid's maximum are
set to the maximum.
In practice, this method can be used to set "nearest" extrapolation (considering model
output with slight truncated values, e.g., rho_r of 900.00006 kg m-3 in some E3SMv3 output).
Parameters
==========
arr_in: numpy.ndarray
The input array whose values need to be adjusted.
grid_in: numpy.ndarray
The grid array used to determine the valid range (minimum and maximum values).
Returns
=======
numpy.ndarray
A new array with values adjusted to fall within the valid range of the grid.
"""
grid_min, grid_max = np.nanmin(grid_in), np.nanmax(grid_in)
arr_in = np.where(arr_in < grid_min, grid_min,
np.where(arr_in > grid_max, grid_max,
arr_in))
return arr_in
[docs]
class ModelE(Model):
[docs]
def __init__(self, file_path, time_range=None, load_processed=False):
"""
This loads a ModelE simulation with all of the necessary parameters for EMC^2 to run.
Parameters
----------
file_path: str
Path to a ModelE simulation.
time_range: tuple, list, or array, typically in datetime64 format
Two-element array with starting and ending of time range.
load_processed: bool
If True, treating the 'file_path' variable as an EMC2-processed dataset; thus skipping
dimension stacking as part of pre-processing.
"""
super().__init__()
self.Rho_hyd = {'cl': 1000. * ureg.kg / (ureg.m**3), 'ci': 500. * ureg.kg / (ureg.m**3),
'pl': 1000. * ureg.kg / (ureg.m**3), 'pi': 250. * ureg.kg / (ureg.m**3)}
self.fluffy = {'ci': 0.5 * ureg.dimensionless, 'pi': 0.5 * ureg.dimensionless}
self.lidar_ratio = {'cl': 18. * ureg.dimensionless,
'ci': 24. * ureg.dimensionless,
'pl': 5.5 * ureg.dimensionless,
'pi': 24.0 * ureg.dimensionless}
self.LDR_per_hyd = {'cl': 0.03 * 1 / (ureg.kg / (ureg.m**3)),
'ci': 0.35 * 1 / (ureg.kg / (ureg.m**3)),
'pl': 0.1 * 1 / (ureg.kg / (ureg.m**3)),
'pi': 0.40 * 1 / (ureg.kg / (ureg.m**3))}
self.vel_param_a = {'cl': 3e7, 'ci': 700., 'pl': 841.997, 'pi': 11.72}
self.vel_param_b = {'cl': 2. * ureg.dimensionless,
'ci': 1. * ureg.dimensionless,
'pl': 0.8 * ureg.dimensionless,
'pi': 0.41 * ureg.dimensionless}
super()._add_vel_units()
self.q_field = "q"
self.N_field = {'cl': 'ncl', 'ci': 'nci', 'pl': 'npl', 'pi': 'npi'}
self.p_field = "p_3d"
self.z_field = "z"
self.T_field = "t"
self.height_dim = "p"
self.time_dim = "time"
self.conv_frac_names = {'cl': 'cldmccl', 'ci': 'cldmcci', 'pl': 'cldmcpl', 'pi': 'cldmcpi'}
self.strat_frac_names = {'cl': 'cldsscl', 'ci': 'cldssci', 'pl': 'cldsspl', 'pi': 'cldsspi'}
self.conv_frac_names_for_rad = {'cl': 'cldmcr', 'ci': 'cldmcr',
'pl': 'cldmcpl', 'pi': 'cldmcpi'}
self.strat_frac_names_for_rad = {'cl': 'cldssr', 'ci': 'cldssr',
'pl': 'cldssr', 'pi': 'cldssr'}
self.conv_re_fields = {'cl': 're_mccl', 'ci': 're_mcci', 'pi': 're_mcpi', 'pl': 're_mcpl'}
self.strat_re_fields = {'cl': 're_sscl', 'ci': 're_ssci', 'pi': 're_sspi', 'pl': 're_sspl'}
self.q_names_convective = {'cl': 'QCLmc', 'ci': 'QCImc', 'pl': 'QPLmc', 'pi': 'QPImc'}
self.q_names_stratiform = {'cl': 'qcl', 'ci': 'qci', 'pl': 'qpl', 'pi': 'qpi'}
self.Vd_rhoa_scaling_ref = {"cl": 1.0, # Ikawa and Saito (1990, p. 85)
"ci": 1.0, # Ikawa and Saito (1990, p. 85)
"pl": 1.1, # Liu and Orville (1969, p. 1269),
"pi": 1.15} # Localleti and Hobbs (1974) - est. from 750-1500 alt + T < 273.15
self.hyd_types = ["cl", "ci", "pl", "pi"]
self.mcphys_scheme = "MG2" # per GM2015 (J. Clim.)
self.rad_scheme_family = "ModelE"
if load_processed:
self.ds = xr.Dataset()
self.load_subcolumns_from_netcdf(file_path)
else:
self.ds = read_netcdf(file_path)
if np.logical_and("level" in self.ds.coords, not "p" in self.ds.coords):
self.height_dim = "level"
# crop specific model output time range (if requested)
if time_range is not None:
if np.issubdtype(time_range.dtype, np.datetime64):
super()._crop_time_range(time_range)
else:
raise RuntimeError("input time range is not in the required datetime64 data type")
if not load_processed:
# stack dimensions in the case of a regional output or squeeze lat/lon dims if exist and len==1
super().check_and_stack_time_lat_lon(file_path=file_path)
# ModelE has pressure units in mb, but pint only supports hPa
self.ds["p_3d"].attrs["units"] = "hPa"
self._calc_air_density() # calculate air density with input T and p fields
self.model_name = "ModelE3"
[docs]
class E3SMv1(Model):
[docs]
def __init__(self, file_path, time_range=None, load_processed=False, time_dim="time", appended_str=False,
all_appended_in_lat=False, single_ice_class=False, include_rain_in_rt=False,
mcphys_scheme="MG2"):
"""
This loads an E3SMv1 simulation output with all of the necessary parameters for EMC^2 to run.
Parameters
----------
file_path: str
Path to an E3SMv1 simulation.
time_range: tuple, list, or array, typically in datetime64 format
Two-element array with starting and ending of time range.
load_processed: bool
If True, treating the 'file_path' variable as an EMC2-processed dataset; thus skipping
appended string removal and dimension stacking, which are typically part of pre-processing.
time_dim: str
Name of the time dimension. Typically "time" or "ncol".
appended_str: bool
If True, removing appended strings added to fieldnames and coordinates during
post-processing (e.g., in cropped regions from global simualtions).
all_appended_in_lat: bool
If True using only the appended str portion to the lat_dim. Otherwise, combining
the appended str from both the lat and lon dims (relevant if appended_str is True).
single_ice_class: bool
If True, assuming model microphysics incorporate a single ice class (e.g., in P3 implemented
in E3SMv3).
include_rain_in_rt: bool
If True, including the rain class (`pl`) in the forward calculations.
By default, set to False given that the rain class is excluded from the E3SM radiative scheme
calculations.
mcphys_scheme: str
Name of the microphysics scheme used by the model. Current options are:
"""
super().__init__()
self.Rho_hyd = {'cl': 1000. * ureg.kg / (ureg.m**3), 'ci': 500. * ureg.kg / (ureg.m**3),
'pl': 1000. * ureg.kg / (ureg.m**3), 'pi': 250. * ureg.kg / (ureg.m**3)}
self.fluffy = {'ci': 1.0 * ureg.dimensionless, 'pi': 1.0 * ureg.dimensionless}
self.lidar_ratio = {'cl': 18. * ureg.dimensionless,
'ci': 24. * ureg.dimensionless,
'pl': 5.5 * ureg.dimensionless,
'pi': 24.0 * ureg.dimensionless}
self.LDR_per_hyd = {'cl': 0.03 * 1 / (ureg.kg / (ureg.m**3)),
'ci': 0.35 * 1 / (ureg.kg / (ureg.m**3)),
'pl': 0.1 * 1 / (ureg.kg / (ureg.m**3)),
'pi': 0.40 * 1 / (ureg.kg / (ureg.m**3))}
self.vel_param_a = {'cl': 3e7, 'ci': 700., 'pl': 841.997, 'pi': 11.72}
self.vel_param_b = {'cl': 2. * ureg.dimensionless,
'ci': 1. * ureg.dimensionless,
'pl': 0.8 * ureg.dimensionless,
'pi': 0.41 * ureg.dimensionless}
super()._add_vel_units()
self.q_field = "Q"
self.N_field = {'cl': 'NUMLIQ', 'ci': 'NUMICE', 'pl': 'NUMRAI', 'pi': 'NUMSNO'}
self.p_field = "p_3d"
self.z_field = "Z3"
self.T_field = "T"
self.height_dim = "lev"
self.time_dim = time_dim
self.conv_frac_names = {'cl': 'zeros_cf', 'ci': 'zeros_cf', 'pl': 'zeros_cf', 'pi': 'zeros_cf'}
self.strat_frac_names = {'cl': 'FREQL', 'ci': 'FREQI', 'pl': 'FREQR', 'pi': 'FREQS'}
self.conv_frac_names_for_rad = {'cl': 'zeros_cf', 'ci': 'zeros_cf',
'pl': 'zeros_cf', 'pi': 'zeros_cf'}
self.strat_frac_names_for_rad = {'cl': 'CLOUD', 'ci': 'CLOUD',
'pl': 'FREQR', 'pi': 'FREQS'}
self.conv_re_fields = {'cl': 'zeros_cf', 'ci': 'zeros_cf', 'pi': 'zeros_cf', 'pl': 'zeros_cf'}
self.strat_re_fields = {'cl': 'AREL', 'ci': 'AREI', 'pi': 'ADSNOW', 'pl': 'ADRAIN'}
self.strat_relvar_name = "RELVAR"
self.q_names_convective = {'cl': 'zeros_cf', 'ci': 'zeros_cf', 'pl': 'zeros_cf', 'pi': 'zeros_cf'}
self.q_names_stratiform = {'cl': 'CLDLIQ', 'ci': 'CLDICE', 'pl': 'RAINQM', 'pi': 'SNOWQM'}
self.mu_field = {'cl': 'mu_cloud', 'ci': None, 'pl': None, 'pi': None}
self.lambda_field = {'cl': 'lambda_cloud', 'ci': None, 'pl': None, 'pi': None}
self.Vd_rhoa_scaling_ref = {"cl": 1.0, # Ikawa and Saito (1990, p. 85)
"ci": 1.0, # Ikawa and Saito (1990, p. 85)
"pl": 1.1, # Liu and Orville (1969, p. 1269),
"pi": 1.15} # Localleti and Hobbs (1974) - est. from 750-1500 alt + T < 273.15
if include_rain_in_rt:
self.hyd_types = ["cl", "ci", "pl"]
else:
self.hyd_types = ["cl", "ci"]
if single_ice_class:
for attr in ['N_field', 'strat_frac_names', 'strat_frac_names_for_rad', 'strat_re_fields',
'q_names_stratiform']:
eval(f"self.{attr}")['pi'] = 'zeros_cf'
else:
self.hyd_types += ["pi"]
self.mcphys_scheme = mcphys_scheme
self.rad_scheme_family = "CESM" # E3SMv1 uses the same bulk LUTs as CESM (CAM 5.0)
self.process_conv = False
# Set essential P3 attributes (if relevant)
if (self.mcphys_scheme == "P3"):
print(f"Using P3 microphysics option; verify that model output includes the following fields required "
f"for EMC² to operate properly:\n1. Rime mass (CLDRIM)\n2. Rime volume (BVRIM)\n")
ice_valid_mass_thresh = 1e-18 # samples < are considered truncated samples (supported by data analysis)
self.mu_field["ci"] = "mu_ice"
self.lambda_field["ci"] = "lambda_ice"
self.vel_param_a["ci"], self.vel_param_a["pi"] = None, None # setting as None to prevent user confusion
self.vel_param_b["ci"], self.vel_param_b["pi"] = None, None # setting as None to prevent user confusion
self.p3_kws = {
"q_rim_name": "CLDRIM", # Rime mass (mixing ratio) [kg kg-1]
"rim_vol_name": "BVRIM", # riming volume [m3 kg-1]
"in_cld_Ni_name": "in_cld_Ni", # in-cloud ice number number concentration [m-3]
"N0_ice_name": "N0_norm_ice", # Normalized ice PSD intercept parameter
}
if load_processed:
self.ds = xr.Dataset()
self.load_subcolumns_from_netcdf(file_path)
else:
self.ds = read_netcdf(file_path)
if appended_str:
if all_appended_in_lat:
self.lat_dim = "ncol" # here 'ncol' is the spatial dim (acknowledging cube-sphere coords)
super().remove_appended_str(all_appended_in_lat)
if time_dim == "ncol":
time_datetime64 = np.array([x.strftime('%Y-%m-%dT%H:%M') for x in self.ds["time"].values],
dtype='datetime64')
self.ds = self.ds.assign_coords(time=('ncol', time_datetime64)) # add additional time coords
# crop specific model output time range (if requested)
if time_range is not None:
if np.issubdtype(time_range.dtype, np.datetime64):
if time_dim == "ncol":
super()._crop_time_range(time_range, alter_coord="time")
else:
super()._crop_time_range(time_range)
else:
raise RuntimeError("input time range is not in the required datetime64 data type")
# Flip height coordinates in data arrays (to descending pressure levels / ascending height)
self.ds = self.ds.assign_coords({self.height_dim: np.flip(self.ds[self.height_dim].values)})
for key in self.ds.keys():
if self.height_dim in self.ds[key].dims:
rel_dim = np.argwhere([self.height_dim == x for x in self.ds[key].dims]).item()
self.ds[key].values = np.flip(self.ds[key].values, axis=rel_dim)
# stack dimensions in the case of a regional output or squeeze lat/lon dims if exist and len==1
super().check_and_stack_time_lat_lon(file_path=file_path, order_dim=False)
self.ds[self.p_field] = (
((self.ds["P0"] * self.ds["hyam"] + self.ds["PS"] * self.ds["hybm"]).T / 1e2).transpose( # hPa
*self.ds[self.T_field].dims))
self.ds[self.p_field].attrs["units"] = "hPa"
self.ds["zeros_cf"] = xr.DataArray(np.zeros_like(self.ds[self.p_field].values),
dims=self.ds[self.p_field].dims)
self.ds["zeros_cf"].attrs["long_name"] = "An array of zeros as only strat output is used for this model"
self._calc_air_density() # calculate air density with input T and p fields
precip_classes = [x for x in self.hyd_types if x[0] == "p"]
for hyd in self.hyd_types:
# convert mass number to number concentration [cm-3]
self.ds[self.N_field[hyd]].values *= self.ds["rho_a"].values / 1e6
if hyd in precip_classes:
self.ds[self.strat_re_fields[hyd]].values *= 0.5 * 1e6 # Assuming r_eff in m was provided
self.ds[self.strat_re_fields[hyd]].values = (
np.where(self.ds[self.strat_re_fields[hyd]].values == 0.,
np.nan, self.ds[self.strat_re_fields[hyd]].values))
# use the number concentration to retrieve in-cloud; calculate other essential P3 fields
if (self.mcphys_scheme == "P3") & (hyd == "ci"): # P3 preprocessing
for var in [self.q_names_stratiform[hyd], self.p3_kws["q_rim_name"]]:
self.ds[var].where(
lambda x: x > ice_valid_mass_thresh) # remove truncated water content samples to prevent issues
dims = self.ds[self.N_field[hyd]].dims
self.ds[self.p3_kws["in_cld_Ni_name"]] = xr.DataArray(
self.ds[self.N_field[hyd]].values / self.ds[self.strat_frac_names[hyd]].values * 1e6, dims=dims,
attrs={"units": "m-3", "long_name": "in-cloud ice number concentration"})
self.ds["Fr"] = xr.DataArray(
self.ds[self.p3_kws["q_rim_name"]].values / self.ds[self.q_names_stratiform[hyd]].values,
dims=dims, attrs={"units": "1", "long_name": "Rime fraction"})
rho_r = np.where(
self.ds["Fr"].values == 0, 0.0, # filled 0 vals (set to 50 below) do not impact if Fr = 0
self.ds[self.p3_kws["q_rim_name"]].values / self.ds[self.p3_kws["rim_vol_name"]].values)
self.ds["rho_r"] = xr.DataArray(
rho_r, dims=dims, attrs={"units": "kg/m^3", "long_name": "Rime density"})
self.ds["qi_norm"] = xr.DataArray(
(self.ds[self.q_names_stratiform[hyd]].values /
self.ds[self.strat_frac_names[hyd]].values) / self.ds[self.p3_kws["in_cld_Ni_name"]].values,
dims=dims, attrs={"units": "m^3", "long_name": "Ni-normalized in-cloud ice water content"})
self.permute_dims_for_processing() # Consistent dim order (time x height).
self.model_name = "E3SMv1"
[docs]
class E3SMv3(E3SMv1):
[docs]
def __init__(self, file_path, time_range=None, load_processed=False, time_dim="time", appended_str=False,
all_appended_in_lat=False, single_ice_class=True, include_rain_in_rt=False,
mcphys_scheme="P3", instrument=None, use_hybrid_scat=False):
"""
This loads an E3SMv3 simulation output with all of the necessary parameters for EMC^2 to run.
Note that a `p3_kws` attribute is added to this `Model` sub-class to host the p3-specific namelist.
Parameters
----------
file_path: str
Path to an E3SMv3 simulation.
time_range: tuple, list, or array, typically in datetime64 format
Two-element array with starting and ending of time range.
load_processed: bool
If True, treating the 'file_path' variable as an EMC2-processed dataset; thus skipping
appended string removal and dimension stacking, which are typically part of pre-processing.
time_dim: str
Name of the time dimension. Typically "time" or "ncol".
appended_str: bool
If True, removing appended strings added to fieldnames and coordinates during
post-processing (e.g., in cropped regions from global simualtions).
all_appended_in_lat: bool
If True using only the appended str portion to the lat_dim. Otherwise, combining
the appended str from both the lat and lon dims (relevant if appended_str is True).
single_ice_class: bool
If True, assuming model microphysics incorporate a single ice class (e.g., in P3 implemented
in E3SMv3).
include_rain_in_rt: bool
If True, including the rain class (`pl`) in the forward calculations.
By default, set to False given that the rain class is excluded from the E3SM radiative scheme
calculations.
mcphys_scheme: str
Name of the microphysics scheme used by the model. Current options are:
instrument: :func:`emc2.core.Instrument` class
An `Instrument` object, the LUTs of which are used here to calculate ice PSD parameters, to
save computation time in subsequent processing.
use_hybrid_scat: bool or int
If True or > 0 taking a hybrid approach, i.e., using beta_p and alpha_p of (per calculation approach):
(1) microphysics: equivalent volume spheres (with D still being the max dimensions per P3 approach).
(2) radiation: (== 1) pure radiation approach, i.e., derive A_hyd from r_eff and subcolumn q_i
(== 2) equivalent volume spheres (as in the microphysics approach) or (Mie) sphere
assumptions while still being faithful to the PSD information from model output
Note that we do not use here equivalent volume to surface area approach per D. Mitchell because the
diameters and hence surface areas are non-monotonic, which introduces some weird behavior.
The effect of this approach is to reduce potentially overestimated lidar variables (think about the
implications for E3SM optics...) and radar variables, though to a lesser extent.
Note #2: This boolean does not affect the terminal velocity calculations, which are based on the m-D,A-D
information. This, together with the non-equivalent V/A renders this approach "hybrid" and increases
the gap between microphysics and radiation.
If False or == 0 (default), using full P3 approach, i.e., consistent treatment of ice microphysics and
radiation, to a large extent at the very least.
"""
super().__init__(file_path, time_range, load_processed, time_dim, appended_str, all_appended_in_lat,
single_ice_class, include_rain_in_rt, mcphys_scheme)
self.model_name = "E3SMv3"
# if Instrument object was input, we use the automatically-loaded scattering LUT to process PSD params
# (instrument type (HSRL, etc.) has no effect on PSD params).
if mcphys_scheme == "P3":
self.rad_scheme_family = "P3" # E3SMv3 uses CESM bulk LUTs but with a P3 twist (crp and Fr dependence)
self.Vd_rhoa_scaling_ref = {"cl": None, # N/A (see Morrison and Milbrandt (2015, p. 294)
"ci": 0.8254, # p=600 hPa, T=253.15 K (used in P3 LUT generator code)
"pl": 1.1} # Liu and Orville (1969, p. 1269)
# if Instrument object was input, we use the automatically-loaded scattering LUT to process PSD params
# (instrument type (HSRL, etc.) has no effect on PSD params).
if (instrument is not None):
Fr_in, rho_r_in, qi_norm_in = self.limit_input_data_to_p3_lut_range(instrument.scat_table['p3_ice'])
self.ds["Fr"].values = Fr_in
self.ds["rho_r"].values = rho_r_in
self.ds["qi_norm"].values = qi_norm_in
self.interp_p3_ice_psd_params_to_grid(
instrument.scat_table['p3_ice'], limit_input_data_to_lut_rng=True)
self.interpobj = {"single": {}, "bulk": {}}
# set interpolator objects for essential bulk and single-particle quantities
for key in ["A_tot_norm", "A_tot_eq_V_norm", "A_tot_Mie_norm",
"vt_m_weight", "Z", "ri_eff", "Dm_m_weight", "rho_m_weight"]:
self.interpobj["bulk"][key] = RegularGridInterpolator(
(instrument.scat_table['p3_ice']["Fr"].values,
instrument.scat_table['p3_ice']["rho_r"].values,
instrument.scat_table['p3_ice']["q_norm"].values),
instrument.scat_table['p3_ice'][key].values, bounds_error=False)
d_dependent_list = ["vt", "A", "V"]
if use_hybrid_scat:
d_dependent_list += ["D_eq_vol_sphere"] # eq. vol sphere D is rime-dependent
for key in d_dependent_list:
var_in = instrument.scat_table['p3_ice'][key].values
self.interpobj["single"][key] = RegularGridInterpolator(
(instrument.scat_table['p3_ice']["Fr"].values,
instrument.scat_table['p3_ice']["rho_r"].values,
instrument.scat_table['p3_ice']["p_diam"].values),
var_in, bounds_error=False)
self.p3_kws["use_hybrid_scat"] = use_hybrid_scat
@staticmethod
def calc_mu_n0_from_lambda(lambda_in):
"""
Calculate the Gamma PSD shape parameter (mu) and normalized n0 from the given lambda
following E3SMv3 processing (see:
/E3SM/components/eam/tools/create_p3_lookupTable/create_p3_lookupTable_1.f90-v4.1).
Parameters
==========
lambda_in: float
Input slope parameter
Returns
=======
mu_i: float
Final Gamma PSD shape parameter per Fig. 3B in Heymsfield (2003, Part II)
n0: float
Normalized N0.
"""
mu_i = 0.076 * (lambda_in / 100.) ** 0.8 - 2 # Calculate shape parameter (convert m-1 to cm-1)
mu_i = np.maximum(mu_i, 0.)
mu_i = np.minimum(mu_i, 6.) # final Gamma PSD shape parameter
n0 = lambda_in ** (mu_i + 1.) / (gamma(mu_i + 1.)) # Normalized n0
return mu_i, n0
def limit_input_data_to_p3_lut_range(self, scat_file):
"""
Adjusts the input data to ensure it falls within the valid range defined by the provided (P3)
lookup table (LUT).
Parameters
==========
scat_file: xarray.Dataset
Loaded P3 LUT file.
Returns
=======
Fr_in: numpy.ndarray
Adjusted values of "Fr" within the valid range.
rho_r_in: numpy.ndarray
Adjusted values of "rho_r" within the valid range.
qi_norm_in: numpy.ndarray
Adjusted values of "qi_norm" within the valid range.
"""
Fr_in = self.set_array_to_valid_range(self.ds["Fr"].values, scat_file["Fr"].values)
rho_r_in = self.set_array_to_valid_range(self.ds["rho_r"].values, scat_file["rho_r"].values)
qi_norm_in = self.set_array_to_valid_range(self.ds["qi_norm"].values, scat_file["q_norm"].values)
return Fr_in, rho_r_in, qi_norm_in
def interp_p3_ice_psd_params_to_grid(self, scat_file, limit_input_data_to_lut_rng=True):
"""
Sets the ice particle size distribution (PSD) parameters via interpolation
using scattering file data and optional input data range limitation.
Parameters
==========
scat_file: xarray.Dataset
Loaded P3 LUT file.
limit_input_data_to_lut_rng: bool, optional
A flag to determine whether to limit the input data to the lookup table
range. Defaults to True.
"""
x = scat_file["Fr"].values
y = scat_file["rho_r"].values
z = scat_file["q_norm"].values
if limit_input_data_to_lut_rng:
Fr_in, rho_r_in, qi_norm_in = self.limit_input_data_to_p3_lut_range(scat_file)
else:
Fr_in, rho_r_in, qi_norm_in = self.ds["Fr"].values, self.ds["rho_r"].values, self.ds["qi_norm"].values
interp = RegularGridInterpolator((x, y, z), scat_file["lambda"].values, bounds_error=False)
self.ds["lambda_ice"] = xr.DataArray(interp((Fr_in, rho_r_in, qi_norm_in)), dims=self.ds["Fr"].dims)
mu_i, n0 = self.calc_mu_n0_from_lambda(self.ds["lambda_ice"].values)
self.ds["mu_ice"] = xr.DataArray(mu_i, dims=self.ds["Fr"].dims)
self.ds["N0_norm_ice"] = xr.DataArray(n0, dims=self.ds["Fr"].dims) # normalized N0
[docs]
class CESM2(E3SMv1):
[docs]
def __init__(self, file_path, time_range=None, load_processed=False, time_dim="time", appended_str=False):
"""
This loads a CESM2 simulation output with all of the necessary parameters for EMC^2 to run.
Parameters
----------
file_path: str
Path to an E3SMv1 simulation.
time_range: tuple, list, or array, typically in datetime64 format
Two-element array with starting and ending of time range.
load_processed: bool
If True, treating the 'file_path' variable as an EMC2-processed dataset; thus skipping
appended string removal and dimension stacking, which are typically part of pre-processing.
time_dim: str
Name of the time dimension. Typically "time" or "ncol".
appended_str: bool
If True, removing appended strings added to fieldnames and coordinates during
post-processing (e.g., in cropped regions from global simualtions).
"""
super().__init__(file_path, time_range, load_processed, time_dim, appended_str)
self.model_name = "CESM2"
[docs]
class WRF(Model):
[docs]
def __init__(self, file_path, z_range=None, time_range=None,
mcphys_scheme="nssl", NUWRF=False, include_gr_class=False,
gr_class_is_hail=True,
bounding_box=None, q_hyd_truncation_cutoff=1e-14):
"""
This load a WRF simulation and all of the necessary parameters from
the simulation.
Parameters
----------
file_path: str
Path to WRF simulation.
time_range: tuple or None
Start and end time to include. If this is None, the entire
simulation will be included.
z_range: numpy array or None
The z levels of the vertical grid you want to use. By default,
the levels are 0 m to 15000 m, increasing by 500 m.
mcphys_scheme: str
3ice: Goddard 3ICE scheme
morrison: Morrison microphysics
NUWRF: bool
If true, model is NASA Unified WRF.
include_gr_class: bool
(NUWRF is False) If True, include the `gr` (graupel) class in processing
gr_class_is_hail: bool
(NUWRF is False and include_gr_class is True) If True, the `gr` class is
treated as hail (coefficients and density are treated accordingly).
bounding_box: None or 4-tuple
If not none, then a tuple representing the bounding box
(lat_min, lon_min, lat_max, lon_max).
q_hyd_truncation_cutoff: float
hydrometeor mixing ratio cutoff value [kg kg-1].
grid cells with values smaller than the cutoff will be set as clear.
Default (1e-14) is per the implementation of the MG scheme in WRF.
"""
if not WRF_PYTHON_AVAILABLE:
raise ModuleNotFoundError("wrf-python must be installed.")
super().__init__()
if mcphys_scheme.lower() in ["mg2", "mg", "morrison"]:
self.hyd_types = ["cl", "ci", "pl", "pi"]
self.Rho_hyd = {'cl': 1000. * ureg.kg / (ureg.m**3),
'ci': 500. * ureg.kg / (ureg.m**3),
'pl': 1000. * ureg.kg / (ureg.m**3),
'pi': 250. * ureg.kg / (ureg.m**3)}
self.fluffy = {'ci': 1.0 * ureg.dimensionless,
'pi': 1.0 * ureg.dimensionless}
self.lidar_ratio = {'cl': 18. * ureg.dimensionless,
'ci': 24. * ureg.dimensionless,
'pl': 5.5 * ureg.dimensionless,
'pi': 24.0 * ureg.dimensionless}
self.LDR_per_hyd = {'cl': 0.03 * 1 / (ureg.kg / (ureg.m**3)),
'ci': 0.35 * 1 / (ureg.kg / (ureg.m**3)),
'pl': 0.1 * 1 / (ureg.kg / (ureg.m**3)),
'pi': 0.40 * 1 / (ureg.kg / (ureg.m**3))}
self.vel_param_a = {'cl': 3e7, 'ci': 700., 'pl': 841.997, 'pi': 11.72}
self.vel_param_b = {'cl': 2. * ureg.dimensionless,
'ci': 1. * ureg.dimensionless,
'pl': 0.8 * ureg.dimensionless,
'pi': 0.41 * ureg.dimensionless}
self.q_names = {'cl': 'QCLOUD', 'ci': 'QICE',
'pl': 'QRAIN', 'pi': 'QSNOW'}
self.q_field = "QVAPOR"
self.N_field = {'cl': 'QNCLOUD', 'ci': 'QNICE',
'pl': 'QNRAIN', 'pi': 'QNSNOW'}
self.p_field = "pressure"
self.z_field = "Z"
self.T_field = "T"
self.conv_frac_names = {'cl': 'conv_frac', 'ci': 'conv_frac',
'pl': 'conv_frac', 'pi': 'conv_frac'}
self.strat_frac_names = {'cl': 'strat_cl_frac', 'ci': 'strat_ci_frac',
'pl': 'strat_pl_frac', 'pi': 'strat_pi_frac'}
self.conv_frac_names_for_rad = self.conv_frac_names.copy()
self.strat_frac_names_for_rad = self.strat_frac_names.copy()
self.q_names_convective = {'cl': 'qclc', 'ci': 'qcic',
'pl': 'qplc', 'pi': 'qpic'}
self.q_names_stratiform = {'cl': 'qcls', 'ci': 'qcis',
'pl': 'qpls', 'pi': 'qpis'}
self.conv_re_fields = {'cl': 're_clc', 'ci': 're_cic',
'pl': 're_plc', 'pi': 're_pic'}
self.strat_re_fields = {'cl': 're_cls', 'ci': 're_cis',
'pl': 're_pls', 'pi': 're_pis'}
self.Vd_rhoa_scaling_ref = {"cl": 1.0, # Ikawa and Saito (1990, p. 85)
"ci": 1.0, # Ikawa and Saito (1990, p. 85)
"pl": 1.1, # Liu and Orville (1969, p. 1269),
"pi": 1.15} # Localleti and Hobbs (1974) - est. based on paper info
if include_gr_class:
self.hyd_types += ['gr']
if gr_class_is_hail:
self.Rho_hyd.update({'gr': 900.0 * ureg.kg / (ureg.m**3)})
self.vel_param_a.update({'gr': 114.5}) # MATSUN AND HUGGINS 1980
self.vel_param_b.update({'gr': 0.5 * ureg.dimensionless})
self.Vd_rhoa_scaling_ref["gr"] = 1.23 # Matsun and Huggins (1980, p. 1116)
else:
self.Rho_hyd.update({'gr': 400.0 * ureg.kg / (ureg.m**3)})
self.vel_param_a.update({'gr': 19.3})
self.vel_param_b.update({'gr': 0.37 * ureg.dimensionless})
self.fluffy.update({'gr': 1.0 * ureg.dimensionless})
self.lidar_ratio.update({'gr': 24.0 * ureg.dimensionless})
self.LDR_per_hyd.update({'gr': 0.40 * 1 / (ureg.kg / (ureg.m**3))})
self.q_names.update({'gr': 'QGRAUP'})
self.N_field.update({'gr': 'QNGRAUPEL'})
self.conv_frac_names.update({'gr': 'conv_frac'})
self.strat_frac_names.update({'gr': 'strat_gr_frac'})
self.conv_frac_names_for_rad.update({'gr': self.conv_frac_names['gr']})
self.strat_frac_names_for_rad.update({'gr': self.strat_frac_names['gr']})
self.q_names_convective.update({'gr': 'qgrc'})
self.q_names_stratiform.update({'gr': 'qgrs'})
self.conv_re_fields.update({'gr': 're_grc'})
self.strat_re_fields.update({'gr': 're_grs'})
self.ice_hyd_types += ["gr"]
super()._add_vel_units()
elif mcphys_scheme.lower() == "nssl":
self.hyd_types = ["cl", "ci", "pl", "sn", "gr", "ha"]
self.Rho_hyd = {'cl': 1000. * ureg.kg / (ureg.m**3),
'ci': 459. * ureg.kg / (ureg.m**3),
'pl': 1000. * ureg.kg / (ureg.m**3),
'sn': 100. * ureg.kg / (ureg.m**3),
'gr': 'variable',
'ha': 'variable'}
self.fluffy = {'ci': 0.5 * ureg.dimensionless,
'sn': 0.5 * ureg.dimensionless,
'gr': 0.5 * ureg.dimensionless,
'ha': 0.5 * ureg.dimensionless }
self.lidar_ratio = {'cl': 18. * ureg.dimensionless,
'ci': 24. * ureg.dimensionless,
'pl': 5.5 * ureg.dimensionless,
'sn': 24.0 * ureg.dimensionless,
'gr': 24.0 * ureg.dimensionless,
'ha': 24.0 * ureg.dimensionless}
self.LDR_per_hyd = {'cl': 0.03 * 1 / (ureg.kg / (ureg.m**3)),
'ci': 0.35 * 1 / (ureg.kg / (ureg.m**3)),
'pl': 0.1 * 1 / (ureg.kg / (ureg.m**3)),
'sn': 0.40 * 1 / (ureg.kg / (ureg.m**3)),
'gr': 0.40 * 1 / (ureg.kg / (ureg.m**3)),
'ha': 0.40 * 1 / (ureg.kg / (ureg.m**3))}
self.vel_param_a = {'cl': 3e7, 'ci': 700.,
'pl': 841.997, 'sn': 12.42, 'gr': 330,
'ha': 330}
self.vel_param_b = {'cl': 2. * ureg.dimensionless,
'ci': 1. * ureg.dimensionless,
'pl': 0.8 * ureg.dimensionless,
'sn': 0.8 * ureg.dimensionless,
'gr': 0.8 * ureg.dimensionless,
'ha': 0.8 * ureg.dimensionless}
super()._add_vel_units()
self.q_names = {'cl': 'QCLOUD', 'ci': 'QICE',
'pl': 'QRAIN', 'sn': 'QSNOW',
'gr': 'QGRAUP', 'ha': 'QHAIL'}
self.q_field = "QVAPOR"
self.N_field = {'cl': 'QNDROP', 'ci': 'QNICE',
'pl': 'QNRAIN', 'sn': 'QNSNOW',
'gr': 'QNGRAUPEL', 'ha': 'QNHAIL'}
self.p_field = "pressure"
self.z_field = "Z"
self.T_field = "T"
self.conv_frac_names = {'cl': 'conv_frac', 'ci': 'conv_frac',
'pl': 'conv_frac', 'sn': 'conv_frac',
'gr': 'conv_frac', 'ha': 'conv_frac'}
self.strat_frac_names = {'cl': 'strat_cl_frac', 'ci': 'strat_ci_frac',
'pl': 'strat_pl_frac', 'sn': 'strat_sn_frac',
'gr': 'strat_gr_frac', 'ha': 'strat_ha_frac'}
self.conv_frac_names_for_rad = {'cl': 'conv_frac', 'ci': 'conv_frac',
'pl': 'conv_frac', 'sn': 'conv_frac',
'gr': 'conv_frac', 'ha': 'conv_frac'}
self.strat_frac_names_for_rad = {'cl': 'strat_cl_frac', 'ci': 'strat_ci_frac',
'pl': 'strat_pl_frac', 'sn': 'strat_sn_frac',
'gr': 'strat_gr_frac', 'ha': 'strat_ha_frac'}
self.conv_re_fields = {'cl': 'REFC', 'ci': 'REFI',
'pl': 'REFR', 'sn': 'REFS',
'gr': 'REFG', 'ha': 'REFH'}
self.strat_re_fields = {'cl': 'REFC', 'ci': 'REFI',
'pl': 'REFR', 'sn': 'REFS',
'gr': 'REFG', 'ha': 'REFH'}
self.q_names_convective = {'cl': 'qclc', 'ci': 'qcic',
'pl': 'qplc', 'sn': 'qsnc',
'gr': 'qgrc', 'ha': 'qhac'}
self.q_names_stratiform = {'cl': 'qcls', 'ci': 'qcis',
'pl': 'qpls', 'sn': 'qsns',
'gr': 'qgrs', 'ha': 'qhas'}
self.ice_hyd_types = ["ci", "sn", "gr", "ha"]
ds = xr.open_dataset(file_path)
wrfin = Dataset(file_path)
self.ds = {}
self.ds["pressure"] = ds["P"] + ds["PB"]
self.ds["Z"] = getvar(wrfin, "z", units="m", timeidx=ALL_TIMES, squeeze=False)
self.ds["T"] = getvar(wrfin, "tk", timeidx=ALL_TIMES, squeeze=False)
self.ds["T"] = self.ds["T"]
self.ds["pressure"] = self.ds["pressure"] * 1e-2
self.ds["pressure"].attrs["units"] = "hPa"
self.ds["T"].attrs["units"] = "K"
self.ds["Z"].attrs["units"] = "m"
self._calc_air_density() # calculate air density with input T and p fields
# Qn in kg-1 --> cm-3 * rho to get m-3 * 1e-6 for cm-3
qn_conversion = self.ds["rho_a"].values * 1e-6
W = getvar(wrfin, "wa", units="m s-1", timeidx=ALL_TIMES, squeeze=False)
if NUWRF:
cldfrac = ds["CLDFRA"].values
for hyd_type in self.hyd_types:
self.ds[self.conv_frac_names[hyd_type]] = xr.DataArray(
np.zeros_like(ds["P"].values),
dims=('Time', 'bottom_top',
'south_north', 'west_east')).astype('float64')
self.ds["q%sc" % hyd_type] = xr.DataArray(
self.ds["conv_frac"].values,
dims=('Time', 'bottom_top',
'south_north', 'west_east')).astype('float64')
self.ds["q%ss" % hyd_type] = ds[self.q_names[hyd_type]]
# We can have out of cloud precip, so don't consider cloud fraction there
if hyd_type in ['ci', 'cl']:
if NUWRF is False:
cldfrac = np.where(ds[self.q_names[hyd_type]].values > q_hyd_truncation_cutoff, 1, 0)
hydfrac = cldfrac
else:
hydfrac = np.where(ds[self.q_names[hyd_type]].values > q_hyd_truncation_cutoff, 1, 0)
self.ds[self.strat_frac_names[hyd_type]] = xr.DataArray(
hydfrac,
dims=('Time', 'bottom_top',
'south_north', 'west_east')).astype('float64')
self.ds[self.N_field[hyd_type]] = ds[
self.N_field[hyd_type]].astype('float64') * qn_conversion
if mcphys_scheme.lower() == "nssl":
self.ds[self.conv_re_fields[hyd_type]] = ds[self.conv_re_fields[hyd_type]].astype('float64')
self.ds[self.conv_re_fields[hyd_type]].attrs["units"] = "micron"
self.ds[self.strat_re_fields[hyd_type]] = ds[self.strat_re_fields[hyd_type]].astype('float64')
self.ds[self.strat_re_fields[hyd_type]].attrs["units"] = "micron"
self.ds["QVAPOR"] = ds["QVAPOR"]
if mcphys_scheme == "nssl":
x = ds["QGRAUP"] / ds["QVGRAUPEL"]
x = np.where(np.isfinite(x), x, 900.)
self.ds["RHO_GRAUPEL"] = xr.DataArray(x, dims=ds["QGRAUP"].dims)
x = ds["QHAIL"] / ds["QVHAIL"]
x = np.where(np.isfinite(x), x, 900.)
self.ds["RHO_HAIL"] = xr.DataArray(x, dims=ds["QGRAUP"].dims)
self.ds["RHO_GRAUPEL"] = self.ds["RHO_GRAUPEL"] * (ureg.kg / (ureg.m**3))
self.ds["RHO_HAIL"] = self.ds["RHO_HAIL"] * (ureg.kg / (ureg.m**3))
self.variable_density = {'gr': "RHO_GRAUPEL",
'ha': "RHO_HAIL"}
self.time_dim = "Time"
self.height_dim = "bottom_top"
self.model_name = "WRF"
self.lat_dim = "south_north"
self.lon_dim = "west_east"
self.lat_name = "XLAT"
self.lon_name = "XLONG"
self.mcphys_scheme = mcphys_scheme
self.rad_scheme_family = "ModelE" # NOTE: adaptive implementation needed (as in mcphys_scheme) per WRF run
self.process_conv = False
wrfin.close()
for keys in self.ds.keys():
try:
self.ds[keys] = self.ds[keys].drop_vars("Time")
except (KeyError, ValueError):
continue
for keys in self.ds.keys():
try:
self.ds[keys] = self.ds[keys].drop_vars("XTIME")
except (KeyError, ValueError):
continue
self.ds = xr.Dataset(self.ds)
if bounding_box is not None:
super()._crop_bounding_box(bounding_box)
# crop specific model output time range (if requested)
if time_range is not None:
super()._crop_time_range(time_range)
# stack dimensions in the case of a regional output or squeeze lat/lon dims if exist and len==1
super().check_and_stack_time_lat_lon(file_path=file_path)
[docs]
class DHARMA(Model):
[docs]
def __init__(self, file_path, time_range=None, time_dim="dom_col", single_pi_class=True,
load_processed=False, bounding_box=None):
"""
This loads a DHARMA simulation with all of the necessary parameters
for EMC^2 to run.
Parameters
----------
file_path: str
Path to a ModelE simulation.
time_range: tuple or None
Start and end time to include. If this is None, the entire
simulation will be included.
time_dim: str
name of time dimension.
single_pi_class: bool
If True, using a single precipitating ice class (pi).
If False, using two precipitation ice classes (pi - snow and pir - rimed ice - both
using the same LUTs).
load_processed: bool
If True, treating the 'file_path' variable as an EMC2-processed dataset; thus skipping
appended string removal and dimension stacking, which are typically part of pre-processing.
bounding_box: None or 4-tuple
If not None, then a tuple representing the bounding box
(x_min, y_min, x_max, y_max).
"""
super().__init__()
self.Rho_hyd = {'cl': 1000. * ureg.kg / (ureg.m**3), 'ci': 500. * ureg.kg / (ureg.m**3),
'pl': 1000. * ureg.kg / (ureg.m**3), 'pi': 100. * ureg.kg / (ureg.m**3)}
self.fluffy = {'ci': 0.5 * ureg.dimensionless, 'pi': 0.5 * ureg.dimensionless}
self.lidar_ratio = {'cl': 18. * ureg.dimensionless,
'ci': 24. * ureg.dimensionless,
'pl': 5.5 * ureg.dimensionless,
'pi': 24.0 * ureg.dimensionless}
self.LDR_per_hyd = {'cl': 0.03 * 1 / (ureg.kg / (ureg.m**3)),
'ci': 0.35 * 1 / (ureg.kg / (ureg.m**3)),
'pl': 0.1 * 1 / (ureg.kg / (ureg.m**3)),
'pi': 0.40 * 1 / (ureg.kg / (ureg.m**3))}
self.vel_param_a = {'cl': 3e7, 'ci': 700., 'pl': 841.997, 'pi': 11.72}
self.vel_param_b = {'cl': 2. * ureg.dimensionless,
'ci': 1. * ureg.dimensionless,
'pl': 0.8 * ureg.dimensionless,
'pi': 0.41 * ureg.dimensionless}
if not single_pi_class:
self.Rho_hyd.update({'pir': 100. * ureg.kg / (ureg.m**3)})
self.fluffy.update({'pir': 0.5 * ureg.dimensionless})
self.lidar_ratio.update({'pir': 24.0 * ureg.dimensionless})
self.LDR_per_hyd.update({'pir': 0.40 * 1 / (ureg.kg / (ureg.m**3))})
self.vel_param_a.update({'pir': 11.72})
self.vel_param_b.update({'pir': 0.41 * ureg.dimensionless})
super()._add_vel_units()
self.q_field = "q"
self.N_field = {'cl': 'ncl', 'ci': 'nci', 'pl': 'npl', 'pi': 'npi'}
self.p_field = "p"
self.z_field = "z"
self.T_field = "t"
self.height_dim = "hgt"
self.lat_dim = "y"
self.lon_dim = "x"
self.time_dim = time_dim
self.conv_frac_names = {'cl': 'conv_dat', 'ci': 'conv_dat',
'pl': 'conv_dat', 'pi': 'conv_dat'}
self.strat_frac_names = {'cl': 'strat_cl_frac', 'ci': 'strat_ci_frac',
'pl': 'strat_pl_frac', 'pi': 'strat_pi_frac'}
self.conv_frac_names_for_rad = {'cl': 'conv_dat', 'ci': 'conv_dat',
'pl': 'conv_dat', 'pi': 'conv_dat'}
self.strat_frac_names_for_rad = {'cl': 'strat_cl_frac', 'ci': 'strat_ci_frac',
'pl': 'strat_pl_frac', 'pi': 'strat_pi_frac'}
self.conv_re_fields = {'cl': 'conv_dat', 'ci': 'conv_dat',
'pi': 'conv_dat', 'pl': 'conv_dat'}
self.strat_re_fields = {'cl': 're_strat_cl', 'ci': 're_strat_ci',
'pi': 're_strat_pi', 'pl': 're_strat_pl'}
self.q_names_convective = {'cl': 'conv_dat', 'ci': 'conv_dat', 'pl': 'conv_dat', 'pi': 'conv_dat'}
self.q_names_stratiform = {'cl': 'qcl', 'ci': 'qci', 'pl': 'qpl', 'pi': 'qpi'}
self.Vd_rhoa_scaling_ref = {"cl": 1.0, # Ikawa and Saito (1990, p. 85)
"ci": 1.0, # Ikawa and Saito (1990, p. 85)
"pl": 1.1, # Liu and Orville (1969, p. 1269),
"pi": 1.15} # Localleti and Hobbs (1974) - est. from 750-1500 alt + T < 273.15
self.hyd_types = ["cl", "ci", "pl", "pi"]
if not single_pi_class:
self.N_field.update({'pir': 'npir'})
self.conv_frac_names.update({'pir': 'conv_dat'})
self.strat_frac_names.update({'pir': 'strat_pir_frac'})
self.conv_frac_names_for_rad.update({'pir': 'conv_dat'})
self.strat_frac_names_for_rad.update({'pir': 'strat_pir_frac'})
self.conv_re_fields.update({'pir': 'strat_pir_frac'})
self.strat_re_fields.update({'pir': 'strat_pir_frac'})
self.q_names_convective.update({'pir': 'conv_dat'})
self.q_names_stratiform.update({'pir': 'qpir'})
self.Vd_rhoa_scaling_ref['pir'] = 1.15 # Localleti and Hobbs (1974) - est. based on paper info
self.hyd_types.append("pir")
self.mcphys_scheme="MG" # MG2008
self.rad_scheme_family = "ModelE" # Similar implementation to ModelE3
if load_processed:
self.ds = xr.Dataset()
self.load_subcolumns_from_netcdf(file_path)
else:
self.ds = xr.open_dataset(file_path)
for variable in self.ds.variables.keys():
my_attrs = self.ds[variable].attrs
self.ds[variable] = self.ds[variable].astype('float64')
self.ds[variable].attrs = my_attrs
self._calc_air_density() # calculate air density with input T and p fields
if bounding_box is not None:
super()._crop_bounding_box_x_y(bounding_box)
# crop specific model output time range (if requested)
if time_range is not None:
if np.issubdtype(time_range.dtype, np.datetime64):
super()._crop_time_range(time_range)
else:
raise RuntimeError("input time range is not in the required datetime64 data type")
if not load_processed:
# stack dimensions in the case of a regional output or squeeze lat/lon dims if exist and len==1
super().check_and_stack_time_lat_lon(file_path=file_path)
self.model_name = "DHARMA"
[docs]
class TestModel(Model):
"""
This is a test Model structure used only for unit testing. It is not recommended for end users.
"""
[docs]
def __init__(self):
q = np.linspace(0, 1, 1000) * ureg.gram / ureg.kilogram
N = 100 * np.ones_like(q) / (ureg.centimeter ** 3)
heights = np.linspace(0, 11000., 1000) * ureg.meter
temp = 15.04 * ureg.kelvin - quantity(0.00649, 'kelvin/meter') * heights + 273.15 * ureg.kelvin
temp_c = temp.to('degC')
temp_k = temp
p = 1012.9 * ureg.hPa * (temp / (288.08 * ureg.kelvin)) ** 5.256
es = 0.6112 * ureg.hPa * np.exp(17.67 * temp_c.magnitude / (temp_c.magnitude + 243.5))
qv = 0.622 * es * 1e3 / (p * 1e2 - es * 1e3)
times = xr.DataArray(np.array([0]), dims=('time'))
times.attrs["units"] = "seconds"
heights = xr.DataArray(heights.magnitude[np.newaxis, :], dims=('time', 'height'))
heights.attrs['units'] = "meter"
heights.attrs["long_name"] = "Height above MSL"
p_units = p.units
p = xr.DataArray(p.magnitude[np.newaxis, :], dims=('time', 'height'))
p.attrs["long_name"] = "Air pressure"
p.attrs["units"] = '%s' % p_units
qv_units = qv.units
qv = xr.DataArray(qv.magnitude[np.newaxis, :], dims=('time', 'height'))
qv.attrs["long_name"] = "Water vapor mixing ratio"
qv.attrs["units"] = '%s' % qv_units
t_units = temp_k.units
temp = xr.DataArray(temp_k.magnitude[np.newaxis, :], dims=('time', 'height'))
temp.attrs["long_name"] = "Air temperature"
temp.attrs["units"] = '%s' % t_units
q = xr.DataArray(q.magnitude[np.newaxis, :], dims=('time', 'height'))
q.attrs["long_name"] = "Liquid cloud water mixing ratio"
q.attrs["units"] = '%s' % qv_units
N = xr.DataArray(N.magnitude[np.newaxis, :], dims=('time', 'height'))
N.attrs["long_name"] = "Cloud particle number concentration"
N.attrs["units"] = '%s' % qv_units
my_ds = xr.Dataset({'p_3d': p, 'q': qv, 't': temp, 'z': heights,
'qcl': q, 'ncl': N, 'qpl': q, 'qci': q, 'qpi': q,
'time': times})
super().__init__()
self.Rho_hyd = {'cl': 1000. * ureg.kg / (ureg.m ** 3), 'ci': 500. * ureg.kg / (ureg.m ** 3),
'pl': 1000. * ureg.kg / (ureg.m ** 3), 'pi': 250. * ureg.kg / (ureg.m ** 3)}
self.fluffy = {'ci': 0.5 * ureg.dimensionless, 'pi': 0.5 * ureg.dimensionless}
self.lidar_ratio = {'cl': 18. * ureg.dimensionless,
'ci': 24. * ureg.dimensionless,
'pl': 5.5 * ureg.dimensionless,
'pi': 24.0 * ureg.dimensionless}
self.LDR_per_hyd = {'cl': 0.03 * 1 / (ureg.kg / (ureg.m ** 3)),
'ci': 0.35 * 1 / (ureg.kg / (ureg.m ** 3)),
'pl': 0.1 * 1 / (ureg.kg / (ureg.m ** 3)),
'pi': 0.40 * 1 / (ureg.kg / (ureg.m ** 3))}
self.vel_param_a = {'cl': 3e7, 'ci': 700., 'pl': 841.997, 'pi': 11.72}
self.vel_param_b = {'cl': 2. * ureg.dimensionless,
'ci': 1. * ureg.dimensionless,
'pl': 0.8 * ureg.dimensionless,
'pi': 0.41 * ureg.dimensionless}
super()._add_vel_units()
self.q_names_convective = {'cl': 'qcl', 'ci': 'qci', 'pl': 'qpl', 'pi': 'qpi'}
self.q_names_stratiform = {'cl': 'qcl', 'ci': 'qci', 'pl': 'qpl', 'pi': 'qpi'}
self.q_field = "q"
self.N_field = {'cl': 'ncl', 'ci': 'nci', 'pl': 'npl', 'pi': 'npi'}
self.p_field = "p_3d"
self.z_field = "z"
self.T_field = "t"
self.conv_frac_names = {'cl': 'cldmccl', 'ci': 'cldmcci',
'pl': 'cldmcpl', 'pi': 'cldmcpi'}
self.strat_frac_names = {'cl': 'cldsscl', 'ci': 'cldssci',
'pl': 'cldsspl', 'pi': 'cldsspi'}
self.conv_frac_names_for_rad = {'cl': 'cldmccl', 'ci': 'cldmcci',
'pl': 'cldmcpl', 'pi': 'cldmcpi'}
self.strat_frac_names_for_rad = {'cl': 'cldsscl', 'ci': 'cldssci',
'pl': 'cldsspl', 'pi': 'cldsspi'}
self.mcphys_scheme = "MG2" # required to prevent errors from being raised.
self.rad_scheme_family = "E3SM" # required to prevent errors from being raised.
self.ds = my_ds
self.height_dim = "height"
self.time_dim = "time"
self._calc_air_density() # calculate air density with input T and p fields
self.hyd_types = ["cl", "ci", "pl", "pi"]
[docs]
class TestConvection(Model):
"""
This is a test Model structure used only for unit testing.
This model has a 100% convective column from 1 km to 11 km.
It is not recommended for end users.
"""
[docs]
def __init__(self):
q = np.linspace(0, 1, 1000) * ureg.gram / ureg.kilogram
N = 100 * np.ones_like(q) * (ureg.centimeter ** -3)
Npl = 0.001 * np.ones_like(1) * (ureg.centimeter ** -3)
heights = np.linspace(0, 11000., 1000) * ureg.meter
temp = 15.04 * ureg.kelvin - 0.00649 * (ureg.kelvin / ureg.meter) * heights + 273.15 * ureg.kelvin
temp_c = temp.to('degC')
temp_k = temp
p = 1012.9 * ureg.hPa * (temp / (288.08 * ureg.kelvin)) ** 5.256
re_cl = 10 * np.ones_like(q) * ureg.micrometer
re_pl = 100 * np.ones_like(q) * ureg.micrometer
es = 0.6112 * ureg.hPa * np.exp(17.67 * temp_c.magnitude / (temp_c.magnitude + 243.5))
qv = 0.622 * es * 1e3 / (p * 1e2 - es * 1e3) * q.units
convective_liquid = np.logical_and(heights > 1000. * ureg.meter,
temp >= 273.15 * ureg.kelvin)
convective_ice = np.logical_and(heights > 1000. * ureg.meter,
temp < 273.15 * ureg.kelvin)
Nci = np.where(convective_ice, Npl.magnitude, 0)
Npi = np.where(convective_ice, Npl.magnitude, 0)
Npl = np.where(convective_liquid, Npl.magnitude, 0)
cldmccl = np.where(convective_liquid, 1, 0.) * ureg.dimensionless
cldmcci = np.where(convective_ice, 1, 0.) * ureg.dimensionless
cldsscl = np.zeros_like(heights) * ureg.dimensionless
cldssci = np.zeros_like(heights) * ureg.dimensionless
times = xr.DataArray(np.array([0]), dims=('time'))
times.attrs["units"] = "seconds"
heights = xr.DataArray(heights.magnitude[np.newaxis, :], dims=('time', 'height'))
heights.attrs['units'] = "meter"
heights.attrs["long_name"] = "Height above MSL"
p_units = p.units
p = xr.DataArray(p.magnitude[np.newaxis, :], dims=('time', 'height'))
p.attrs["long_name"] = "Air pressure"
p.attrs["units"] = '%s' % p_units
qv_units = qv.units
qv = xr.DataArray(qv.magnitude[np.newaxis, :], dims=('time', 'height'))
qv.attrs["long_name"] = "Water vapor mixing ratio"
qv.attrs["units"] = '%s' % qv_units
t_units = temp_k.units
temp = xr.DataArray(temp_k.magnitude[np.newaxis, :], dims=('time', 'height'))
temp.attrs["long_name"] = "Air temperature"
temp.attrs["units"] = '%s' % t_units
q_units = q.units
q = xr.DataArray(q.magnitude[np.newaxis, :], dims=('time', 'height'))
q.attrs["long_name"] = "Liquid cloud water mixing ratio"
q.attrs["units"] = '%s' % q_units
N_units = N.units
N = xr.DataArray(N.magnitude[np.newaxis, :], dims=('time', 'height'))
N.attrs["long_name"] = "Cloud particle number concentration"
N.attrs["units"] = '%s' % N_units
re_cl = xr.DataArray(re_cl.magnitude[np.newaxis, :], dims=('time', 'height'))
re_cl.attrs["units"] = "micrometer"
re_cl.attrs["long_name"] = "Effective radius of cloud liquid particles"
re_pl = xr.DataArray(re_pl.magnitude[np.newaxis, :], dims=('time', 'height'))
re_pl.attrs["units"] = "micrometer"
re_pl.attrs["long_name"] = "Effective radius of cloud liquid particles"
cldmccl = xr.DataArray(cldmccl.magnitude[np.newaxis, :], dims=('time', 'height'))
cldmccl.attrs["units"] = 'g kg-1'
cldmccl.attrs["long_name"] = "Convective cloud liquid mixing ratio"
cldmcci = xr.DataArray(cldmcci.magnitude[np.newaxis, :], dims=('time', 'height'))
cldmcci.attrs["units"] = 'g kg-1'
cldmcci.attrs["long_name"] = "Convective cloud ice mixing ratio"
cldsscl = xr.DataArray(cldsscl.magnitude[np.newaxis, :], dims=('time', 'height'))
cldsscl.attrs["units"] = 'g kg-1'
cldsscl.attrs["long_name"] = "Stratiform cloud liquid mixing ratio"
cldssci = xr.DataArray(cldssci.magnitude[np.newaxis, :], dims=('time', 'height'))
cldssci.attrs["units"] = 'g kg-1'
cldssci.attrs["long_name"] = "Stratiform cloud ice mixing ratio"
Nci = xr.DataArray(Nci[np.newaxis, :], dims=('time', 'height'))
Nci.attrs["units"] = "cm-3"
Nci.attrs["long_name"] = "cloud ice particle number concentration"
Npl = xr.DataArray(Npl[np.newaxis, :], dims=('time', 'height'))
Npl.attrs["units"] = "cm-3"
Npl.attrs["long_name"] = "liquid precipitation particle number concentration"
Npi = xr.DataArray(Npi[np.newaxis, :], dims=('time', 'height'))
Npi.attrs["units"] = "cm-3"
Npi.attrs["long_name"] = "ice precipitation particle number concentration"
my_ds = xr.Dataset({'p_3d': p, 'q': qv, 't': temp, 'z': heights,
'qcl': q, 'ncl': N, 'nci': Nci, 'npl': Npl, 'npi': Npi,
'qpl': q, 'qci': q, 'qpi': q,
'cldmccl': cldmccl, 'cldmcci': cldmcci,
'cldsscl': cldsscl, 'cldssci': cldssci,
'cldmcpl': cldmccl, 'cldmcpi': cldmcci,
'cldsspl': cldsscl, 'cldsspi': cldssci,
'time': times, 're_cl': re_cl, 're_pl': re_pl})
super().__init__()
self.Rho_hyd = {'cl': 1000. * ureg.kg / (ureg.m ** 3), 'ci': 500. * ureg.kg / (ureg.m ** 3),
'pl': 1000. * ureg.kg / (ureg.m ** 3), 'pi': 250. * ureg.kg / (ureg.m ** 3)}
self.lidar_ratio = {'cl': 18. * ureg.dimensionless,
'ci': 24. * ureg.dimensionless,
'pl': 5.5 * ureg.dimensionless,
'pi': 24.0 * ureg.dimensionless}
self.LDR_per_hyd = {'cl': 0.03 * 1 / (ureg.kg / (ureg.m ** 3)),
'ci': 0.35 * 1 / (ureg.kg / (ureg.m ** 3)),
'pl': 0.1 * 1 / (ureg.kg / (ureg.m ** 3)),
'pi': 0.40 * 1 / (ureg.kg / (ureg.m ** 3))}
self.vel_param_a = {'cl': 3e7, 'ci': 700., 'pl': 841.997, 'pi': 11.72}
self.vel_param_b = {'cl': 2. * ureg.dimensionless,
'ci': 1. * ureg.dimensionless,
'pl': 0.8 * ureg.dimensionless,
'pi': 0.41 * ureg.dimensionless}
super()._add_vel_units()
self.q_names_convective = {'cl': 'qcl', 'ci': 'qci', 'pl': 'qpl', 'pi': 'qpi'}
self.q_names_stratiform = {'cl': 'qcl', 'ci': 'qci', 'pl': 'qpl', 'pi': 'qpi'}
self.conv_re_fields = {'cl': 're_cl', 'ci': 're_cl', 'pl': 're_pl', 'pi': 're_pl'}
self.strat_re_fields = {'cl': 're_cl', 'ci': 're_cl', 'pl': 're_pl', 'pi': 're_pl'}
self.fluffy = {'ci': 0.5 * ureg.dimensionless, 'pi': 0.5 * ureg.dimensionless}
self.q_field = "q"
self.N_field = {'cl': 'ncl', 'ci': 'nci', 'pl': 'npl', 'pi': 'npi'}
self.p_field = "p_3d"
self.z_field = "z"
self.T_field = "t"
self.hyd_types = ["cl", "ci", "pl", "pi"]
self.conv_frac_names = {'cl': 'cldmccl', 'ci': 'cldmcci', 'pl': 'cldmcpl', 'pi': 'cldmcpi'}
self.strat_frac_names = {'cl': 'cldsscl', 'ci': 'cldssci', 'pl': 'cldsspl', 'pi': 'cldsspi'}
self.conv_frac_names_for_rad = {'cl': 'cldmccl', 'ci': 'cldmcci', 'pl': 'cldmcpl', 'pi': 'cldmcpi'}
self.strat_frac_names_for_rad = {'cl': 'cldsscl', 'ci': 'cldssci', 'pl': 'cldsspl', 'pi': 'cldsspi'}
self.mcphys_scheme = "MG2" # required to prevent errors from being raised.
self.rad_scheme_family = "ModelE" # required to prevent errors from being raised.
self.ds = my_ds
self._calc_air_density() # calculate air density with input T and p fields
self.height_dim = "height"
self.time_dim = "time"
[docs]
class TestHalfAndHalf(Model):
"""
This is a test Model structure used only for unit testing.
This model has a 50% stratiform, 50% convective column from 1 km to 11 km.
It is not recommended for end users.
"""
[docs]
def __init__(self):
q = np.linspace(0, 1, 1000) * ureg.gram / ureg.kilogram
N = 100 * np.ones_like(q) * (ureg.centimeter ** -3)
heights = np.linspace(0, 11000., 1000) * ureg.meter
temp = 15.04 * ureg.kelvin - 0.00649 * (ureg.kelvin / ureg.meter) * heights + 273.15 * ureg.kelvin
temp_c = temp.to('degC').magnitude
temp_k = temp.magnitude
p = 1012.9 * ureg.hPa * (temp / (288.08 * ureg.kelvin)) ** 5.256
es = 0.6112 * ureg.hPa * np.exp(17.67 * temp_c / (temp_c + 243.5))
qv = 0.622 * es * 1e3 / (p * 1e2 - es * 1e3) * q.units
stratiform_liquid = np.logical_and(heights > 1000. * ureg.meter,
temp >= 273.15 * ureg.kelvin)
stratiform_ice = np.logical_and(heights > 1000. * ureg.meter,
temp < 273.15 * ureg.kelvin)
cldsscl = 0.5 * np.where(stratiform_liquid, 1, 0.) * ureg.dimensionless
cldssci = 0.5 * np.where(stratiform_ice, 1, 0.) * ureg.dimensionless
cldmccl = 0.5 * np.where(stratiform_liquid, 1, 0.) * ureg.dimensionless
cldmcci = 0.5 * np.where(stratiform_ice, 1, 0.) * ureg.dimensionless
qcl = np.where(stratiform_liquid, q, 0)
qci = np.where(stratiform_ice, q, 0)
times = xr.DataArray(np.array([0]), dims=('time'))
times.attrs["units"] = "seconds"
heights = xr.DataArray(heights.magnitude[np.newaxis, :], dims=('time', 'height'))
heights.attrs['units'] = "meter"
heights.attrs["long_name"] = "Height above MSL"
p_units = p.units
p = xr.DataArray(p.magnitude[np.newaxis, :], dims=('time', 'height'))
p.attrs["long_name"] = "Air pressure"
p.attrs["units"] = '%s' % p_units
qv_units = qv.units
qv = xr.DataArray(qv.magnitude[np.newaxis, :], dims=('time', 'height'))
qv.attrs["long_name"] = "Water vapor mixing ratio"
qv.attrs["units"] = '%s' % qv_units
t_units = "degK"
temp = xr.DataArray(temp_k[np.newaxis, :], dims=('time', 'height'))
temp.attrs["long_name"] = "Air temperature"
temp.attrs["units"] = '%s' % t_units
q_units = q.units
q = xr.DataArray(q.magnitude[np.newaxis, :], dims=('time', 'height'))
q.attrs["long_name"] = "Liquid cloud water mixing ratio"
q.attrs["units"] = '%s' % q_units
N_units = N.units
N = xr.DataArray(N.magnitude[np.newaxis, :], dims=('time', 'height'))
N.attrs["long_name"] = "Cloud particle number concentration"
N.attrs["units"] = '%s' % N_units
qcl = xr.DataArray(qcl[np.newaxis, :], dims=('time', 'height'))
qcl.attrs["units"] = "g kg-1"
qcl.attrs["long_name"] = "Cloud liquid water mixing ratio"
qci = xr.DataArray(qci[np.newaxis, :], dims=('time', 'height'))
qci.attrs["units"] = "g kg-1"
qci.attrs["long_name"] = "Cloud ice water mixing ratio"
cldmccl = xr.DataArray(cldmccl.magnitude[np.newaxis, :], dims=('time', 'height'))
cldmccl.attrs["units"] = 'g kg-1'
cldmccl.attrs["long_name"] = "Convective cloud liquid mixing ratio"
cldmcci = xr.DataArray(cldmcci.magnitude[np.newaxis, :], dims=('time', 'height'))
cldmcci.attrs["units"] = 'g kg-1'
cldmcci.attrs["long_name"] = "Convective cloud ice mixing ratio"
cldsscl = xr.DataArray(cldsscl.magnitude[np.newaxis, :], dims=('time', 'height'))
cldsscl.attrs["units"] = 'g kg-1'
cldsscl.attrs["long_name"] = "Stratiform cloud liquid mixing ratio"
cldssci = xr.DataArray(cldssci.magnitude[np.newaxis, :], dims=('time', 'height'))
cldssci.attrs["units"] = 'g kg-1'
cldssci.attrs["long_name"] = "Stratiform cloud ice mixing ratio"
my_ds = xr.Dataset({'p_3d': p, 'q': qv, 't': temp, 'z': heights,
'qcl': q, 'ncl': N, 'qpl': q, 'qci': q, 'qpi': q,
'cldmccl': cldmccl, 'cldmcci': cldmcci,
'cldsscl': cldsscl, 'cldssci': cldssci,
'cldmcpl': cldmccl, 'cldmcpi': cldmcci,
'cldsspl': cldsscl, 'cldsspi': cldssci,
'time': times})
super().__init__()
self.Rho_hyd = {'cl': 1000. * ureg.kg / (ureg.meter ** 3), 'ci': 500. * ureg.kg / (ureg.meter ** 3),
'pl': 1000. * ureg.kg / (ureg.meter ** 3), 'pi': 250. * ureg.kg / (ureg.meter ** 3)}
self.fluffy = {'ci': 0.5 * ureg.dimensionless, 'pi': 0.5 * ureg.dimensionless}
self.lidar_ratio = {'cl': 18. * ureg.dimensionless,
'ci': 24. * ureg.dimensionless,
'pl': 5.5 * ureg.dimensionless,
'pi': 24.0 * ureg.dimensionless}
self.LDR_per_hyd = {'cl': 0.03 * 1 / (ureg.kg / (ureg.m ** 3)),
'ci': 0.35 * 1 / (ureg.kg / (ureg.m ** 3)),
'pl': 0.1 * 1 / (ureg.kg / (ureg.m ** 3)),
'pi': 0.40 * 1 / (ureg.kg / (ureg.m ** 3))}
self.vel_param_a = {'cl': 3e7, 'ci': 700., 'pl': 841.997, 'pi': 11.72}
self.vel_param_b = {'cl': 2. * ureg.dimensionless,
'ci': 1. * ureg.dimensionless,
'pl': 0.8 * ureg.dimensionless,
'pi': 0.41 * ureg.dimensionless}
super()._add_vel_units()
self.q_names_convective = {'cl': 'qcl', 'ci': 'qci', 'pl': 'qpl', 'pi': 'qpi'}
self.q_names_stratiform = {'cl': 'qcl', 'ci': 'qci', 'pl': 'qpl', 'pi': 'qpi'}
self.conv_re_fields = {'cl': 're_cl', 'ci': 're_cl', 'pl': 're_pl', 'pi': 're_pl'}
self.strat_re_fields = {'cl': 're_cl', 'ci': 're_cl', 'pl': 're_pl', 'pi': 're_pl'}
self.q_field = "q"
self.N_field = {'cl': 'ncl', 'ci': 'nci', 'pl': 'npl', 'pi': 'npi'}
self.p_field = "p_3d"
self.z_field = "z"
self.T_field = "t"
self.conv_frac_names = {'cl': 'cldmccl', 'ci': 'cldmcci', 'pl': 'cldmcpl', 'pi': 'cldmcpi'}
self.strat_frac_names = {'cl': 'cldsscl', 'ci': 'cldssci', 'pl': 'cldsspl', 'pi': 'cldsspi'}
self.conv_frac_names_for_rad = {'cl': 'cldmccl', 'ci': 'cldmcci', 'pl': 'cldmcpl', 'pi': 'cldmcpi'}
self.strat_frac_names_for_rad = {'cl': 'cldsscl', 'ci': 'cldssci', 'pl': 'cldsspl', 'pi': 'cldsspi'}
self.hyd_types = ["cl", "ci", "pl", "pi"]
self.mcphys_scheme = "MG2" # required to prevent errors from being raised.
self.rad_scheme_family = "ModelE" # required to prevent errors from being raised.
self.ds = my_ds
self._calc_air_density() # calculate air density with input T and p fields
self.height_dim = "height"
self.time_dim = "time"