"""Manager for interacting with DayMet datasets."""
from typing import List, Optional
import attr
import os, sys
import logging
import numpy as np
import requests
import requests.exceptions
import shapely.geometry
import cftime, datetime
import xarray as xr
import watershed_workflow.crs
from watershed_workflow.crs import CRS
import watershed_workflow.warp
from . import utils as source_utils
from . import filenames
from . import manager_dataset
[docs]
class ManagerDaymet(manager_dataset.ManagerDataset):
"""Daymet meterological datasets.
Daymet is a historic, spatially interpolated product which ingests large
number of point-sources of meterological data, aggregates them to daily
time series, and spatially interpolates them onto a 1km gridded product that
covers all of North America from 1980 to present [Daymet]_.
Variable names and descriptions
.. list-table::
:widths: 25 25 75
:header-rows: 1
* - name
- units
- description
* - prcp
- :math:`mm / day`
- Total daily precipitation
* - tmin, tmax
- :math:`^\\circ C`
- Min/max daily air temperature
* - srad
- :math:`W / m^2`
- Incoming solar radiation - per DAYLIT time!
* - vp
- :math:`Pa`
- Vapor pressure
* - swe
- :math:`Kg / m^2`
- Snow water equivalent
* - dayl
- :math:`s / day`
- Duration of sunlight
.. [Daymet] https://daymet.ornl.gov
"""
# DayMet-specific constants
URL = "http://thredds.daac.ornl.gov/thredds/ncss/grid/ornldaac/2129/daymet_v4_daily_na_{variable}_{year}.nc"
[docs]
@attr.define
class Request(manager_dataset.ManagerDataset.Request):
"""DayMet-specific request that adds download information."""
bounds: list = attr.field(default=None)
start_year: int = attr.field(default=None)
end_year: int = attr.field(default=None)
def __init__(self):
# DayMet native data properties
native_resolution = 1000.0 # 1km resolution in meters
native_start = cftime.datetime(1980, 1, 1, calendar='noleap')
native_end = cftime.datetime(2023, 12, 31, calendar='noleap')
native_crs_daymet = CRS.from_proj4(
'+proj=lcc +lat_1=25 +lat_2=60 +lat_0=42.5 +lon_0=-100 +x_0=0 +y_0=0 +ellps=WGS84 +units=m +no_defs'
)
valid_variables = ['tmin', 'tmax', 'prcp', 'srad', 'vp', 'swe', 'dayl']
default_variables = ['tmin', 'tmax', 'prcp', 'srad', 'vp', 'dayl']
# Initialize base class with native properties
super().__init__(
name='DayMet 1km',
source='ORNL DAAC THREDDS API',
native_resolution=native_resolution,
native_crs_in=CRS.from_epsg(4326), # Accept WGS84 input
native_crs_out=native_crs_daymet, # Return in DayMet LCC projection
native_start=native_start,
native_end=native_end,
valid_variables=valid_variables,
default_variables=default_variables
)
self.names = filenames.Names(
self.name, 'meteorology', 'daymet',
'daymet_{var}_{year}_{north}x{west}_{south}x{east}.nc')
def _download(self,
var : str,
year : int,
bounds_str : List[str],
filename : str,
force : bool = False) -> None:
"""Download a NetCDF file covering the bounds.
Parameters
----------
var : str
Name the variable, see table in the class documentation.
year : int
A year in the valid range (currently 1980-2023)
bounds_str : [xmin, ymin, xmax, ymax]
Desired bounds, in the LatLon CRS.
force : bool, optional
If true, re-download even if a file already exists.
filename : str
Resulting NetCDF file.
"""
logging.info("Collecting DayMet file to tile bounds: {}".format(bounds_str))
# check directory structure
os.makedirs(self.names.folder_name(), exist_ok=True)
if (not os.path.exists(filename)) or force:
url_dict = { 'year': str(year), 'variable': var }
url = self.URL.format(**url_dict)
logging.info(" Downloading: {}".format(url))
logging.info(" to file: {}".format(filename))
request_params = [('var', 'lat'), ('var', 'lon'), ('var', var), ('west', bounds_str[0]),
('south', bounds_str[1]), ('east', bounds_str[2]), ('north', bounds_str[3]),
('horizStride', '1'),
('time_start', '{}-01-01T12:00:00Z'.format(year)),
('time_end', '{}-12-31T12:00:00Z'.format(year)), ('timeStride', '1'),
('accept', 'netcdf')]
r = requests.get(url, params=request_params, verify=source_utils.getVerifyOption())
r.raise_for_status()
with open(filename, 'wb') as fid:
fid.write(r.content)
else:
logging.info(" Using existing: {}".format(filename))
def _cleanBounds(self, geometry : shapely.geometry.Polygon) -> list[float]:
"""Compute bounds in lat-lon CRS for DayMet API requests."""
# geometry is already in WGS84 (native_crs_in), so just get bounds
bounds_ll = geometry.bounds
return [np.round(b, 4) for b in bounds_ll]
def _requestDataset(self, request: manager_dataset.ManagerDataset.Request
) -> Request:
"""Request DayMet data - check if files exist or need downloading.
Parameters
----------
request : ManagerDataset.Request
Dataset request with preprocessed parameters.
Returns
-------
Request
DayMet-specific request with download info and readiness status.
"""
# Extract parameters from request
geometry = request.geometry
start = request.start
end = request.end
variables = request.variables
assert start is not None, "Start date is required for DayMet data"
assert end is not None, "End date is required for DayMet data"
assert variables is not None, "Variables are required for DayMet data"
start_year = start.year
end_year = (end - datetime.timedelta(days=1)).year
if start_year > end_year:
raise RuntimeError(f"Start year {start_year} is after end year {end_year}")
# Get bounds for API requests
bounds = self._cleanBounds(geometry)
# Create DayMet-specific request with download info
daymet_request = self.Request(
manager=request.manager,
is_ready=True,
geometry=request.geometry,
start=request.start,
end=request.end,
variables=request.variables,
bounds=bounds,
start_year=start_year,
end_year=end_year,
)
return daymet_request
def _openFiles(self,
filenames : List[List[str]],
variables : List[str]) -> xr.Dataset:
"""Open all files and concatenate them into a single xarray dataset."""
fnames_by_var = list(zip(filenames, variables))
ds_list_allvars = []
for info in fnames_by_var:
var = info[1]
fnames = info[0]
ds_list = []
for fname in fnames:
ds = xr.open_dataset(fname)
ds_list.append(ds)
ds_concat = xr.concat(ds_list, dim="time")
ds_list_allvars.append(ds_concat[var])
ds_combined = xr.Dataset({da.name: da for i, da in enumerate(ds_list_allvars)})
# convert the x and y coordinates from km to meters and update the attributes
attrs_ref = ds_combined.x.attrs
attrs_ref['units'] = 'm'
new_x = ds_combined.x * 1000
new_y = ds_combined.y * 1000
new_time = watershed_workflow.data.convertTimesToCFTimeNoleap(
watershed_workflow.data.convertTimesToCFTime(ds_combined['time'].values))
ds_combined = ds_combined.assign_coords(x=new_x, y=new_y, time=new_time)
ds_combined.x.attrs = attrs_ref
ds_combined.y.attrs = attrs_ref
# deal with attrs
ds_combined.attrs = ds_concat.attrs
return ds_combined
def _fetchDataset(self, request: Request) -> xr.Dataset:
"""Fetch DayMet data for the request.
Parameters
----------
request : Request
DayMet-specific request with preprocessed parameters and download info.
Returns
-------
xr.Dataset
Dataset containing the requested DayMet data.
"""
# Extract parameters from DayMet request
variables = request.variables
bounds = request.bounds
start_year = request.start_year
end_year = request.end_year
# get the target filename
bounds_str = [f"{b:.4f}" for b in bounds]
# Download any missing files
filenames = []
for var in variables:
vfiles = []
for year in range(start_year, end_year + 1):
filename = self.names.file_name(var=var,
year=year,
north=bounds_str[3],
east=bounds_str[2],
west=bounds_str[0],
south=bounds_str[1])
vfiles.append(filename)
if not os.path.exists(filename):
# Download the file
self._download(var, year, bounds_str, filename)
filenames.append(vfiles)
# Open and return dataset (base class handles time clipping and CRS)
return self._openFiles(filenames, variables)