"""
This module contains I/O operations for the U.S. Department of Energy
AmeriFlux program (https://ameriflux.lbl.gov/).
"""
import re
from collections import defaultdict
import warnings
import numpy as np
import pandas as pd
import xarray as xr
def read_ameriflux(
filename,
metadata_filename=None,
data_type=None,
timestep=None,
rename_vars_dict=None,
variable_units_dict=None,
):
"""
Returns `xarray.Dataset` with stored data from a FLUXNET or BASE-BADM file and,
optionally, metadata from a user-provided Excel file for a single datastream.
Parameters
----------
filename : str
Filename of the AmeriFlux dataset.
metadata_filename : str
Excel file, usually provided with an AmeriFlux dataset, that contains the data's metadata.
Default is None.
data_type : str
Type of data file to be read. Valid options are 'fluxnet' and 'base'.
Default is None and will try to determine data type from filename.
timestep : str
Timestep of the data. This parameter is only used for 'fluxnet' data types, and
only if the time format can't be determined from the filename.
Default is None; options are 'year', 'month', 'week', 'day', and 'hour'.
rename_vars_dict : dict
A dictionary containing current variable names and new variable names to replace
the current variable names.
Default is None and variables are not renamed.
variable_units_dict : dict
A dictionary containing current variable names and units to be added to that variable.
Default is None and uses current units from AmeriFlux's unit database.
Returns
-------
ds : xarray.Dataset
ACT Xarray dataset.
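Examples
--------
A minimal sketch; the filename is hypothetical and the import assumes this
module is available as ``act.io.ameriflux``::

    from act.io.ameriflux import read_ameriflux

    ds = read_ameriflux('AMF_US-xx_BASE-BADM_2-5.csv', data_type='base')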
"""
default_units_dict = {
'COND_WATER': 'µS cm-1',
'DO': 'µmol L-1',
'PCH4': 'nmolCH4 mol-1',
'PCO2': 'µmolCO2 mol-1',
'PN2O': 'nmolN2O mol-1',
'PPFD_UW_IN': 'µmolPhoton m-2 s-1',
'TW': 'deg C',
'DBH': 'cm',
'LEAF_WET': '%',
'SAP_DT': 'deg C',
'SAP_FLOW': 'mmolH2O m-2 s-1',
'T_BOLE': 'deg C',
'T_CANOPY': 'deg C',
'FETCH_70': 'm',
'FETCH_80': 'm',
'FETCH_90': 'm',
'FETCH_FILTER': 'nondimensional',
'FETCH_MAX': 'm',
'CH4': 'nmolCH4 mol-1',
'CH4_MIXING_RATIO': 'nmolCH4 mol-1',
'CO': 'nmolCO mol-1',
'CO2': 'µmolCO2 mol-1',
'CO2_MIXING_RATIO': 'µmolCO2 mol-1',
'CO2_SIGMA': 'µmolCO2 mol-1',
'CO2C13': '‰ (permil)',
'FC': 'µmolCO2 m-2 s-1',
'FCH4': 'nmolCH4 m-2 s-1',
'FN2O': 'nmolN2O m-2 s-1',
'FNO': 'nmolNO m-2 s-1',
'FNO2': 'nmolNO2 m-2 s-1',
'FO3': 'nmolO3 m-2 s-1',
'H2O': 'mmolH2O mol-1',
'H2O_MIXING_RATIO': 'mmolH2O mol-1',
'H2O_SIGMA': 'mmolH2O mol-1',
'N2O': 'nmolN2O mol-1',
'N2O_MIXING_RATIO': 'nmolN2O mol-1',
'NO': 'nmolNO mol-1',
'NO2': 'nmolNO2 mol-1',
'O3': 'nmolO3 mol-1',
'SC': 'µmolCO2 m-2 s-1',
'SCH4': 'nmolCH4 m-2 s-1',
'SN2O': 'nmolN2O m-2 s-1',
'SNO': 'nmolNO m-2 s-1',
'SNO2': 'nmolNO2 m-2 s-1',
'SO2': 'nmolSO2 mol-1',
'SO3': 'nmolO3 m-2 s-1',
'FH2O': 'mmolH2O m-2 s-1',
'G': 'W m-2',
'H': 'W m-2',
'LE': 'W m-2',
'SB': 'W m-2',
'SG': 'W m-2',
'SH': 'W m-2',
'SLE': 'W m-2',
'PA': 'kPa',
'PBLH': 'm',
'RH': '%',
'T_SONIC': 'deg C',
'T_SONIC_SIGMA': 'deg C',
'TA': 'deg C',
'VPD': 'hPa',
'D_SNOW': 'cm',
'P': 'mm',
'P_RAIN': 'mm',
'P_SNOW': 'mm',
'RUNOFF': 'mm',
'STEMFLOW': 'mm',
'THROUGHFALL': 'mm',
'ALB': '%',
'APAR': 'µmolPhoton m-2 s-1',
'EVI': 'nondimensional',
'FAPAR': '%',
'FIPAR': '%',
'LW_BC_IN': 'W m-2',
'LW_BC_OUT': 'W m-2',
'LW_IN': 'W m-2',
'LW_OUT': 'W m-2',
'MCRI': 'nondimensional',
'MTCI': 'nondimensional',
'NDVI': 'nondimensional',
'NETRAD': 'W m-2',
'NIRV': 'W m-2 sr-1 nm-1',
'PPFD_BC_IN': 'µmolPhoton m-2 s-1',
'PPFD_BC_OUT': 'µmolPhoton m-2 s-1',
'PPFD_DIF': 'µmolPhoton m-2 s-1',
'PPFD_DIR': 'µmolPhoton m-2 s-1',
'PPFD_IN': 'µmolPhoton m-2 s-1',
'PPFD_OUT': 'µmolPhoton m-2 s-1',
'PRI': 'nondimensional',
'R_UVA': 'W m-2',
'R_UVB': 'W m-2',
'REDCI': 'nondimensional',
'REP': 'nm',
'SPEC_NIR_IN': 'W m-2 nm-1',
'SPEC_NIR_OUT': 'W m-2 sr-1 nm-1',
'SPEC_NIR_REFL': 'nondimensional',
'SPEC_PRI_REF_IN': 'W m-2 nm-1',
'SPEC_PRI_REF_OUT': 'W m-2 sr-1 nm-1',
'SPEC_PRI_REF_REFL': 'nondimensional',
'SPEC_PRI_TGT_IN': 'W m-2 nm-1',
'SPEC_PRI_TGT_OUT': 'W m-2 sr-1 nm-1',
'SPEC_PRI_TGT_REFL': 'nondimensional',
'SPEC_RED_IN': 'W m-2 nm-1',
'SPEC_RED_OUT': 'W m-2 sr-1 nm-1',
'SPEC_RED_REFL': 'nondimensional',
'SR': 'nondimensional',
'SW_BC_IN': 'W m-2',
'SW_BC_OUT': 'W m-2',
'SW_DIF': 'W m-2',
'SW_DIR': 'W m-2',
'SW_IN': 'W m-2',
'SW_OUT': 'W m-2',
'TCARI': 'nondimensional',
'SWC': '%',
'SWP': 'kPa',
'TS': 'deg C',
'TSN': 'deg C',
'WTD': 'm',
'MO_LENGTH': 'm',
'TAU': 'kg m-1 s-2',
'U_SIGMA': 'm s-1',
'USTAR': 'm s-1',
'V_SIGMA': 'm s-1',
'W_SIGMA': 'm s-1',
'WD': 'Decimal degrees',
'WD_SIGMA': 'decimal degree',
'WS': 'm s-1',
'WS_MAX': 'm s-1',
'ZL': 'nondimensional',
'GPP': 'µmolCO2 m-2 s-1',
'NEE': 'µmolCO2 m-2 s-1',
'RECO': 'µmolCO2 m-2 s-1',
'FC_SSITC_TEST': 'nondimensional',
'FCH4_SSITC_TEST': 'nondimensional',
'FN2O_SSITC_TEST': 'nondimensional',
'FNO_SSITC_TEST': 'nondimensional',
'FNO2_SSITC_TEST': 'nondimensional',
'FO3_SSITC_TEST': 'nondimensional',
'H_SSITC_TEST': 'nondimensional',
'LE_SSITC_TEST': 'nondimensional',
'TAU_SSITC_TEST': 'nondimensional',
}
# Reader section for BASE BADM datasets
# Differs from fluxnet as there is metadata in the first few lines
# of the csv file.
if data_type is not None:
if data_type.lower() == 'base':
file_data_type = 'base'
elif data_type.lower() == 'fluxnet':
file_data_type = 'fluxnet'
else:
raise ValueError("Must choose a valid data_type, either 'base' or 'fluxnet'")
# Try to automatically get data type if data type isn't provided
elif data_type is None and 'FLUXNET' in filename:
file_data_type = 'fluxnet'
elif data_type is None and 'BASE-BADM' in filename:
file_data_type = 'base'
else:
raise ValueError(
'Could not determine the data type from the filename. '
'Please provide a valid data type to the data_type parameter!'
)
if file_data_type == 'base':
# Grab site and version metadata
metadata = pd.read_csv(filename, header=None, nrows=2, sep=':', index_col=0)
site = metadata.loc['# Site']
version = metadata.loc['# Version']
# Files are hourly, set the time format
_format = "%Y%m%d%H%M"
# Read in actual data
df = pd.read_csv(filename, skiprows=2)
# Set time and format time to datetime64
key = 'TIMESTAMP_START'
df['time'] = pd.to_datetime(df[key], format=_format)
df = df.set_index('time')
df = df.drop(columns=['TIMESTAMP_START', 'TIMESTAMP_END'])
# Convert dataframe to xarray dataset
ds = xr.Dataset.from_dataframe(df)
# Add site and version to attributes
ds.attrs['site'] = site.values[0].strip()
ds.attrs['version'] = version.values[0].strip()
# Reader for fluxnet files
# Fluxnet files can be formatted in different time samplings
elif file_data_type == 'fluxnet':
# Checks timestep in filename, if not, will check timestep parameter
if 'YY' in filename or timestep == 'year':
_format = "%Y"
elif 'MM' in filename or timestep == 'month':
_format = "%Y%m"
elif 'DD' in filename or 'WW' in filename or timestep == 'week' or timestep == 'day':
_format = "%Y%m%d"
elif 'HH' in filename or timestep == 'hour':
_format = "%Y%m%d%H%M"
else:
raise ValueError(
"Incorrect timestep provided or no timestep determined from filename, "
"please provide either year, month, week, day or hour for the timestep parameter."
)
# Read data into a pandas dataframe
df = pd.read_csv(filename)
# Set time and convert to datetime64
if 'TIMESTAMP_START' in df:
key = 'TIMESTAMP_START'
df['time'] = pd.to_datetime(df[key], format=_format)
df = df.set_index('time')
df = df.drop(columns=['TIMESTAMP_START', 'TIMESTAMP_END'])
else:
key = 'TIMESTAMP'
df['time'] = pd.to_datetime(df[key], format=_format)
df = df.set_index('time')
df = df.drop(columns=[key])
# Convert to xarray dataset
ds = xr.Dataset.from_dataframe(df)
# Renames variables if user provides new names
if rename_vars_dict is not None:
ds = ds.rename_vars(rename_vars_dict)
# Add _FILL_VALUE attribute
for key in ds.variables.keys():
if -9999.0 in ds.variables[key].values or -9999 in ds.variables[key].values:
ds.variables[key].attrs['_FILL_VALUE'] = -9999
# Add nans where fill value is present
ds = ds.where(ds != -9999.0)
# Add units from the unit dictionary
# Matches keys that have different levels as well such as SWC_1_1_1
# Only works for base dataset
if variable_units_dict is None and file_data_type == 'base':
for key in ds.variables.keys():
try:
if re.match(r'TS_[\d]', key):
unit_key = 'TS'
ds.variables[key].attrs['units'] = default_units_dict[unit_key]
elif re.match(r'SWC_[\d]', key):
unit_key = 'SWC'
ds.variables[key].attrs['units'] = default_units_dict[unit_key]
elif re.match(r'G_[\d]', key):
unit_key = 'G'
ds.variables[key].attrs['units'] = default_units_dict[unit_key]
elif re.match(r'VPD_', key):
unit_key = 'VPD'
ds.variables[key].attrs['units'] = default_units_dict[unit_key]
else:
ds.variables[key].attrs['units'] = default_units_dict[key]
except KeyError:
continue
if variable_units_dict is not None:
    for key in ds.variables.keys():
        # Guard against variables (e.g. coordinates) missing from the dict
        if key in variable_units_dict:
            ds.variables[key].attrs['units'] = variable_units_dict[key]
# Adds metadata to the dataset if a metadata file is provided
if metadata_filename is not None:
ds = _ameriflux_metadata_processing(ds, metadata_filename)
return ds
def _ameriflux_metadata_processing(ds, metadata_filename):
"""Adds metadata to an ameriflux dataset if a metadata file is provided."""
# Read in metadata excel file if provided.
meta_df = pd.read_excel(metadata_filename)
# Create a list for attributes and their respective values
meta_key_list = meta_df['VARIABLE'].to_list()
meta_value_list = meta_df['DATAVALUE'].to_list()
# Add attrs that are non-duplicates, as duplicates require more processing below
non_duplicates = [item for item in meta_key_list if meta_key_list.count(item) == 1]
data_dict = defaultdict(list)
for i, j in zip(meta_key_list, meta_value_list):
data_dict[i].append(j)
for key in non_duplicates:
ds.attrs[key] = data_dict[key][0]
# The group metadata below is retrieved by index so that the attributes are
# found even if their order changes, which it can. This also handles a
# specific attribute being missing for one team member but not others.
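# Each group is flattened into one "KEY:value, KEY:value, ..." string, and the
# strings are collected into a list attribute on the dataset.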
meta_arr = np.array(meta_key_list)
# Retrieve team_member metadata
team_indices = np.where(meta_arr == 'TEAM_MEMBER_NAME')[0]
team_members = []
valid_names = [
'TEAM_MEMBER_NAME',
'TEAM_MEMBER_EMAIL',
'TEAM_MEMBER_INSTITUTION',
'TEAM_MEMBER_ROLE',
'TEAM_MEMBER_ORCID',
'TEAM_MEMBER_ADDRESS',
]
for i, j in enumerate(team_indices):
if j == team_indices[-1]:
team_info_range = np.arange(team_indices[i], team_indices[i] + 5)
team_members_str = [
    str(meta_key_list[k]) + ':' + str(meta_value_list[k])
    for k in team_info_range
    if meta_key_list[k] in valid_names
]
result_str = ", ".join(team_members_str)
team_members.append(result_str)
break
else:
team_info_range = np.arange(team_indices[i], team_indices[i + 1])
team_members_str = [
    str(meta_key_list[k]) + ':' + str(meta_value_list[k])
    for k in team_info_range
    if meta_key_list[k] in valid_names
]
result_str = ", ".join(team_members_str)
team_members.append(result_str)
ds.attrs['TEAM_MEMBERS'] = team_members
# Retrieve doi contributor metadata
doi_indices = np.where(meta_arr == 'DOI_CONTRIBUTOR_DATAPRODUCT')[0]
doi_members = []
valid_doi_names = [
'DOI_CONTRIBUTOR_DATAPRODUCT',
'DOI_CONTRIBUTOR_NAME',
'DOI_CONTRIBUTOR_ROLE',
'DOI_CONTRIBUTOR_INSTITUTION',
'DOI_CONTRIBUTOR_EMAIL',
'DOI_CONTRIBUTOR_ORCID',
'DOI_CONTRIBUTOR_DATE_START',
'DOI_CONTRIBUTOR_DATE_END',
'DOI_CONTRIBUTOR_ADDRESS',
]
for i, j in enumerate(doi_indices):
if j == doi_indices[-1]:
doi_info_range = np.arange(doi_indices[i], doi_indices[i] + 5)
doi_contrib_str = [
    str(meta_key_list[k]) + ':' + str(meta_value_list[k])
    for k in doi_info_range
    if meta_key_list[k] in valid_doi_names
]
result_doi_str = ", ".join(doi_contrib_str)
doi_members.append(result_doi_str)
break
else:
doi_info_range = np.arange(doi_indices[i], doi_indices[i + 1])
doi_contrib_str = [
    str(meta_key_list[k]) + ':' + str(meta_value_list[k])
    for k in doi_info_range
    if meta_key_list[k] in valid_doi_names
]
result_doi_str = ", ".join(doi_contrib_str)
doi_members.append(result_doi_str)
ds.attrs['DOI_CONTRIBUTORS'] = doi_members
# Retrieve flux method metadata
flux_indices = np.where(meta_arr == 'FLUX_MEASUREMENTS_METHOD')[0]
flux_members = []
valid_flux_names = [
'FLUX_MEASUREMENTS_METHOD',
'FLUX_MEASUREMENTS_VARIABLE',
'FLUX_MEASUREMENTS_DATE_START',
'FLUX_MEASUREMENTS_DATE_END',
'FLUX_MEASUREMENTS_OPERATIONS',
]
for i, j in enumerate(flux_indices):
if j == flux_indices[-1]:
flux_info_range = np.arange(flux_indices[i], flux_indices[i] + 5)
flux_str = [
    str(meta_key_list[k]) + ':' + str(meta_value_list[k])
    for k in flux_info_range
    if meta_key_list[k] in valid_flux_names
]
result_flux_str = ", ".join(flux_str)
flux_members.append(result_flux_str)
break
else:
flux_info_range = np.arange(flux_indices[i], flux_indices[i + 1])
flux_str = [
    str(meta_key_list[k]) + ':' + str(meta_value_list[k])
    for k in flux_info_range
    if meta_key_list[k] in valid_flux_names
]
result_flux_str = ", ".join(flux_str)
flux_members.append(result_flux_str)
ds.attrs['FLUX_MEASUREMENTS_METHODS'] = flux_members
return ds
def convert_to_ameriflux(
ds,
variable_mapping=None,
soil_mapping=None,
depth_profile=[2.5, 5, 10, 15, 20, 30, 35, 50, 75, 100],
include_missing_variables=False,
**kwargs,
):
"""
Converts an ACT `xarray.Dataset` to AmeriFlux-formatted data and returns a
`pandas.DataFrame` ready for writing to csv.
Parameters
----------
ds : xarray.Dataset
Dataset of data to convert to AmeriFlux format
variable_mapping : dict
Dictionary of variables mappings. The key should be the name of the variable
in the Dataset with the values being dictionaries of the AmeriFlux name and units.
For example:
var_mapping = {
'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'},
}
soil_mapping : dict
Dictionary of soil variable mappings following the same formatting as variable_mapping.
It is understood that the AmeriFlux name may be the same for several variables. This
function attempts to automatically name these measurements. If a variable is neither
dimensioned by depth nor has a sensor_height attribute, it is assumed to be at the
first depth in the depth_profile list.
depth_profile : list
List of depths that the variables will be mapped to. If a depth is not in this list,
the index chosen will be the one closest to the depth value.
include_missing_variables : boolean
If there are variables that are completely missing (-9999), choose whether or not
to include them in the DataFrame.
Returns
-------
df : pandas.DataFrame (or None)
Returns a pandas DataFrame for easy writing to csv.
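Examples
--------
A minimal sketch; ``ds`` is assumed to be an ACT-standard dataset, and the
mapping and output filename are illustrative::

    mapping = {'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'}}
    df = convert_to_ameriflux(ds, variable_mapping=mapping)
    df.to_csv('US-xx_HH_202001010000_202101010000.csv', index=False)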
"""
# Use ARM variable mappings if none provided
if variable_mapping is None:
warnings.warn('Variable mapping was not provided, using default ARM mapping')
# Define variable mapping and units
# The key is the variable name in the data and the name in the dictionary
# is the AmeriFlux Name
var_mapping = {
'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'},
'co2_molar_fraction': {'name': 'CO2', 'units': 'nmol/mol'},
'co2_mixing_ratio': {'name': 'CO2_MIXING_RATIO', 'units': 'umol/mol'},
'h2o_mole_fraction': {'name': 'H2O', 'units': 'mmol/mol'},
'h2o_mixing_ratio': {'name': 'H2O_MIXING_RATIO', 'units': 'mmol/mol'},
'ch4_mole_fraction': {'name': 'CH4', 'units': 'nmol/mol'},
'ch4_mixing_ratio': {'name': 'CH4_MIXING_RATIO', 'units': 'nmol/mol'},
'momentum_flux': {'name': 'TAU', 'units': 'kg/(m s^2)'},
'sensible_heat_flux': {'name': 'H', 'units': 'W/m^2'},
'latent_flux': {'name': 'LE', 'units': 'W/m^2'},
'air_temperature': {'name': 'TA', 'units': 'deg C'},
'air_pressure': {'name': 'PA', 'units': 'kPa'},
'relative_humidity': {'name': 'RH', 'units': '%'},
'sonic_temperature': {'name': 'T_SONIC', 'units': 'deg C'},
'water_vapor_pressure_defecit': {'name': 'VPD', 'units': 'hPa'},
'Monin_Obukhov_length': {'name': 'MO_LENGTH', 'units': 'm'},
'Monin_Obukhov_stability_parameter': {'name': 'ZL', 'units': ''},
'mean_wind': {'name': 'WS', 'units': 'm/s'},
'wind_direction_from_north': {'name': 'WD', 'units': 'deg'},
'friction_velocity': {'name': 'USTAR', 'units': 'm/s'},
'maximum_instantaneous_wind_speed': {'name': 'WS_MAX', 'units': 'm/s'},
'down_short_hemisp': {'name': 'SW_IN', 'units': 'W/m^2'},
'up_short_hemisp': {'name': 'SW_OUT', 'units': 'W/m^2'},
'down_long': {'name': 'LW_IN', 'units': 'W/m^2'},
'up_long': {'name': 'LW_OUT', 'units': 'W/m^2'},
'albedo': {'name': 'ALB', 'units': '%'},
'net_radiation': {'name': 'NETRAD', 'units': 'W/m^2'},
'par_inc': {'name': 'PPFD_IN', 'units': 'umol/(m^2 s)'},
'par_ref': {'name': 'PPFD_OUT', 'units': 'umol/(m^2 s)'},
'precip': {'name': 'P', 'units': 'mm'},
}
else:
    # Otherwise use the user-provided mapping
    var_mapping = variable_mapping
# Use ARM variable mappings if none provided
# Similar to the above. This has only been tested on the ARM
# ECOR, SEBS, STAMP, and AMC combined. The automated naming may
# not work for all cases
if soil_mapping is None:
warnings.warn('Soil variable mapping was not provided, using default ARM mapping')
soil_mapping = {
'surface_soil_heat_flux': {'name': 'G', 'units': 'W/m^2'},
'soil_temp': {'name': 'TS', 'units': 'deg C'},
'temp': {'name': 'TS', 'units': 'deg C'},
'soil_moisture': {'name': 'SWC', 'units': '%'},
'soil_specific_water_content': {'name': 'SWC', 'units': '%'},
'vwc': {'name': 'SWC', 'units': '%'},
}
# Loop through variables and update units to the AmeriFlux standard
for v in ds:
if v in var_mapping:
ds = ds.utils.change_units(variables=v, desired_unit=var_mapping[v]['units'])
# Get start/end time stamps
ts_start = ds['time'].dt.strftime('%Y%m%d%H%M').values
ts_end = [
pd.to_datetime(t + np.timedelta64(30, 'm')).strftime('%Y%m%d%H%M')
for t in ds['time'].values
]
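# NOTE: TIMESTAMP_END assumes half-hourly (30 min) sampling, the standard
# AmeriFlux HH submission interval.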
data = {}
data['TIMESTAMP_START'] = ts_start
data['TIMESTAMP_END'] = ts_end
# Loop through the variables in the var mapping dictionary and add data to dictionary
for v in var_mapping:
if v in ds:
if 'missing_value' not in ds[v].attrs:
ds[v].attrs['missing_value'] = -9999
if np.all(ds[v].isnull()):
if include_missing_variables:
data[var_mapping[v]['name']] = ds[v].values
else:
data[var_mapping[v]['name']] = ds[v].values
else:
if include_missing_variables:
data[var_mapping[v]['name']] = np.full(ds['time'].shape, -9999)
# Automated naming for the soil variables
# Again, this may not work for other cases. Careful review is needed.
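# AmeriFlux positional qualifiers follow _H_V_R: H is the horizontal position
# index, V the vertical (depth) index, and R the replicate number.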
prev_var = ''
for var in soil_mapping:
if soil_mapping[var]['name'] != prev_var:
h = 1
r = 1
prev_var = soil_mapping[var]['name']
soil_vars = [
    v2
    for v2 in list(ds)
    if v2.startswith(var) and 'std' not in v2 and 'qc' not in v2 and 'net' not in v2
]
for i, svar in enumerate(soil_vars):
vert = 1
if 'avg' in svar or 'average' in svar:
continue
soil_data = ds[svar].values
data_shape = soil_data.shape
if len(data_shape) > 1:
coords = ds[svar].coords
depth_name = list(coords)[-1]
depth_values = ds[depth_name].values
for depth_ind in range(len(depth_values)):
soil_data_depth = soil_data[:, depth_ind]
vert = np.where(depth_profile == depth_values[depth_ind])[0][0] + 1
new_name = '_'.join([soil_mapping[var]['name'], str(h), str(vert), str(r)])
data[new_name] = soil_data_depth
else:
if 'sensor_height' in ds[svar].attrs:
sensor_ht = ds[svar].attrs['sensor_height'].split(' ')
depth = abs(float(sensor_ht[0]))
units = sensor_ht[1]
if units == 'cm':
vert = np.argmin(np.abs(np.array(depth_profile) - depth)) + 1
new_name = '_'.join([soil_mapping[var]['name'], str(h), str(vert), str(r)])
data[new_name] = soil_data
h += 1
# Convert dictionary to dataframe and return
df = pd.DataFrame(data)
df = df.fillna(-9999.0)
return df