Source code for act.utils.qc_utils

"""
Utility functions for quality control, which may or may not be
program dependent.

"""

import os

import numpy as np
import pandas as pd


def calculate_dqr_times(
    ds, variable=None, txt_path=None, threshold=None, qc_bit=None, return_missing=True
):
    """
    Function to retrieve start and end times of missing or bad data. Function
    will retrieve start and end time strings in a format that the DQR
    submission tool can read, print them to the console, and save to a .txt
    file if desired.

    Parameters
    ----------
    ds : xarray.Dataset
        Xarray dataset as read by ACT where data are stored.
    variable : str or list of str
        Data variable(s) in the dataset to check. Can check multiple variables.
    txt_path : str
        Full path to directory in which to save .txt files with start and end
        times. If directory doesn't exist the program will create it. If set
        to None then no .txt files will be created. Default is None.
    threshold : int
        Threshold of number of data points to trigger start and end time
        calculations. Value is interpreted differently depending on the
        resolution of the data in the specified files. For example, if data is
        1-minute data, a threshold of 30 would mean flagged data more than 30
        minutes apart would show up as two separate time ranges; if data is
        30-minute data, a threshold of 2 would mean flagged data more than
        60 mins apart would show up as two separate time ranges. Non-integer
        values are rounded to the closest integer.
    qc_bit : int
        Bit number (1-based) to choose if finding times for bad data. If set
        then will override searching for missing data. A sample counts as
        flagged when this bit is set in the QC value, regardless of which
        other bits are also set. Default is None.
    return_missing : bool
        Specifies whether or not to return times of data flagged as missing.
        If set to False, will return times of bad data. Default is True.

    Returns
    -------
    time_strings : list
        List of tuples with the first element as the start time and the second
        element as the end time. When several variables are given, the list
        holds the ranges of the last variable that had flagged data (each
        variable's ranges are still printed and, if requested, written to its
        own .txt file). An empty list is returned when nothing was flagged.
        None is returned when no threshold is given or when bad data is
        requested for a1 level data.

    Raises
    ------
    TypeError
        If bad data is requested and `qc_bit` is not an integer.

    """
    # A QC bit always means "search for bad data", whatever return_missing says.
    search_bad = bool(qc_bit) or not return_missing

    # Clean files. Converts from ARM to CF standards
    ds.clean.cleanup(cleanup_arm_qc=True, handle_missing_value=True, link_qc_variables=True)
    date = ds.attrs['_file_dates'][0]
    datastream = ds.attrs['_datastream']

    # Make variable instance a list always
    if variable is None:
        variable = []
    elif not isinstance(variable, list):
        variable = [variable]

    if threshold is None:
        print('You must specify a threshold for separating ranges of' + ' flagged data')
        return
    # Make sure that threshold number is an integer. If not convert to
    # closest integer (the converted value has to be stored to take effect).
    if not isinstance(threshold, int):
        threshold = int(round(threshold))

    # Initialised up front so that "nothing flagged" yields an empty list
    # instead of an unbound local at the return statement.
    time_strings = []

    if not search_bad:
        # Missing data: samples whose value is NaN.
        for var in variable:
            idx = np.where(np.isnan(ds[var].values))[0]
            if len(idx) == 0:
                print(f'No missing data for {var} on ' + date)
                continue

            time_strings = _flagged_time_ranges(ds['time'].values, idx, threshold)
            for start_time, end_time in time_strings:
                print(f'Missing Data for {var} begins at: ' + start_time)
                print(f'Missing Data for {var} ends at: ' + end_time)
            if txt_path:
                _write_dqr_times_to_txt(datastream, date, txt_path, var, time_strings)
        return time_strings

    # Bad data: samples whose companion qc_<var> value has the chosen bit set.
    for var in variable:
        # If a1 level data, return.
        if 'a1' in ds.attrs['data_level']:
            print('No QC is present in a1 level.')
            return

        try:
            qc_data = ds['qc_' + var].values
        except KeyError:
            print('Unable to calculate start and end times for bad data')
            continue

        # Make sure qc bit is an integer (NumPy integer scalars are accepted too)
        if not isinstance(qc_bit, (int, np.integer)):
            raise TypeError('QC bit must be an integer')

        idx = _qc_bit_indices(qc_data, qc_bit)
        if len(idx) == 0:
            print('No bad data on ' + date + ' for selected QC bit for' + ' variable ' + var)
            continue

        time_strings = _flagged_time_ranges(ds['time'].values, idx, threshold)
        for start_time, end_time in time_strings:
            print(f'Bad Data for {var} Begins at: ' + start_time)
            print(f'Bad Data for {var} Ends at: ' + end_time)
        if txt_path:
            _write_dqr_times_to_txt(datastream, date, txt_path, var, time_strings)
    return time_strings


def _qc_bit_indices(qc_data, qc_bit):
    """Return first-axis indices of samples whose QC value has `qc_bit` set."""
    qc_data = np.asarray(qc_data)
    # Bits outside 1..63 cannot be represented in an int64 mask; report no match.
    if not 1 <= qc_bit <= 63:
        return np.array([], dtype=np.intp)

    mask = np.int64(1) << np.int64(int(qc_bit) - 1)
    kind = qc_data.dtype.kind
    if kind == 'f':
        # Float-encoded QC: treat NaN/inf as "no test tripped" before casting.
        qc_data = np.where(np.isfinite(qc_data), qc_data, 0)
    elif kind not in 'iu':
        # Not a numeric QC word; fall back to a plain equality comparison.
        return np.where(qc_data == int(mask))[0]

    # Bitwise test so values with several bits set (e.g. 6 for bits 2 and 3)
    # are still recognised as having the requested bit tripped.
    return np.where(np.bitwise_and(qc_data.astype(np.int64), mask) != 0)[0]


def _flagged_time_ranges(times, idx, threshold):
    """Group flagged indices into runs and return their (start, end) time strings."""
    # A new run starts wherever consecutive flagged indices are more than
    # `threshold` samples apart.
    breaks = np.where(np.diff(idx) > threshold)[0] + 1

    ranges = []
    for run in np.split(idx, breaks):
        # If there is only one flagged data point skip
        if len(run) < 2:
            continue
        start = pd.to_datetime(str(times[run[0]])).strftime('%Y-%m-%d %H:%M:%S')
        end = pd.to_datetime(str(times[run[-1]])).strftime('%Y-%m-%d %H:%M:%S')
        ranges.append((start, end))
    return ranges
def _write_dqr_times_to_txt(datastream, date, txt_path, variable, time_strings):
    """
    Writes flagged data time range(s) to a .txt file. The naming convention is
    dqrtimes_datastream.date.variable.txt, and the file is placed in a
    sub-directory of `txt_path` named after the datastream. An existing file
    with the same name is overwritten.

    Parameters
    ----------
    datastream : str
        ARM datastream name (ie, sgpmetE13.b1).
    date : str
        Date of time range(s) being written.
    txt_path : str
        Base path of where the .txt files are being written. Missing
        directories (including `txt_path` itself) are created.
    variable : str
        Name of variable being flagged.
    time_strings : list of tuples
        List of every start and end time to be written.

    """
    print(
        'Writing data to text file for '
        + datastream
        + ' '
        + variable
        + ' on '
        + date
        + ' at '
        + txt_path
        + '...',
        flush=True,
    )

    # makedirs (rather than mkdir) so a not-yet-existing base path is created
    # too, and an already existing directory is not an error.
    full_path = os.path.join(txt_path, datastream)
    os.makedirs(full_path, exist_ok=True)

    file_name = 'dqrtimes_' + datastream + '.' + date + '.' + variable + '.txt'
    with open(os.path.join(full_path, file_name), 'w') as text_file:
        # One "start, end" pair per line.
        text_file.writelines(f'{st}, {et} \n' for st, et in time_strings)