Source code for pysp2.io.read_dat

""" I/O function for reading .dat files from original IGOR processing."""

import xarray as xr
import pandas as pd
import numpy as np
import act
import platform
import os

from glob import glob
from datetime import datetime


def read_dat(file_name, type):
    """
    This reads the .dat files containing the intermediate parameters
    generated by the Igor processing. Wildcards are supported.

    Parameters
    ----------
    file_name: str
        The name of the file to read. Use a wildcard to open
        multiple files at once.
    type: str
        This parameter must be one of:
            'particle': Load individual particle timeseries from .dat file
            'conc': Load timeseries of concentrations.

    Returns
    -------
    ds: xarray.Dataset
        The xarray dataset to store the parameters in.
    """
    if type.lower() not in ['particle', 'conc']:
        raise ValueError("Invalid input for type, must be either 'particle' or 'conc'!")
    fname = glob(file_name, recursive=True)
    ds_list = []
    for f in fname:
        try:
            if type.lower() == 'particle':
                ds = act.io.text.read_csv(f, sep="\t", skiprows=2)
            else:
                ds = act.io.text.read_csv(f, sep="\t")
            ds_list.append(ds)
        except (pd.errors.EmptyDataError, IndexError):
            # Skip empty or malformed files.
            continue

    if type.lower() == 'particle':
        return xr.concat(ds_list, dim='index').sortby('DateTimeWave')
    elif type.lower() == 'conc':
        return xr.concat(ds_list, dim='index').sortby('Start DateTime')
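
# A minimal usage sketch for read_dat. The file names below are hypothetical
# examples, not files shipped with PySP2:
#
#     import pysp2
#
#     # Load individual particle timeseries from every matching .dat file.
#     particles = pysp2.io.read_dat('20190101x*.dat', type='particle')
#
#     # Load a timeseries of concentrations from a single file.
#     conc = pysp2.io.read_dat('20190101Conc.dat', type='conc')
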
def read_arm_dat(file_name, num_bins=199):
    """
    This reads mass and number distribution data that has been stored
    in the format used in the ARM Archive.

    Parameters
    ----------
    file_name: str
        File name or directory with .dat files. All .dat files must have
        the same time indices.
    num_bins: int or None
        Number of size distribution bins in the file. Set to None to have
        PySP2 attempt to automatically detect this.

    Returns
    -------
    ds: pandas.DataFrame
        The pandas dataframe containing the data.
    """
    fname = sorted(glob(file_name, recursive=True))
    ds_list = []
    for f in fname:
        try:
            ds = pd.read_csv(f, sep="\t", skiprows=32,
                             index_col="SP2_datetime_in_sec")
            ds_list.append(ds)
        except (pd.errors.EmptyDataError, IndexError):
            # Skip empty or malformed files.
            continue

    # Save the bin edges and date/time columns from the first file, since
    # summing the dataframes below would corrupt these non-additive columns.
    ds = ds_list[0]
    SP2_Dmin = ds['SP2_Dmin'].values
    SP2_Dgeo = ds['SP2_Dgeo'].values
    SP2_Dmax = ds['SP2_Dmax'].values
    SP2_date = ds['SP2_date'].values
    SP2_time = ds['SP2_time'].values
    for i in range(1, len(ds_list)):
        ds = ds + ds_list[i]

    if num_bins is None:
        # Auto-detect the bin count from the first NaN entry in SP2_Dmin.
        num_bins = int(np.argwhere(np.isnan(SP2_Dmin))[0])
    # Drop rows with an empty SP2_date, then restore the saved columns.
    ds['SP2_date'].replace('', np.nan, inplace=True)
    ds.dropna(subset=['SP2_date'], inplace=True)
    ds['SP2_Dmin'] = SP2_Dmin
    ds['SP2_Dgeo'] = SP2_Dgeo
    ds['SP2_Dmax'] = SP2_Dmax
    ds['SP2_date'] = SP2_date
    ds['SP2_time'] = SP2_time
    return ds
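
# A minimal usage sketch for read_arm_dat. The ARM-style file pattern is a
# hypothetical example; passing num_bins=None asks PySP2 to detect the bin
# count from the first NaN in the SP2_Dmin column:
#
#     df = pysp2.io.read_arm_dat('sgpaossp2*.dat', num_bins=None)
#     print(df[['SP2_Dmin', 'SP2_Dgeo', 'SP2_Dmax']].head())
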
def read_calibration(directory):
    """
    This reads data from a bead calibration from the SP2. Each dataset
    is stored in a dictionary whose keys correspond to a given scattering
    or incandescence diameter in nm.

    Parameters
    ----------
    directory: str
        The path to the calibration data. The directory must contain
        processed .dat files for each segment as well as .txt files that
        describe what diameter each .dat file corresponds to.

    Returns
    -------
    calibration_data: dict
        A dictionary storing the dataset for each scattering/incandescence
        diameter.
    """
    file_list = glob(os.path.join(directory, '*'))

    # Look for the dataset date, encoded as a YYYYMMDD file name prefix.
    for f in file_list:
        if platform.system() == "Windows":
            file_name = f.split("\\")[-1]
        else:
            file_name = f.split("/")[-1]
        date_str = file_name[0:8]
        if date_str.isnumeric():
            dt = datetime.strptime(date_str, '%Y%m%d')
            break

    scat_ds = pd.read_csv(
        os.path.join(directory, '%sExptDetail_Scat.txt' % date_str), sep='\t')
    in_ds = pd.read_csv(
        os.path.join(directory, '%sExptDetail_Aq.txt' % date_str), sep='\t')
    scat_diam = scat_ds.Diameter.values
    incan_diam = in_ds.Diameter.values
    calibration_data = {}
    # Concatenate the .dat segments listed for each scattering diameter.
    for i in range(len(scat_diam)):
        my_ds = []
        for j in range(scat_ds["FileStart"][i], scat_ds["FileEnd"][i] + 1):
            my_ds.append(read_dat(
                os.path.join(directory, '%sx%03d.dat' % (date_str, j)),
                type='particle'))
        calibration_data["scat_%d" % scat_diam[i]] = xr.concat(
            my_ds, dim='index').sortby('DateTimeWave')

    # Do the same for each incandescence diameter.
    for i in range(len(incan_diam)):
        my_ds = []
        for j in range(in_ds["FileStart"][i], in_ds["FileEnd"][i] + 1):
            my_ds.append(read_dat(
                os.path.join(directory, '%sx%03d.dat' % (date_str, j)),
                type='particle'))
        calibration_data["incan_%d" % incan_diam[i]] = xr.concat(
            my_ds, dim='index').sortby('DateTimeWave')

    del scat_ds
    del in_ds
    return calibration_data
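
# A minimal usage sketch for read_calibration. The directory layout here is
# an assumption based on the docstring above: a folder holding, e.g.,
# 20190101ExptDetail_Scat.txt, 20190101ExptDetail_Aq.txt, and the matching
# 20190101x###.dat segment files:
#
#     cal = pysp2.io.read_calibration('/path/to/calibration/')
#     # Keys are of the form 'scat_<diameter>' or 'incan_<diameter>' in nm,
#     # e.g. cal['scat_220'] for the 220 nm scattering beads.
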