# Source code for pyart.io.common

"""
Input/output routines common to many file formats.

"""

import bz2
import gzip

import fsspec
import netCDF4
import numpy as np


def prepare_for_read(filename, storage_options=None):
    """
    Return a file-like object ready for reading.

    Open a file for reading in binary mode with transparent decompression of
    Gzip and BZip2 files. The resulting file-like object should be closed.

    Parameters
    ----------
    filename : str or file-like object
        Filename or file-like object which will be opened. File-like objects
        will not be examined for compressed data.
    storage_options : dict, optional
        Parameters passed to the backend file-system such as Google Cloud
        Storage, Amazon Web Service S3. When omitted, ``{"anon": True}`` is
        used.

    Returns
    -------
    file_like : file-like object
        File like object from which data can be read.

    """
    # Anything that already exposes ``read`` is handed back untouched.
    if hasattr(filename, "read"):
        return filename

    # Build the default per call rather than sharing one dict between calls.
    if storage_options is None:
        storage_options = {"anon": True}

    # Look for compressed data by examining the first few bytes. The probe
    # handle is closed even if the read fails so it cannot leak.
    probe = fsspec.open(
        filename, mode="rb", compression="infer", **storage_options
    ).open()
    try:
        magic = probe.read(3)
    finally:
        probe.close()

    # If the data is still compressed after fsspec's inference, fall back to
    # the stdlib decompressors.
    # NOTE(review): these two paths open ``filename`` directly with gzip/bz2,
    # so ``storage_options`` is not applied here - confirm that remote,
    # extension-less compressed files are not expected on this path.
    if magic.startswith(b"\x1f\x8b"):
        return gzip.GzipFile(filename, "rb")

    if magic.startswith(b"BZh"):
        return bz2.BZ2File(filename, "rb")

    return fsspec.open(
        filename, mode="rb", compression="infer", **storage_options
    ).open()
def stringarray_to_chararray(arr, numchars=None):
    """
    Convert a string array to a character array with one extra dimension.

    ``netCDF4.stringtochar`` is tried first; if it raises for any reason the
    conversion falls back to a pure-numpy implementation.

    Parameters
    ----------
    arr : array-like
        String or bytes array. Masked entries of a string-like masked array
        are converted as empty strings.
    numchars : int, optional
        Fixed character width. Must be >= actual max string length.

    Returns
    -------
    ndarray
        Character array with dtype 'S1' and one trailing character
        dimension appended to the input shape. The trailing dimension has
        length ``numchars`` when it is given.

    Raises
    ------
    ValueError
        If ``numchars`` is smaller than the character width produced by the
        conversion.

    """
    # The mask has to be resolved before np.asarray, which discards it.
    # Only string-like dtypes can be filled with an empty string.
    if np.ma.isMaskedArray(arr) and arr.dtype.kind in "SUO":
        arr = arr.filled("")
    arr = np.asarray(arr)

    # Promote a scalar to a one-element array; undone before returning.
    scalar = arr.ndim == 0
    if scalar:
        arr = arr.reshape((1,))

    # Try netCDF4 first. Any failure is a deliberate best-effort fall through
    # to the manual conversion below.
    carr = None
    try:
        carr = netCDF4.stringtochar(arr)
    except Exception:
        pass

    # Manual fallback. numchars is intentionally not forwarded so that the
    # validation below applies to both paths instead of the helper silently
    # truncating strings that are too long.
    if carr is None:
        carr = _manual_string_to_char(arr)

    # Validate and pad if numchars specified
    if numchars is not None:
        arr_numchars = carr.shape[-1]
        if numchars < arr_numchars:
            raise ValueError(
                f"numchars ({numchars}) must be >= actual width ({arr_numchars})"
            )
        if numchars > arr_numchars:
            # Zero bytes act as null padding on the right.
            out = np.zeros(carr.shape[:-1] + (numchars,), dtype="S1")
            out[..., :arr_numchars] = carr
            carr = out

    # Restore scalar shape
    if scalar:
        carr = carr[0]

    return carr


def _manual_string_to_char(arr, numchars=None):
    """
    Manual string-to-char conversion.

    Parameters
    ----------
    arr : ndarray
        Array of strings, bytes or other objects; non-bytes items are
        converted with ``str`` and encoded as UTF-8.
    numchars : int, optional
        Width of the character dimension. Encoded items longer than this
        are truncated. When omitted, the longest encoded item (at least 1)
        sets the width.

    Returns
    -------
    ndarray
        Array with dtype 'S1' and shape ``arr.shape + (width,)``.

    """
    # Handle empty arrays
    if arr.size == 0:
        width = numchars if numchars is not None else 1
        return np.zeros(arr.shape + (width,), dtype="S1")

    # Encode every element to bytes
    encoded = []
    for x in arr.ravel():
        if x is None or x == "":
            encoded.append(b"")
        elif isinstance(x, bytes):
            encoded.append(x)
        else:
            encoded.append(str(x).encode("utf-8"))

    # Determine width
    max_bytes = max(len(b) for b in encoded)
    width = numchars if numchars is not None else max(max_bytes, 1)

    # Allocate and fill
    chararr = np.zeros(arr.shape + (width,), dtype="S1")
    for idx, b in zip(np.ndindex(arr.shape), encoded):
        if len(b) > width:
            b = b[:width]  # Truncate
        # Use null padding for netCDF compatibility
        padded = b + b"\x00" * (width - len(b))
        chararr[idx] = np.frombuffer(padded, dtype="S1", count=width)

    return chararr


def _test_arguments(dic):
    """Issue a warning if receive non-empty argument dict."""
    if dic:
        import warnings

        warnings.warn(f"Unexpected arguments: {dic.keys()}")


def make_time_unit_str(dtobj):
    """Return a time unit string from a datetime object."""
    return "seconds since " + dtobj.strftime("%Y-%m-%dT%H:%M:%SZ")