Source code for act.io.ameriflux

"""
This module contains I/O operations for the U.S. Department of Energy
AmeriFlux program (https://ameriflux.lbl.gov/).
"""

import numpy as np
import pandas as pd
import warnings


[docs]def convert_to_ameriflux( ds, variable_mapping=None, soil_mapping=None, depth_profile=[2.5, 5, 10, 15, 20, 30, 35, 50, 75, 100], include_missing_variables=False, **kwargs, ): """ Returns `xarray.Dataset` with stored data and metadata from a user-defined query of ARM-standard netCDF files from a single datastream. Has some procedures to ensure time is correctly fomatted in returned Dataset. Parameters ---------- ds : xarray.Dataset Dataset of data to convert to AmeriFlux format variable_mapping : dict Dictionary of variables mappings. The key should be the name of the variable in the Dataset with the values being dictionaries of the AmeriFlux name and units. For example: var_mapping = { 'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'}, } soil_mapping : dict Dictionary of soil variables mappings following the same formatting as variable_mapping. It is understood that the AmeriFlux name may be the same for some variables. This script attempts to automatically name these measurements. If a variable is not dimensioned by a depth nor has a sensor_height attribute, it will automatically assume that it's at the first depth in the depth_profile variable. depth_profile : list List of depths that the variables will be mapped to. If a depth is not in this list, the index chosen will be the one closest to the depth value. include_missing_variables : boolean If there variables that are completely missing (-9999) chose whether or not to include them in the DataFrame. Returns ------- df : pandas.DataFrame (or None) Returns a pandas dataframe for easy writing to csv """ # Use ARM variable mappings if none provided if variable_mapping is None: warnings.warn('Variable mapping was not provided, using default ARM mapping') # Define variable mapping and units # The key is the variable name in the data and the name in the dictionary # is the AmeriFlux Name var_mapping = { 'co2_flux': {'name': 'FC', 'units': 'umol/(m^2 s)'}, 'co2_molar_fraction': {'name': 'CO2', 'units': 'nmol/mol'}, 'co2_mixing_ratio': {'name': 'CO2_MIXING_RATIO', 'units': 'umol/mol'}, 'h2o_mole_fraction': {'name': 'H2O', 'units': 'mmol/mol'}, 'h2o_mixing_ratio': {'name': 'H2O_MIXING_RATIO', 'units': 'mmol/mol'}, 'ch4_mole_fraction': {'name': 'CH4', 'units': 'nmol/mol'}, 'ch4_mixing_ratio': {'name': 'CH4_MIXING_RATIO', 'units': 'nmol/mol'}, 'momentum_flux': {'name': 'TAU', 'units': 'kg/(m s^2)'}, 'sensible_heat_flux': {'name': 'H', 'units': 'W/m^2'}, 'latent_flux': {'name': 'LE', 'units': 'W/m^2'}, 'air_temperature': {'name': 'TA', 'units': 'deg C'}, 'air_pressure': {'name': 'PA', 'units': 'kPa'}, 'relative_humidity': {'name': 'RH', 'units': '%'}, 'sonic_temperature': {'name': 'T_SONIC', 'units': 'deg C'}, 'water_vapor_pressure_defecit': {'name': 'VPD', 'units': 'hPa'}, 'Monin_Obukhov_length': {'name': 'MO_LENGTH', 'units': 'm'}, 'Monin_Obukhov_stability_parameter': {'name': 'ZL', 'units': ''}, 'mean_wind': {'name': 'WS', 'units': 'm/s'}, 'wind_direction_from_north': {'name': 'WD', 'units': 'deg'}, 'friction_velocity': {'name': 'USTAR', 'units': 'm/s'}, 'maximum_instantaneous_wind_speed': {'name': 'WS_MAX', 'units': 'm/s'}, 'down_short_hemisp': {'name': 'SW_IN', 'units': 'W/m^2'}, 'up_short_hemisp': {'name': 'SW_OUT', 'units': 'W/m^2'}, 'down_long': {'name': 'LW_IN', 'units': 'W/m^2'}, 'up_long': {'name': 'LW_OUT', 'units': 'W/m^2'}, 'albedo': {'name': 'ALB', 'units': '%'}, 'net_radiation': {'name': 'NETRAD', 'units': 'W/m^2'}, 'par_inc': {'name': 'PPFD_IN', 'units': 'umol/(m^2 s)'}, 'par_ref': {'name': 'PPFD_OUT', 'units': 'umol/(m^2 s)'}, 'precip': {'name': 'P', 'units': 'mm'}, } # Use ARM variable mappings if none provided # Similar to the above. This has only been tested on the ARM # ECOR, SEBS, STAMP, and AMC combined. The automated naming may # not work for all cases if soil_mapping is None: warnings.warn('Soil variable mapping was not provided, using default ARM mapping') soil_mapping = { 'surface_soil_heat_flux': {'name': 'G', 'units': 'W/m^2'}, 'soil_temp': {'name': 'TS', 'units': 'deg C'}, 'temp': {'name': 'TS', 'units': 'deg C'}, 'soil_moisture': {'name': 'SWC', 'units': '%'}, 'soil_specific_water_content': {'name': 'SWC', 'units': '%'}, 'vwc': {'name': 'SWC', 'units': '%'}, } # Loop through variables and update units to the AmeriFlux standard for v in ds: if v in var_mapping: ds = ds.utils.change_units(variables=v, desired_unit=var_mapping[v]['units']) # Get start/end time stamps ts_start = ds['time'].dt.strftime('%Y%m%d%H%M').values ts_end = [ pd.to_datetime(t + np.timedelta64(30, 'm')).strftime('%Y%m%d%H%M') for t in ds['time'].values ] data = {} data['TIMESTAMP_START'] = ts_start data['TIMESTAMP_END'] = ts_end # Loop through the variables in the var mapping dictionary and add data to dictionary for v in var_mapping: if v in ds: if 'missing_value' not in ds[v].attrs: ds[v].attrs['missing_value'] = -9999 if np.all(ds[v].isnull()): if include_missing_variables: data[var_mapping[v]['name']] = ds[v].values else: data[var_mapping[v]['name']] = ds[v].values else: if include_missing_variables: data[var_mapping[v]['name']] = np.full(ds['time'].shape, -9999) # Automated naming for the soil variables # Again, this may not work for other cases. Careful review is needed. prev_var = '' for var in soil_mapping: if soil_mapping[var]['name'] != prev_var: h = 1 r = 1 prev_var = soil_mapping[var]['name'] soil_vars = [ v2 for v2 in list(ds) if (v2.startswith(var)) & ('std' not in v2) & ('qc' not in v2) & ('net' not in v2) ] for i, svar in enumerate(soil_vars): vert = 1 if ('avg' in svar) | ('average' in svar): continue soil_data = ds[svar].values data_shape = soil_data.shape if len(data_shape) > 1: coords = ds[svar].coords depth_name = list(coords)[-1] depth_values = ds[depth_name].values for depth_ind in range(len(depth_values)): soil_data_depth = soil_data[:, depth_ind] vert = np.where(depth_profile == depth_values[depth_ind])[0][0] + 1 new_name = '_'.join([soil_mapping[var]['name'], str(h), str(vert), str(r)]) data[new_name] = soil_data_depth else: if 'sensor_height' in ds[svar].attrs: sensor_ht = ds[svar].attrs['sensor_height'].split(' ') depth = abs(float(sensor_ht[0])) units = sensor_ht[1] if units == 'cm': vert = np.argmin(np.abs(np.array(depth_profile) - depth)) + 1 new_name = '_'.join([soil_mapping[var]['name'], str(h), str(vert), str(r)]) data[new_name] = soil_data h += 1 # Convert dictionary to dataframe and return df = pd.DataFrame(data) df = df.fillna(-9999.0) return df