Source code for watershed_workflow.sources.manager_aorc

"""Manager for interacting with AORC datasets."""
from typing import Tuple, List, Optional

import os
import numpy as np
import xarray as xr
import shapely
import cftime, datetime
import logging
import s3fs
import attr

import watershed_workflow.crs
from watershed_workflow.crs import CRS

from . import manager_raster
from . import filenames
from . import manager_dataset


class ManagerAORC(manager_dataset.ManagerDataset):
    """AORC dataset.

    Explore the Analysis Of Record for Calibration (AORC) version 1.1 data
    https://registry.opendata.aws/noaa-nws-aorc/

    Using Xarray, Dask and hvPlot to explore the AORC version 1.1
    data. We read from a cloud-optimized Zarr dataset that is part of
    the NOAA Open Data Dissemination (NODD) program and we use a Dask
    cluster to parallelize the computation and reading of data chunks.

    AORC variables available to use:

    - APCP_surface
    - DLWRF_surface
    - DSWRF_surface
    - PRES_surface
    - SPFH_2maboveground
    - TMP_2maboveground
    - UGRD_10maboveground
    - VGRD_10maboveground

    There are eight variables representing the meteorological conditions:

    - Total Precipitation (APCP_surface): Hourly total precipitation
      (kg m-2 or mm) for Calibration (AORC) dataset
    - Air Temperature (TMP_2maboveground): Temperature (at 2 m
      above-ground-level (AGL)) (K)
    - Specific Humidity (SPFH_2maboveground): Specific humidity (at 2 m
      AGL) (g g-1)
    - Downward Long-Wave Radiation Flux (DLWRF_surface): (1) longwave
      (infrared) and (2) radiation flux (at the surface) (W m-2)
    - Downward Short-Wave Radiation Flux (DSWRF_surface): (1) Downward
      shortwave (solar) and (2) radiation flux (at the surface) (W m-2)
    - Pressure (PRES_surface): Air pressure (at the surface) (Pa)
    - U-Component of Wind (UGRD_10maboveground): U (west-east) -
      components of the wind (at 10 m AGL) (m s-1)
    - V-Component of Wind (VGRD_10maboveground): V (south-north) -
      components of the wind (at 10 m AGL) (m s-1)

    **Precipitation and Temperature**

    The gridded AORC precipitation dataset contains one-hour
    Accumulated Surface Precipitation (APCP) ending at the "top" of
    each hour, in liquid water-equivalent units (kg m-2 to the nearest
    0.1 kg m-2), while the gridded AORC temperature dataset is
    comprised of instantaneous, 2 m above-ground-level (AGL)
    temperatures at the top of each hour (in Kelvin, to the nearest
    0.1).

    **Specific Humidity, Pressure, Downward Radiation, Wind**

    The development process for the six additional dataset components
    of the Conus AORC [i.e., specific humidity at 2m above ground (kg
    kg-1); downward longwave and shortwave radiation fluxes at the
    surface (W m-2); terrain-level pressure (Pa); and west-east and
    south-north wind components at 10 m above ground (m s-1)] has two
    distinct periods, based on datasets and methodology applied:
    1979–2015 and 2016–present.
    """
    class Request(manager_dataset.ManagerDataset.Request):
        """AORC-specific request that includes filename for cached data.

        Parameters
        ----------
        request : ManagerDataset.Request
          Existing request whose state is copied into this object via
          ``copyFromExisting``.
        filename : str, optional
          Path of the cached file backing this request.
        """
        def __init__(self,
                     request: manager_dataset.ManagerDataset.Request,
                     filename: str = ''):
            super().copyFromExisting(request)
            self.filename = filename

    # AORC constants
    VALID_VARIABLES = [
        'APCP_surface',
        'DLWRF_surface',
        'DSWRF_surface',
        'PRES_surface',
        'SPFH_2maboveground',
        'TMP_2maboveground',
        'UGRD_10maboveground',
        'VGRD_10maboveground',
    ]
    # By default every valid variable is requested; kept as an independent
    # list object so mutating one does not affect the other.
    DEFAULT_VARIABLES = list(VALID_VARIABLES)
    URL = 's3://noaa-nws-aorc-v1-1-1km'

    def __init__(self):
        """Configure the base manager with AORC's native properties and
        make sure the local cache directory exists."""
        # AORC native data properties
        native_start = cftime.datetime(2007, 1, 1, calendar='standard')
        native_end = cftime.datetime(2024, 12, 31, calendar='standard')
        native_crs = CRS.from_epsg(4326)  # WGS84 Geographic
        native_resolution = 0.00833333  # ~1km in degrees (approximately 1km at mid-latitudes)

        # Initialize base class with correct parameter order
        super().__init__(name='AORC v1.1',
                         source='NOAA AWS S3 Zarr',
                         native_resolution=native_resolution,
                         native_crs_in=native_crs,
                         native_crs_out=native_crs,
                         native_start=native_start,
                         native_end=native_end,
                         valid_variables=self.VALID_VARIABLES,
                         default_variables=self.DEFAULT_VARIABLES)

        # File naming for cached downloads
        self.names = filenames.Names(
            self.name, 'meteorology', 'aorc',
            'aorc_{start_year}-{end_year}_{north}x{west}_{south}x{east}.nc')

        # Check directory structure
        os.makedirs(self.names.folder_name(), exist_ok=True)

    def _cleanBounds(self, geometry: shapely.geometry.Polygon) -> list[float]:
        """Extract bounds from geometry already in native CRS and buffered by base class.

        Parameters
        ----------
        geometry : shapely.geometry.Polygon
          Geometry whose ``bounds`` are read.

        Returns
        -------
        list[float]
          The entries of ``geometry.bounds``, in the same order, each
          rounded to 4 decimal places.
        """
        bounds = geometry.bounds
        return [np.round(b, 4) for b in bounds]

    def _download(self,
                  geometry: shapely.geometry.Polygon,
                  start_year: int,
                  end_year: int,
                  force: bool = False) -> str:
        """Download AORC data for geometry and time range from S3 Zarr.

        Parameters
        ----------
        geometry : shapely.geometry.Polygon
          Geometry in native CRS already transformed and buffered by base class.
        start_year : int
          Starting year for data download.
        end_year : int
          Ending year for data download (inclusive).
        force : bool, optional
          If true, re-download even if a file already exists.

        Returns
        -------
        str
          The filename of the cached dataset.
        """
        # check directory structure
        os.makedirs(self.names.folder_name(), exist_ok=True)

        # Get bounds from geometry (already in native CRS and buffered)
        bounds = self._cleanBounds(geometry)

        # get the subset filename
        #
        # NOTE(review): bounds[0] is used below as the lower longitude limit
        # and bounds[2] as the upper one, so the 'east'/'west' labels passed
        # here look swapped.  Left unchanged on purpose: correcting them would
        # rename the cache files and orphan every existing download.
        filename = self.names.file_name(start_year=start_year,
                                        end_year=end_year,
                                        east=bounds[0],
                                        south=bounds[1],
                                        west=bounds[2],
                                        north=bounds[3])

        if os.path.exists(filename) and not force:
            logging.info('  Using existing: %s', filename)
            return filename

        # One Zarr store per year, accessed anonymously.
        s3_out = s3fs.S3FileSystem(anon=True)
        fileset = [
            s3fs.S3Map(root=f"{self.URL}/{dataset_year}.zarr", s3=s3_out, check=False)
            for dataset_year in range(start_year, end_year + 1)
        ]

        # NOTE: no dask.distributed client is started here; that code path
        # was previously disabled.
        #
        # The context manager guarantees the multi-year dataset is closed
        # even if subsetting or loading fails.
        with xr.open_mfdataset(fileset, engine='zarr') as ds_multi_year:
            logging.info('Variable size: %.1f TB', ds_multi_year.nbytes / 1e12)
            logging.debug('%s', ds_multi_year)

            # Subset the dataset to the bounding box
            logging.info('Subsetting:')
            logging.info('  lon: %s', (bounds[0], bounds[2]))
            logging.info('  lat: %s', (bounds[1], bounds[3]))
            ds_subset = ds_multi_year.sel(longitude=slice(bounds[0], bounds[2]),
                                          latitude=slice(bounds[1], bounds[3]))
            logging.info('Variable size: %.3f GB', ds_subset.nbytes / 1e9)

            # Pull the subset into memory before the source is closed.
            ds_subset = ds_subset.compute()

        # Write to a temporary sibling and rename into place, so that an
        # interrupted write never leaves a truncated file that a later call
        # would mistake for a valid cache entry.
        tmp_filename = filename + '.part'
        try:
            ds_subset.to_netcdf(tmp_filename)
            os.replace(tmp_filename, filename)
        finally:
            # Only still present if the write or the rename failed.
            if os.path.exists(tmp_filename):
                os.remove(tmp_filename)

        return filename

    def _requestDataset(
            self, request: manager_dataset.ManagerDataset.Request
    ) -> manager_dataset.ManagerDataset.Request:
        """Request AORC data - ready upon download completion.

        Parameters
        ----------
        request : ManagerDataset.Request
          Request object containing geometry, dates, and variables.

        Returns
        -------
        ManagerDataset.Request
          New AORC request object with filename and is_ready flag set.

        Raises
        ------
        RuntimeError
          If the request's start year is after its end year.
        """
        assert request.start is not None
        assert request.end is not None
        assert request.variables is not None

        # Convert dates to years; whole years are downloaded and cached.
        start_year = request.start.year
        end_year = request.end.year

        if start_year > end_year:
            raise RuntimeError(
                f"Provided start year {start_year} is after provided end year {end_year}")

        # Download data to cache (this may take time)
        filename = self._download(request.geometry, start_year, end_year, force=False)

        # Create new AORC-specific request with filename.  The download is
        # synchronous, so the data is available as soon as we get here.
        aorc_request = self.Request(request, filename)
        aorc_request.is_ready = True
        return aorc_request

    def _fetchDataset(self, request: manager_dataset.ManagerDataset.Request) -> xr.Dataset:
        """Implementation of abstract method to fetch AORC data.

        Parameters
        ----------
        request : ManagerDataset.Request
          Request object containing cached data reference; its
          ``filename`` and ``variables`` attributes are read.

        Returns
        -------
        xr.Dataset
          Dataset containing the requested AORC data.
        """
        # Open cached dataset
        dataset = xr.open_dataset(request.filename)

        # Filter to requested variables only
        if request.variables != self.valid_variables:
            # Only keep requested variables
            dataset = dataset[request.variables]

        # Ensure CRS is properly set; the 'rio' accessor is only present when
        # it has been registered, hence the hasattr guard.
        if hasattr(dataset, 'rio') and dataset.rio.crs is None:
            dataset = dataset.rio.write_crs(self.native_crs_out)

        return dataset