# Source code for act.discovery.ameriflux

"""
Script for downloading data from Ameriflux's Data Webservice

"""

import os
import requests
import warnings

import pandas as pd

# Report every warning each time it is raised instead of once per location, so
# the data-policy notices issued by this module are shown on every download.
# NOTE: this runs at import time and changes the process-wide warnings filter.
warnings.simplefilter('always')


def download_ameriflux_data(
    user_id,
    user_email,
    site_ids,
    data_product="FLUXNET",
    data_policy=None,
    data_variant="FULLSET",
    agree_policy=True,
    intended_use=None,
    description=None,
    out_dir=None,
    **kwargs,
):
    """
    This tool allows users to download Ameriflux data.
    This code is based on the original R code found here:
    https://github.com/chuhousen/amerifluxr

    Parameters
    ----------
    user_id : str
        The user's Ameriflux user ID.
    user_email : str
        The user's email address.
    site_ids : list or str
        A list of valid site_ids to download data from. A single site ID may
        also be given as a string. List of available sites can be found here:
        https://ameriflux.lbl.gov/sites/site-search/
    data_product : str
        Data product type. Options are BASE-BADM, BIF, or FLUXNET.
        Default is FLUXNET. For more on data products:
        https://ameriflux.lbl.gov/data/flux-data-products/
        https://ameriflux.lbl.gov/data/badm/
    data_policy : str
        Data policy under which data has been licensed by the PI.
        Options are CCBY4.0 or LEGACY. One of the two must be given.
    data_variant : str or None
        Variant used for FLUXNET data product. Options are SUBSET, FULLSET,
        or None. Default is FULLSET.
    agree_policy : bool
        Acknowledge you read and agree to the AmeriFlux Data use policy.
        Data policy can be found here:
        https://ameriflux.lbl.gov/data/data-policy/
    intended_use : str
        Planned use for data downloaded, given as one of the short codes
        below. The code is expanded to the full intended-use text:

            "synthesis": "Research - Multi-site synthesis",
            "remote_sensing": "Research - Remote sensing",
            "model": "Research - Land model/Earth system model",
            "other_research": "Research - Other",
            "education": "Education (Teacher or Student)",
            "other": "Other"
    description : str
        Brief description of intended use. This will be recorded in the data
        download log and emailed to site's PI.
    out_dir : str
        The output directory for the data. Set to None to place the files in
        a folder named ``data`` in the current working directory. The
        directory is created if it does not exist.
    **kwargs : dict
        ``is_test`` (bool): when True the request is flagged as a test.
        ``timeout`` (float or tuple): timeout in seconds passed to the HTTP
        requests; the default None waits indefinitely.

    Notes
    -----
    This programmatic interface allows users to query and automate
    machine-to-machine downloads of Ameriflux data. This tool uses a REST URL
    and specific parameters mentioned above and data files matching the
    criteria will be returned to the user and downloaded.

    To login/register for an Ameriflux account:
    https://ameriflux-data.lbl.gov/Pages/RequestAccount.aspx

    Examples
    --------
    This code will download a zip file for BASE-BADM data product at
    site US-A37. See the Notes for information on how to obtain an account.

    .. code-block:: python

        act.discovery.download_ameriflux_data(
            user_id,
            user_email,
            data_product="BASE-BADM",
            data_policy="CCBY4.0",
            data_variant="FULLSET",
            site_ids=["US-A37"],
            agree_policy=True,
            intended_use="synthesis",
            description="I intend to use this data for research",
            out_dir="/home/user/ameriflux_data/",
        )

    Returns
    -------
    files : list
        Returns list of files retrieved

    Raises
    ------
    ValueError
        If an input is invalid, no valid site ID remains, the data policy is
        not acknowledged, or the web service does not answer with HTTP 200.

    """
    # Check all inputs are valid
    if not isinstance(user_id, str):
        raise ValueError("user_id should be a string...")

    if not isinstance(user_email, str) or "@" not in user_email:
        raise ValueError("user_email not a valid email...")

    # Check if site_ids are valid site IDs. A bare string is one site ID, so
    # it is wrapped in a list for the lookup rather than iterated by character.
    if isinstance(site_ids, str):
        is_valid = _check_site_id([site_ids])[0]
        if not (is_valid or site_ids in ("AA-Flx", "AA-Net")):
            site_ids = None
    elif site_ids:
        site_ids = list(site_ids)
        check_id = _check_site_id(site_ids)
        invalid_ids = [sid for sid, valid in zip(site_ids, check_id) if not valid]
        if invalid_ids:
            warnings.warn(
                f"{', '.join(invalid_ids)} not valid AmeriFlux Site ID",
                UserWarning,
            )
        site_ids = [sid for sid, valid in zip(site_ids, check_id) if valid]

    if not site_ids:
        raise ValueError("No valid Site ID in site_ids...")

    # Obtain formal intended use category
    intended_use_full = {
        "synthesis": "Research - Multi-site synthesis",
        "remote_sensing": "Research - Remote sensing",
        "model": "Research - Land model/Earth system model",
        "other_research": "Research - Other",
        "education": "Education (Teacher or Student)",
        "other": "Other",
    }.get(intended_use)
    if not intended_use_full:
        raise ValueError("Invalid intended_use input...")

    # Make sure the output directory exists. exist_ok avoids failing when the
    # directory is already there, e.g. on a second call with out_dir=None.
    if out_dir is None:
        out_dir = os.path.join(os.getcwd(), "data")
    os.makedirs(out_dir, exist_ok=True)

    # Prompt for data policy agreement
    if data_policy == "CCBY4.0":
        warnings.warn(
            "\n"
            "Data use guidelines for AmeriFlux CC-BY-4.0 Data Policy:\n"
            "(1) Data user is free to Share (copy and redistribute the material in any medium or format) and/or Adapt (remix, transform, and build upon the material) for any purpose.\n"
            "(2) Provide a citation to each site data product that includes the data-product DOI and/or recommended publication.\n"
            "(3) Acknowledge funding for supporting AmeriFlux data portal: U.S. Department of Energy Office of Science.\n"
            "\n",
            PolicyWarning,
        )
    elif data_policy == "LEGACY":
        warnings.warn(
            "\n"
            "Data use guidelines for AmeriFlux LEGACY License:\n"
            "(1) When you start in-depth analysis that may result in a publication, contact the data contributors directly, so that they have the opportunity to contribute substantively and become a co-author.\n"
            "(2) Provide a citation to each site data product that includes the data-product DOI.\n"
            "(3) Acknowledge funding for site support if it was provided in the data download information.\n"
            "(4) Acknowledge funding for supporting AmeriFlux data portal: U.S. Department of Energy Office of Science.\n"
            "\n",
            PolicyWarning,
        )
    else:
        raise ValueError("Specify a valid data policy before proceeding...")

    if not agree_policy:
        raise ValueError("Acknowledge data policy before proceeding...")

    test_key = "true" if kwargs.get("is_test") is True else ""
    timeout = kwargs.get("timeout")

    # Do not send the literal text "None" to the site PI when no description
    # was supplied.
    tag = "[Atmospheric data Community Toolkit download]"
    full_description = tag if description is None else f"{description} {tag}"

    # Payload for download web service
    params = {
        "user_id": user_id,
        "user_email": user_email,
        "data_policy": data_policy,
        "data_product": data_product,
        "data_variant": data_variant,
        "site_ids": site_ids,
        "intended_use": intended_use_full,
        "description": full_description,
        "is_test": test_key,
    }

    result = requests.post(
        _ameriflux_endpoints("data_download"),
        json=params,
        headers={"Content-Type": "application/json"},
        timeout=timeout,
    )

    # Check if the web service returns correctly
    if result.status_code != 200:
        raise ValueError("Data download fails, timeout or server error...")

    link = result.json()
    ftplink = [data_url['url'] for data_url in link.get('data_urls', [])]

    # Check if any site_id has no data
    if not ftplink:
        raise ValueError(f"Cannot find data from {site_ids}")

    # Avoid downloading fluxnet_bif for now
    if (
        isinstance(site_ids, str)
        and site_ids == "AA-Flx"
        and data_policy == "CCBY4.0"
        and len(ftplink) > 1
    ):
        ftplink = [url for url in ftplink if "FLUXNET-BIF" not in url]

    # Get zip file names
    outfname = [os.path.basename(url).split("?")[0] for url in ftplink]

    # Check if any site_ids has no data. Compare against the number of
    # requested sites, not the character count of a single string ID.
    requested = [site_ids] if isinstance(site_ids, str) else site_ids
    if len(outfname) < len(requested):
        # The slice takes the six characters after the first four of each
        # file name as the site ID; this relies on names shaped like
        # "AMF_US-A37_..." (assumed from the slice, not verified here).
        found = {fname[4:10] for fname in outfname}
        miss_site_id = [sid for sid in requested if sid not in found]
        warnings.warn(f"Cannot find data from {miss_site_id}")

    # Download sequentially
    output_zip_file = [os.path.join(out_dir, fname) for fname in outfname]
    failed = []
    for url, path in zip(ftplink, output_zip_file):
        # The context manager releases the streamed connection when done.
        with requests.get(url, stream=True, timeout=timeout) as response:
            if not response.ok:
                # Skip instead of saving an HTTP error page as a zip file.
                failed.append((path, url))
                continue
            with open(path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

    # Report downloads that did not succeed
    if failed:
        warnings.warn(
            f"Cannot download {[path for path, _ in failed]} from {[url for _, url in failed]}"
        )

    failed_paths = {path for path, _ in failed}
    return [path for path in output_zip_file if path not in failed_paths]
# Return AmeriFlux server endpoints def _ameriflux_endpoints(endpoint="sitemap"): """Retrieves urls for different ameriflux server endpoints. Options include" sitemap, site_ccby4, data_year, data_download, and variables""" # base urls base_url = "https://amfcdn.lbl.gov/" api_url = os.path.join(base_url, "api/v1") # what to return url = { "sitemap": os.path.join(api_url, "site_display/AmeriFlux"), "site_ccby4": os.path.join(api_url, "site_availability/AmeriFlux/BIF/CCBY4.0"), "data_download": os.path.join(api_url, "data_download"), }.get(endpoint) return url def _check_site_id(x): """Checks if user provided site_ids are valid.""" response = requests.get(_ameriflux_endpoints("sitemap")) df = pd.json_normalize(response.json()) site_ids = df['SITE_ID'].tolist() chk_id = [site_id in site_ids for site_id in x] return chk_id class PolicyWarning(UserWarning): pass