Source code for act.utils.io_utils

import act
import numpy as np
import pandas as pd
from pathlib import Path
import tarfile
from os import sep
from os import PathLike
from shutil import rmtree
import gzip
import shutil
import tempfile
import types

try:
    import moviepy.editor as moviepy_editor
    import moviepy.video.io.ImageSequenceClip

    MOVIEPY_AVAILABLE = True
except (ImportError, RuntimeError):
    MOVIEPY_AVAILABLE = False


def pack_tar(filenames, write_filename=None, write_directory=None, remove=False):
    """
    Creates TAR file from list of filenames provided. Currently only works with
    all files existing in the same directory.

    Parameters
    ----------
    filenames : str or list
        Filenames to be placed in TAR file
    write_filename : str, pathlib.Path, None
        TAR output filename. If not provided will use file name 'created_tarfile.tar'
    write_directory : str, pathlib.Path, None
        Path to directory to write TAR file. If the directory does not exist will
        be created.
    remove : boolean
        Delete provided filenames after making TAR file

    Returns
    -------
    write_filename : str
        Full path name of created TAR file

    """
    if write_filename is None:
        write_filename = 'created_tarfile.tar'

    if isinstance(filenames, (str, PathLike)):
        filenames = [filenames]

    if write_directory is not None:
        write_directory = Path(write_directory)
        write_directory.mkdir(parents=True, exist_ok=True)
        write_filename = Path(write_filename).name
    elif Path(write_filename).parent != Path('.'):
        write_directory = Path(write_filename).parent
    else:
        write_directory = Path('.')

    if not str(write_filename).endswith('.tar'):
        write_filename = str(write_filename) + '.tar'

    write_filename = Path(write_directory, write_filename)
    tar_file_handle = tarfile.open(write_filename, "w")
    for filename in filenames:
        tar_file_handle.add(filename, arcname=Path(filename).name)

    tar_file_handle.close()

    if remove:
        for filename in filenames:
            Path(filename).unlink()

    return str(write_filename)
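
# Example usage for pack_tar() (a minimal sketch; the data file names below are
# hypothetical). With write_directory set, only the base name of write_filename
# is kept and the TAR file is written into that directory:
#
# >>> tar_path = pack_tar(
# ...     ['sgpmetE13.b1.20190101.000000.cdf', 'sgpmetE13.b1.20190102.000000.cdf'],
# ...     write_filename='met_data.tar',
# ...     write_directory='./tars',
# ... )
# >>> tar_path
# 'tars/met_data.tar'
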
def unpack_tar(
    tar_files, write_directory=None, temp_dir=False, randomize=True, return_files=True, remove=False
):
    """
    Unpacks TAR file contents into provided base directory

    Parameters
    ----------
    tar_files : str or list
        Path to TAR file to be unpacked
    write_directory : str or pathlib.Path
        Base path to extract contents of TAR files or create a new randomized directory
        to extract contents of TAR file.
    temp_dir : boolean
        Should a temporary directory be created and TAR files extracted to the new directory.
        write_directory and randomize are ignored if this option is used.
    randomize : boolean
        Create a new randomized directory to extract TAR files into.
    return_files : boolean
        When set will return a list of full path filenames to the extracted files.
        When set to False will return full path to directory containing extracted files.
    remove : boolean
        Delete provided TAR files after extracting files.

    Returns
    -------
    files : list or str
        List of full path files extracted from the TAR file or full path to directory
        containing extracted files.

    """
    files = []
    if isinstance(tar_files, (str, PathLike)):
        tar_files = [tar_files]

    out_dir = Path.cwd()
    if temp_dir is True:
        out_dir = Path(tempfile.TemporaryDirectory().name)
    else:
        if write_directory is not None:
            out_dir = Path(write_directory)
        else:
            out_dir = Path(Path(tar_files[0]).parent)

        if out_dir.is_dir() is False:
            out_dir.mkdir(parents=True, exist_ok=True)

        if randomize:
            out_dir = Path(tempfile.mkdtemp(dir=out_dir))

    for tar_file in tar_files:
        try:
            tar = tarfile.open(tar_file)
            tar.extractall(path=out_dir)
            result = [str(Path(out_dir, ii.name)) for ii in tar.getmembers()]
            files.extend(result)
            tar.close()
        except tarfile.ReadError:
            print("Could not extract files from the tar_file")

    if return_files is False:
        files = str(out_dir)
    else:
        files.sort()

    if remove:
        for tar_file in tar_files:
            Path(tar_file).unlink()

    return files
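
# Example usage for unpack_tar() (a sketch; 'met_data.tar' is assumed to exist,
# e.g. from the pack_tar() example above). With randomize=True the extraction
# directory is generated by tempfile.mkdtemp(), so the exact path would vary;
# randomize=False extracts directly into write_directory:
#
# >>> files = unpack_tar('tars/met_data.tar', write_directory='./extracted', randomize=False)
# >>> files
# ['extracted/sgpmetE13.b1.20190101.000000.cdf', 'extracted/sgpmetE13.b1.20190102.000000.cdf']
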
def cleanup_files(dirname=None, files=None):
    """
    Cleans up files and directory possibly created from unpacking TAR files
    with unpack_tar()

    Parameters
    ----------
    dirname : str, pathlib.Path, None
        Path to directory of extracted files which will be removed.
    files : str, pathlib.Path, list, None
        Full path file name(s) from extracted TAR file. Assumes the directory
        this file exists in should be removed.

    """
    if isinstance(files, (str, PathLike)):
        files = [str(files)]

    try:
        if dirname is not None:
            rmtree(dirname)

        if files is not None and len(files) > 0 and Path(files[0]).is_file():
            out_dir = Path(files[0]).parent
            rmtree(out_dir)

    except Exception as error:
        print("\nError removing files:", error)
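
# Example usage for cleanup_files(), continuing the unpack_tar() sketch above.
# Passing the extracted file list removes the directory containing the first file:
#
# >>> cleanup_files(files=files)
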
def is_gunzip_file(filepath):
    """
    Function to test if file is a gzip file.

    Parameters
    ----------
    filepath : str or pathlib.Path
        Path to file to test

    Returns
    -------
    test : boolean
        Result from testing if file is a gzip file

    """
    try:
        with open(str(filepath), 'rb') as test_f:
            return test_f.read(2) == b'\x1f\x8b'
    except Exception:
        return False
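
# Example usage for is_gunzip_file() (a sketch; 'data.cdf.gz' is a hypothetical
# file compressed with gzip). The test reads the first two bytes and compares
# against the gzip magic number, so the result depends on file contents, not the
# file extension:
#
# >>> is_gunzip_file('data.cdf.gz')
# True
# >>> is_gunzip_file('data.cdf')
# False
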
def pack_gzip(filename, write_directory=None, remove=False):
    """
    Creates a gzip file from a filename path

    Parameters
    ----------
    filename : str, pathlib.Path
        Filename to use in creation of gzip version.
    write_directory : str, pathlib.Path, None
        Path to directory to place newly created gzip file.
    remove : boolean
        Remove provided filename after creating gzip file

    Returns
    -------
    write_filename : str
        Full path name of created gzip file

    """
    write_filename = Path(filename).name + '.gz'

    if write_directory is not None:
        write_filename = Path(write_directory, write_filename)
        Path(write_directory).mkdir(parents=True, exist_ok=True)
    else:
        write_filename = Path(Path(filename).parent, write_filename)

    with open(filename, 'rb') as f_in:
        with gzip.open(write_filename, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)

    if remove:
        Path(filename).unlink()

    return str(write_filename)
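
# Example usage for pack_gzip() (a sketch; the input file is hypothetical). The
# '.gz' suffix is appended to the original base name:
#
# >>> gz_path = pack_gzip('sgpmetE13.b1.20190101.000000.cdf', write_directory='./gz')
# >>> gz_path
# 'gz/sgpmetE13.b1.20190101.000000.cdf.gz'
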
def unpack_gzip(filename, write_directory=None, remove=False):
    """
    Extracts file from a gzip file.

    Parameters
    ----------
    filename : str, pathlib.Path
        Filename to use in extraction of gzip file.
    write_directory : str, pathlib.Path, None
        Path to directory to place extracted file.
    remove : boolean
        Remove provided filename after extracting file

    Returns
    -------
    write_filename : str
        Full path name of extracted file

    """
    if write_directory is None:
        write_directory = Path(filename).parent

    write_filename = Path(filename).name
    if write_filename.endswith('.gz'):
        write_filename = write_filename.replace(".gz", "")

    write_filename = Path(write_directory, write_filename)

    with gzip.open(filename, "rb") as f_in:
        with open(write_filename, "wb") as f_out:
            shutil.copyfileobj(f_in, f_out)

    if remove:
        Path(filename).unlink()

    return str(write_filename)
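
# Example usage for unpack_gzip(), reversing the pack_gzip() sketch above. The
# '.gz' suffix is dropped from the output name:
#
# >>> unpack_gzip(gz_path, write_directory='.')
# 'sgpmetE13.b1.20190101.000000.cdf'
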
def generate_movie(images, write_filename=None, fps=10, **kwargs):
    """
    Creates a movie from a list of images or converts a movie to a different format

    Parameters
    ----------
    images : list, PosixPath generator, path to a directory, single string/PosixPath to movie
        List of images in the correct order to make into a movie or a generator from
        a pathlib.Path.glob() search. If a path to a directory, will create a movie from
        all files in that directory in alphanumeric order. If a single file to a movie,
        will allow for converting to new format defined by the file extension of
        write_filename.
    write_filename : str, pathlib.Path, None
        Movie output filename. Default is 'movie.mp4' in current directory. If a path to
        a directory that does not exist, will create the directory path.
    fps : int
        Frames per second. Passed into moviepy->ImageSequenceClip() method
    **kwargs : dict
        Optional keywords passed into moviepy->write_videofile() method

    Returns
    -------
    write_filename : str
        Full path name of created movie file

    """
    if not MOVIEPY_AVAILABLE:
        raise ImportError('MoviePy needs to be installed on your system to make movies.')

    # Set default movie name
    if write_filename is None:
        write_filename = Path(Path().cwd(), 'movie.mp4')

    # Check if images is pointing to a directory. If so ensure is a string not PosixPath
    IS_MOVIE = False
    if isinstance(images, (types.GeneratorType, list, tuple)):
        images = [str(image) for image in images]
        images.sort()
    elif isinstance(images, (PathLike, str)) and Path(images).is_file():
        IS_MOVIE = True
        images = str(images)
    elif isinstance(images, (PathLike, str)) and Path(images).is_dir():
        images = str(images)

    # Ensure full path to filename exists
    write_directory = Path(write_filename).parent
    write_directory.mkdir(parents=True, exist_ok=True)

    if IS_MOVIE:
        with moviepy_editor.VideoFileClip(images) as clip:
            # There can be an issue converting mpeg to other movie format because the
            # duration parameter in the movie file is not set. So moviepy guesses and
            # can get the duration wrong. This will find the correct duration
            # (correct to 0.2 seconds) and set before writing.
            if Path(images).suffix == '.mpg':
                import warnings
                from collections import deque

                with warnings.catch_warnings():
                    warnings.filterwarnings('ignore', category=UserWarning)
                    desired_len = 3
                    frame_sums = deque()
                    duration = 0.0  # Duration of movie in seconds
                    while True:
                        result = clip.get_frame(duration)
                        frame_sums.append(np.sum(result))
                        if len(frame_sums) > desired_len:
                            frame_sums.popleft()

                            if len(set(frame_sums)) == 1:
                                break

                        duration += 0.1

                    clip = clip.set_start(0)
                    clip = clip.set_duration(duration)
                    clip = clip.set_end(duration)
                    clip.write_videofile(str(write_filename), **kwargs)
            else:
                clip.write_videofile(str(write_filename), **kwargs)
    else:
        clip = moviepy.video.io.ImageSequenceClip.ImageSequenceClip(images, fps=fps)
        clip.write_videofile(str(write_filename), **kwargs)

    return str(write_filename)
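
# Example usage for generate_movie() (a sketch; assumes moviepy is installed and
# that './plots' is a hypothetical directory of sequentially named PNG frames):
#
# >>> from pathlib import Path
# >>> images = Path('./plots').glob('*.png')
# >>> generate_movie(images, write_filename='timelapse.mp4', fps=5)
# 'timelapse.mp4'
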
def arm_standards_validator(file=None, dataset=None, verbose=True):
    """
    ARM Data Validator (ADV) - Checks to ensure that ARM standards are being
    followed in the files or dataset passed to it. Note, this includes a minimal
    set of standards that it checks against

    Parameters
    ----------
    file : str
        Filename to check against ARM standards. Do not pass in both a file and dataset
    dataset : xarray.Dataset
        Xarray dataset of an already read in file.
    verbose : boolean
        Defaults to print out errors in addition to returning a list of them

    Returns
    -------
    err : list
        List of errors in the data

    """
    # Set up the error tracking list
    err = []

    if file is not None and isinstance(file, str):
        # Check file naming standards
        if len(file.split(sep)[-1]) > 60:
            err.append('Filename length exceeds 60 characters')

        try:
            f_obj = act.utils.data_utils.DatastreamParserARM(file)
        except Exception as e:
            print(e)

        if (
            (f_obj.site is None)
            or (f_obj.datastream_class is None)
            or (f_obj.level is None)
            or (f_obj.facility is None)
            or (f_obj.date is None)
            or (f_obj.time is None)
            or (f_obj.ext is None)
        ):
            err.append(
                'Filename does not follow the normal ARM convention: '
                + '(sss)(inst)(qualifier)(temporal)(Fn).(dl).(yyyymmdd).(hhmmss).nc'
            )
        else:
            if f_obj.level[0] not in ['0', 'a', 'b', 'c', 's', 'm']:
                err.append(f_obj.level + ' is not a standard ARM data level')

            results = act.utils.arm_site_location_search(
                site_code=f_obj.site, facility_code=f_obj.facility
            )
            if len(results) == 0:
                err.append('Site and facility are not ARM standard')

    # The ability to read a file from NetCDF to xarray will catch a lot of the
    # problems with formatting. This would leave standard ARM checks
    try:
        if dataset is None and file is not None:
            ds = act.io.read_arm_netcdf(file)
        elif dataset is not None:
            ds = dataset
        else:
            raise ValueError('File and dataset are both None')
    except Exception as e:
        return ['File is not in a standard format that is readable by xarray: ' + str(e)]

    # Review time variables for errors for conformance to standards
    if 'time' not in list(ds.dims)[0]:
        err.append('"time" is required to be the first dimension.')

    for c in list(ds.coords):
        if c not in ds.dims:
            err.append(c + ': Coordinate is not included in dimensions.')

    if any(np.isnan(ds['time'].values)):
        err.append('Time must not include NaNs.')

    duplicates = sum(ds['time'].to_pandas().duplicated())
    if duplicates > 0:
        err.append('Duplicate times present in the file')

    diff = ds['time'].diff('time')
    idx = np.where(diff <= pd.Timedelta(0))
    if len(idx[0]) > 0:
        err.append('Time is not in increasing order')

    if 'base_time' not in ds or 'time_offset' not in ds:
        err.append('ARM requires base_time and time_offset variables.')

    # Check to make sure other coordinate variables don't have nans
    # Also check to make sure coordinate variables are not decreasing
    if len(list(ds.coords)) > 1:
        for d in ds.coords:
            if d == 'time':
                continue
            if any(np.isnan(ds[d].values)):
                err.append('Coordinates must not include NaNs ' + d)

            diff = ds[d].diff(d)
            idx = np.where(diff <= 0.0)
            if len(idx[0]) > 0:
                err.append(d + ' is not in increasing order')

            if 'missing_value' in ds[d].encoding:
                err.append(d + ' should not include missing value')

    # Verify that each variable has a long_name and units attributes
    for v in ds:
        if (len(ds[v].dims) > 0) and ('time' not in list(ds[v].dims)[0]) and ('bounds' not in v):
            err.append(v + ': "time" is required to be the first dimension.')

        if (ds[v].size == 1) and (len(ds[v].dims) > 0):
            err.append(v + ': is not defined as a scalar.')

        if 'long_name' not in ds[v].attrs:
            err.append('Required attribute long_name not in ' + v)
        else:
            if not ds[v].attrs['long_name'][0].isupper():
                err.append(v + ' long_name attribute does not start with uppercase')

        if (
            ('qc_' not in v)
            and (v not in ['time', 'time_offset', 'base_time', 'lat', 'lon', 'alt'])
            and ('bounds' not in v)
        ):
            if ('missing_value' not in ds[v].encoding) and ('FillValue' not in ds[v].encoding):
                err.append(v + ' does not include missing_value or FillValue attributes')

        # QC variable checks
        if 'qc_' in v:
            if v[3:] not in ds:
                err.append('QC variable does not have a corresponding variable ' + v[3:])
            if 'ancillary_variables' not in ds[v[3:]].attrs:
                err.append(
                    v[3:] + ' does not include ancillary_variable attribute pointing to ' + v
                )
            if 'description' not in ds[v].attrs:
                err.append(v + ' does not include description attribute')
            if 'flag_method' not in ds[v].attrs:
                err.append(v + ' does not include flag_method attribute')

        if (v not in ['base_time', 'time_offset']) and ('bounds' not in v):
            if 'units' not in ds[v].attrs:
                err.append('Required attribute units not in ' + v)

    # Lat/Lon/Alt Checks
    if 'lat' not in ds:
        err.append('ARM requires the latitude variable to be named lat')
    else:
        if 'standard_name' in ds['lat'].attrs:
            if ds['lat'].attrs['standard_name'] != 'latitude':
                err.append('ARM requires the lat standard_name to be latitude')
        else:
            err.append('"lat" variable does not have a standard_name attribute')

    if 'lon' not in ds:
        err.append('ARM requires the longitude variable to be named lon')
    else:
        if 'standard_name' in ds['lon'].attrs:
            if ds['lon'].attrs['standard_name'] != 'longitude':
                err.append('ARM requires the lon standard_name to be longitude')
        else:
            err.append('"lon" variable does not have a standard_name attribute')

    if 'alt' not in ds:
        err.append('ARM requires the altitude variable to be named alt')
    else:
        if 'standard_name' in ds['alt'].attrs:
            if ds['alt'].attrs['standard_name'] != 'altitude':
                err.append('ARM requires the alt standard_name to be altitude')
        else:
            err.append('"alt" variable does not have a standard_name attribute')

    # Required global attributes
    req_att = ['doi', 'sampling_interval', 'averaging_interval']
    for ra in req_att:
        if ra not in ds.attrs:
            err.append('Global attribute is missing: ' + ra)

    if verbose:
        if len(err) > 0:
            [print(e) for e in err]
        else:
            print('File is passing standards checks')

    return err
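
# Example usage for arm_standards_validator() (a sketch; the filename is
# hypothetical and follows the ARM datastream naming convention). An empty list
# means the file passed all checks:
#
# >>> errors = arm_standards_validator(file='sgpmetE13.b1.20190101.000000.nc', verbose=False)
# >>> len(errors) == 0
# True
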