"""Functions used in interacting with data, either pandas DataFrames or xarray DataArrays."""
from typing import Union, List, Iterable, Tuple, Any, Optional, Literal, overload, Sequence
from xarray.core.types import InterpOptions
import numpy.typing as npt
import logging
import warnings
import cftime
import datetime
import pandas as pd
import geopandas as gpd
import numpy as np
import xarray as xr
import rasterio.transform
import rasterio.features
import shapely.geometry
import scipy.signal
import scipy.stats
import scipy.ndimage
import scipy.interpolate
import watershed_workflow.crs
import watershed_workflow.warp
from watershed_workflow.crs import CRS
ValidTime = Union[cftime._cftime.datetime, pd.Timestamp, datetime.datetime, np.datetime64]
#
# Helper functions
#
def convertTimesToCFTime(time_values: Sequence[ValidTime]) -> npt.NDArray[cftime._cftime.datetime]:
"""Convert an iterable of datetime objects to cftime object.
This function accepts various datetime types and converts them to
cftime Gregorian calendar.
Parameters
----------
time_values
Iterable of datetime objects (numpy datetime64, pandas Timestamp,
Python datetime, or cftime objects). All elements must be the same type.
Returns
-------
List[cftime._cftime.datetime]
List of cftime objects, likely in the Gregorian calendar.
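
    Examples
    --------
    A minimal sketch; the input times below are illustrative:

    >>> times = pd.to_datetime(['2020-01-01', '2020-01-02'])
    >>> ctimes = convertTimesToCFTime(times)  # array of cftime.DatetimeGregorian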
"""
# Handle empty input
    if len(time_values) == 0:
        return np.array([], dtype=object)
    # Get a sample time and condition on its type
sample_time = time_values[0]
if isinstance(sample_time, cftime._cftime.datetime):
return np.array(time_values)
if isinstance(sample_time, (np.datetime64, datetime.datetime)):
time_values = pd.to_datetime(time_values).tolist()
if not isinstance(time_values[0], pd.Timestamp):
raise TypeError(f'Cannot convert items of type {type(time_values[0])} to cftime.')
# convert pd.Timestamp to cftime
res = [cftime.DatetimeGregorian(t.year, t.month, t.day, t.hour, t.minute, t.second, t.microsecond) for t in time_values]
return np.array(res)
def convertTimesToCFTimeNoleap(time_values: Sequence[cftime._cftime.datetime]) -> npt.NDArray[cftime.DatetimeNoLeap]:
"""Convert an iterable of cftime objects on any calendar to cftime DatetimeNoLeap calendar.
This function accepts various datetime types and converts them to cftime NoLeap
calendar while preserving the input container type. Raises an error if any
input date represents the 366th day of a year (leap day in DayMet convention).
Parameters
----------
time_values
Sequence of datetime objects (numpy datetime64, pandas Timestamp,
Python datetime, or cftime objects). All elements must be the same type.
Returns
-------
Container of cftime.DatetimeNoLeap objects.
Raises
------
ValueError
If any date in the input represents the 366th day of a year (leap day).
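
    Examples
    --------
    A minimal sketch, converting Gregorian cftime dates to the noleap calendar:

    >>> times = [cftime.DatetimeGregorian(2021, 3, d) for d in (1, 2)]
    >>> noleap = convertTimesToCFTimeNoleap(times)  # array of cftime.DatetimeNoLeap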
"""
    if len(time_values) == 0:
        return np.array([], dtype=object)
dayofyr = [t.dayofyr for t in time_values]
    if max(dayofyr) == 366:
        raise ValueError("Input contains leap day(s) (366th day of year)")
return np.array([
cftime.DatetimeNoLeap(t.year, 1, 1, t.hour, t.minute, t.second, t.microsecond)
+ datetime.timedelta(days=(t.dayofyr - 1)) for t in time_values
])
def createNoleapMask(time_values : Sequence[ValidTime]
) -> Tuple[npt.NDArray[cftime.DatetimeNoLeap], npt.NDArray[bool]]:
"""Create a mask that is true for any non-leap-day (day 366).
Parameters
----------
time_values : Sequence[ValidTime]
Time values to filter for leap days.
Returns
-------
Sequence[cftime.DatetimeNoLeap]
Time values converted to cftime format with leap days filtered.
List[bool]
Boolean mask where True indicates non-leap days.
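
    Examples
    --------
    A minimal sketch over a span containing Dec 31 of a leap year:

    >>> times = pd.date_range('2020-12-30', periods=3, freq='D')
    >>> kept, mask = createNoleapMask(times)  # mask is [True, False, True]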
"""
    # no times --> no leap days
    if len(time_values) == 0:
        return np.array([], dtype=object), np.array([], dtype=bool)

    # cftime.DatetimeNoLeap --> no leap days
    if isinstance(time_values[0], cftime.DatetimeNoLeap):
        return np.asarray(time_values), np.full(len(time_values), True)

    # Convert the time values to cftime format
    time_in_cftime = convertTimesToCFTime(time_values)
    mask = np.array([(t.dayofyr != 366) for t in time_in_cftime])
return time_in_cftime[mask], mask
def _convertDataFrameToDataset(df: pd.DataFrame, time_column: str) -> xr.Dataset:
"""Convert DataFrame to Dataset for shared processing.
Parameters
----------
df : pandas.DataFrame
Input DataFrame containing time series data.
time_column : str
Name of the time column.
Returns
-------
dataset : xarray.Dataset
Dataset containing the converted data.
Raises
------
ValueError
If time_column not found or no numeric columns available.
"""
if time_column not in df.columns:
raise ValueError(f"Column '{time_column}' not found in DataFrame")
# Sort by time
df_sorted = df.sort_values(time_column).copy()
# Get all numeric columns except time column
numeric_cols = df_sorted.select_dtypes(include=[np.number]).columns.tolist()
columns_out = [col for col in numeric_cols if col != time_column]
# Create Dataset from numeric columns
data_vars = {}
for col in columns_out:
data_vars[col] = xr.DataArray(df_sorted[col].values,
coords={ time_column: df_sorted[time_column].values },
dims=[time_column])
    if len(data_vars) == 0:
        raise ValueError("No numeric columns provided or found.")
return xr.Dataset(data_vars)
def _convertDatasetToDataFrame(ds: xr.Dataset, time_column: str, output_times: pd.Series) -> pd.DataFrame:
"""Convert Dataset back to DataFrame format.
Parameters
----------
ds : xarray.Dataset
Input Dataset to convert.
time_column : str
Name of the time column to use.
output_times : pandas.Series
Time values to use for the output DataFrame.
Returns
-------
pandas.DataFrame
Converted DataFrame with time column first, then other columns.
"""
result_df = ds.to_dataframe().reset_index()
result_df[time_column] = output_times
return result_df
def computeMode(da: xr.DataArray, time_dim: str = 'time') -> xr.DataArray:
"""Compute the mode along the time dimension of a DataArray.
Parameters
----------
da : xr.DataArray
Input DataArray. Can contain any data type that scipy.stats.mode can handle.
time_dim : str, optional
Name of the time dimension along which to compute the mode. Default is 'time'.
Returns
-------
xr.DataArray
DataArray with the mode computed along the time dimension. The time dimension
is removed from the output. All other dimensions, coordinates, and attributes
are preserved. In case of multiple modes, returns the smallest value.
Raises
------
ValueError
If the specified time dimension is not found in the DataArray.
Notes
-----
For continuous data, the mode may not be meaningful. This function is most
useful for discrete or categorical data.
When multiple values have the same highest frequency (multiple modes),
scipy.stats.mode returns the smallest of these values.
NaN values are ignored in the mode calculation.
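
    Examples
    --------
    A minimal sketch on a small integer-valued array:

    >>> da = xr.DataArray([[1, 1, 2], [3, 3, 3]], dims=['x', 'time'])
    >>> computeMode(da).values
    array([1, 3])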
"""
if time_dim not in da.dims:
raise ValueError(f"Data must have a '{time_dim}' dimension")
# Get the axis number for time dimension
time_axis = da.dims.index(time_dim)
if np.issubdtype(da.dtype, np.integer) or np.issubdtype(da.dtype, np.floating):
# For numeric data, use scipy.stats.mode
mode_result = scipy.stats.mode(da.values, axis=time_axis, nan_policy='omit', keepdims=False)
mode_values = mode_result.mode
else:
raise ValueError('Non-numeric data modes are not implemented.')
# Create coordinates for the result (all except time dimension)
new_coords = { k: v for k, v in da.coords.items() if k != time_dim }
# Create new dimensions list (all except time dimension)
new_dims = [d for d in da.dims if d != time_dim]
# Create result DataArray
result = xr.DataArray(mode_values, dims=new_dims, coords=new_coords, attrs=da.attrs.copy())
# transfer the crs
try:
crs = da.rio.crs
except AttributeError:
pass
else:
if crs is not None:
result.rio.write_crs(crs, inplace=True)
# Preserve the name if it exists
if da.name is not None:
result.name = f"{da.name}_mode"
return result
#
# filter leap day
#
def filterLeapDay_DataFrame(df: pd.DataFrame, time_column: str = 'time') -> pd.DataFrame:
"""Remove day 366 (Dec 31) from leap years and convert time column to CFTime noleap calendar.
Parameters
----------
df
Input DataFrame containing time series data.
time_column
Name of the column containing datetime data. Must be convertible to pandas datetime.
Returns
-------
pandas.DataFrame
DataFrame with day 366 of leap years removed and time column converted to
cftime noleap calendar format.
Raises
------
ValueError
If `time_column` is not found in the DataFrame.
If the time column cannot be converted to datetime format.
Notes
-----
Day 366 only occurs on December 31st in leap years. The function assumes that
the input time column is not already in cftime noleap format, as noleap
calendars by definition do not have day 366.
The DataFrame index is reset after filtering to ensure continuous indexing.
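
    Examples
    --------
    A minimal sketch spanning Dec 31 of a leap year:

    >>> times = pd.date_range('2020-12-30', periods=3, freq='D')
    >>> df = pd.DataFrame({'time': times, 'value': [1.0, 2.0, 3.0]})
    >>> len(filterLeapDay_DataFrame(df, 'time'))  # Dec 31, 2020 is dropped
    2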
"""
# Validate inputs
try:
time_series = df[time_column]
except KeyError:
raise ValueError(f"Column '{time_column}' not found in DataFrame")
# no times --> no leap days
if len(time_series) == 0:
return df
    # cftime.DatetimeNoLeap --> no leap days
    if isinstance(time_series.iloc[0], cftime.DatetimeNoLeap):
        return df

    # Convert the time column to cftime format, and build a mask that is True for non-leap days
try:
time_series_cftime, mask = createNoleapMask(time_series)
except Exception as e:
raise ValueError(f"Could not convert column '{time_column}' to cftime: {e}")
# Apply the filter
df_filtered = df[mask].reset_index(drop=True)
# Convert the time column to CFTime noleap calendar
df_filtered[time_column] = convertTimesToCFTimeNoleap(time_series_cftime)
return df_filtered
def filterLeapDay_xarray(da: xr.DataArray | xr.Dataset,
time_dim: str = 'time') -> xr.DataArray | xr.Dataset:
"""Remove day 366 (Dec 31) from leap years and convert time dimension to CFTime noleap calendar.
Parameters
----------
    da : xr.DataArray or xr.Dataset
        Input data with a time dimension. The time dimension must contain
        datetime-like values that can be converted to pandas datetime.
    time_dim : str, optional
        Name of the time dimension. Default is 'time'.

    Returns
    -------
    xr.DataArray or xr.Dataset
        Data with day 366 of leap years removed and the time dimension
        converted to cftime noleap calendar format. All attributes, including
        rasterio-specific attributes like 'nodata' and 'crs', are preserved.

    Raises
    ------
    ValueError
        If the data does not have the given time dimension.
        If the time dimension cannot be converted to datetime format.
Notes
-----
Day 366 only occurs on December 31st in leap years. The function assumes that
the input time dimension is not already in cftime noleap format, as noleap
calendars by definition do not have day 366.
For rasterio-based DataArrays, this function preserves the coordinate reference
system (CRS) and nodata value attributes. All other attributes are also preserved.
The time dimension name is preserved, but its values are replaced with cftime
noleap calendar objects.
"""
# deal with missing time_dim
try:
time_series = da[time_dim]
except KeyError:
raise ValueError(f"Data must have a '{time_dim}' dimension")
# deal with empty time_dim
if len(time_series) == 0:
return da
# Create mask for values to keep (exclude day 366 in leap years)
time_array_cftime, mask = createNoleapMask(time_series.values)
# Apply the filter to the DataArray
da_filtered = da.isel({ time_dim: mask })
# Convert the filtered time values to cftime noleap
time_array_noleap = convertTimesToCFTimeNoleap(time_array_cftime)
# Replace the time coordinate with cftime noleap
da_filtered = da_filtered.assign_coords({ time_dim: time_array_noleap })
# Preserve all attributes from the original DataArray
try:
crs = da.rio.crs
except AttributeError:
pass
else:
if crs is not None:
da_filtered.rio.write_crs(crs, inplace=True)
da_filtered.attrs = da.attrs.copy()
# Preserve coordinate attributes (including CRS if present)
for coord in da_filtered.coords:
if coord in da.coords and hasattr(da.coords[coord], 'attrs'):
da_filtered.coords[coord].attrs = da.coords[coord].attrs.copy()
# Preserve attributes for all variables in a Dataset
if hasattr(da_filtered, 'data_vars'):
for var in da_filtered.data_vars:
if var in da.data_vars and hasattr(da[var], 'attrs'):
da_filtered[var].attrs = da[var].attrs.copy()
return da_filtered
@overload
def filterLeapDay(data: xr.Dataset, time_column: str = 'time') -> xr.Dataset:
...
@overload
def filterLeapDay(data: xr.DataArray, time_column: str = 'time') -> xr.DataArray:
...
@overload
def filterLeapDay(data: pd.DataFrame, time_column: str = 'time') -> pd.DataFrame:
...
def filterLeapDay(data: Union[pd.DataFrame, xr.DataArray, xr.Dataset],
time_column: str = 'time') -> Union[pd.DataFrame, xr.DataArray, xr.Dataset]:
"""Remove day 366 (Dec 31) from leap years and convert time to CFTime noleap calendar.
This function automatically selects the appropriate implementation based on the
input data type: DataFrame, DataArray, or Dataset.
Parameters
----------
data : pandas.DataFrame, xr.DataArray, or xr.Dataset
Input data containing time series information to filter.
    time_column : str, optional
        For DataFrame: Name of the column containing datetime data.
        For DataArray/Dataset: Name of the time dimension (default: 'time').
Returns
-------
pandas.DataFrame, xr.DataArray, or xr.Dataset
Same type as input with day 366 of leap years removed and time
converted to cftime noleap calendar format.
Raises
------
TypeError
If data is not a DataFrame, DataArray, or Dataset.
If DataFrame is provided without time_column.
ValueError
If time column/dimension is not found or cannot be converted to datetime.
Notes
-----
Day 366 only occurs on December 31st in leap years. The function assumes that
the input time data is not already in cftime noleap format, as noleap
calendars by definition do not have day 366.
For DataFrames:
- time_column parameter is required
- DataFrame index is reset after filtering
For DataArrays & Datasets:
- time_column parameter specifies the time dimension (default: 'time')
- All attributes including rasterio-specific ones are preserved
See Also
--------
    filterLeapDay_DataFrame : DataFrame-specific implementation
    filterLeapDay_xarray : DataArray- and Dataset-specific implementation
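
    Examples
    --------
    A minimal sketch over a record spanning Dec 31, 2020 (day 366 of a leap year):

    >>> times = pd.date_range('2019-12-31', periods=367, freq='D')
    >>> da = xr.DataArray(np.arange(367.0), dims=['time'], coords={'time': times})
    >>> filtered = filterLeapDay(da)  # drops 2020-12-31, converts times to noleap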
"""
if isinstance(data, (xr.Dataset, xr.DataArray)):
# Dataset can use custom time dimension name
return filterLeapDay_xarray(data, time_dim=time_column)
elif isinstance(data, pd.DataFrame):
if time_column is None:
raise TypeError("time_column parameter is required for DataFrame input")
return filterLeapDay_DataFrame(data, time_column)
else:
raise TypeError(f"Input data must be a pandas DataFrame, xr DataArray, or xr Dataset. "
f"Got {type(data).__name__}")
def interpolate_Dataset(ds: xr.Dataset | xr.DataArray,
                        time_values: Sequence[ValidTime],
                        time_dim: str = 'time',
                        method: InterpOptions = 'linear') -> xr.Dataset | xr.DataArray:
    """Interpolate a Dataset or DataArray to arbitrary times.
Parameters
----------
    ds : xr.Dataset or xr.DataArray
        Input data with a time dimension containing cftime objects.
time_values : Sequence[ValidTime]
Time values to interpolate to.
time_dim : str
Name of the time dimension. Default is 'time'.
method : str, optional
Interpolation method. Default is 'linear'.
    Returns
    -------
    xr.Dataset or xr.DataArray
        Data interpolated to the requested times.

    Raises
    ------
    ValueError
        If the time dimension is not found.
"""
if time_dim not in ds.dims:
raise ValueError(f"Data must have a '{time_dim}' dimension")
result = ds.interp({ time_dim: time_values },
method=method,
kwargs={ 'fill_value': 'extrapolate'} if method in ['linear', 'nearest'] else {})
# Preserve Dataset attributes
result.attrs = ds.attrs.copy()
return result
def interpolate_DataFrame(df: pd.DataFrame,
time_values: Sequence[ValidTime],
time_column: str = 'time',
method: InterpOptions = 'linear') -> pd.DataFrame:
"""Interpolate DataFrame to arbitrary times.
NOTE: this is not the same as pandas.interpolate(), but more like
pandas.reindex(time_values).interpolate() with scipy-based
interpolation options.
Parameters
----------
df : pandas.DataFrame
Input DataFrame with a time column containing cftime objects.
time_values : Sequence[ValidTime]
Time values to interpolate to.
time_column : str
Name of the column containing cftime datetime objects.
method : str, optional
Interpolation method. Default is 'linear'.
Returns
-------
    pandas.DataFrame
        DataFrame with values interpolated to the requested times.
Raises
------
ValueError
        If time_column is not found.
"""
# get a dataset
ds = _convertDataFrameToDataset(df, time_column)
# interpolate using dataset function
interp_ds = interpolate_Dataset(ds, time_values, time_column, method)
# Convert back to DataFrame
return _convertDatasetToDataFrame(interp_ds, time_column, interp_ds[time_column])
@overload
def interpolate(data: xr.Dataset,
                time_values: Sequence[ValidTime],
                time_dim: str = ...,
                method: InterpOptions = ...,
                ) -> xr.Dataset:
    ...
@overload
def interpolate(data: xr.DataArray,
                time_values: Sequence[ValidTime],
                time_dim: str = ...,
                method: InterpOptions = ...,
                ) -> xr.DataArray:
    ...
@overload
def interpolate(data: pd.DataFrame,
                time_values: Sequence[ValidTime],
                time_dim: str = ...,
                method: InterpOptions = ...,
                ) -> pd.DataFrame:
    ...
def interpolate(data: Union[pd.DataFrame, xr.DataArray, xr.Dataset],
                time_values: Sequence[ValidTime],
                time_dim: str = 'time',
                method: InterpOptions = 'linear'
                ) -> Union[pd.DataFrame, xr.DataArray, xr.Dataset]:
"""
Interpolate data to new times.
This function automatically selects the appropriate implementation based on the
input data type: DataFrame, DataArray, or Dataset.
Parameters
----------
data : pandas.DataFrame, xr.DataArray, or xr.Dataset
Input data containing time series with cftime calendar.
time_values : Sequence[ValidTime]
Time values to interpolate to.
    time_dim : str, optional
        For DataFrame: Name of the time column (default: 'time').
        For Dataset/DataArray: Name of the time dimension (default: 'time').
method : str, optional
Interpolation method. Default is 'linear'.
Returns
-------
    pandas.DataFrame, xr.DataArray, or xr.Dataset
        Same type as input, with values interpolated to the requested times.
Raises
------
TypeError
If data is not a DataFrame, DataArray, or Dataset.
ValueError
If required parameters are missing or invalid.
See Also
--------
interpolate_DataFrame : DataFrame-specific implementation
interpolate_Dataset : Dataset-specific implementation
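
    Examples
    --------
    A minimal sketch with a noleap-calendar Dataset (values are illustrative):

    >>> times = [cftime.DatetimeNoLeap(2020, 1, d) for d in (1, 3, 5)]
    >>> ds = xr.Dataset({'v': ('time', [0.0, 2.0, 4.0])}, coords={'time': times})
    >>> new_times = [cftime.DatetimeNoLeap(2020, 1, d) for d in (2, 4)]
    >>> out = interpolate(ds, new_times)  # linear: v becomes [1.0, 3.0]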
"""
if isinstance(data, (xr.Dataset, xr.DataArray)):
return interpolate_Dataset(data, time_values, time_dim, method)
elif isinstance(data, pd.DataFrame):
return interpolate_DataFrame(data, time_values, time_dim, method)
else:
raise TypeError(f"Input data must be a pandas DataFrame, xr DataArray, or xr Dataset. "
f"Got {type(data).__name__}")
def interpolateToRegular(data: Union[pd.DataFrame, xr.DataArray, xr.Dataset],
interval : int = 1,
time_dim: str = 'time',
method: InterpOptions = 'linear'
) -> Union[pd.DataFrame, xr.DataArray, xr.Dataset]:
"""
    Interpolate data to regularly spaced times.
This function automatically selects the appropriate implementation based on the
input data type: DataFrame, DataArray, or Dataset.
Parameters
----------
data : pandas.DataFrame, xr.DataArray, or xr.Dataset
Input data containing time series with cftime calendar.
    interval : int, optional
        Interval between output times, in days. Default is 1.
time_dim : str, optional
For DataFrame: Name of the time column (required).
For Dataset/DataArray: Name of the time dimension (default: 'time').
method : str, optional
Interpolation method. Default is 'linear'.
Returns
-------
pandas.DataFrame, xr.DataArray, or xr.Dataset
Same type as input with regular time intervals and interpolated values.
Raises
------
TypeError
If data is not a DataFrame, DataArray, or Dataset.
ValueError
If required parameters are missing or invalid.
See Also
--------
interpolate_DataFrame : DataFrame-specific implementation
interpolate_Dataset : Dataset-specific implementation
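
    Examples
    --------
    A minimal sketch, filling a 5-day record onto a daily grid:

    >>> times = [cftime.DatetimeNoLeap(2020, 1, d) for d in (1, 6, 11)]
    >>> ds = xr.Dataset({'v': ('time', [0.0, 5.0, 10.0])}, coords={'time': times})
    >>> daily = interpolateToRegular(ds)  # 11 daily values, Jan 1 through Jan 11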
"""
    calendar = 'standard'
    if isinstance(data[time_dim].values[0], cftime.DatetimeNoLeap):
        calendar = 'noleap'
    new_times = xr.date_range(data[time_dim].values[0], data[time_dim].values[-1],
                              freq=f'{interval}D', calendar=calendar)
return interpolate(data, new_times, time_dim, method)
#
# Compute an annual average
#
def _computeAverageYear(ds: xr.Dataset, time_dim: str, start_date: cftime.datetime,
output_nyears: int) -> Tuple[xr.Dataset, List[cftime.datetime]]:
"""
Compute annual average for a Dataset and generate output times.
Parameters
----------
ds : xr.Dataset
Input Dataset with cftime noleap calendar dates.
time_dim : str
Name of the time dimension.
start_date : cftime.datetime
Start date for the output time series.
output_nyears : int
Number of years to repeat the averaged pattern.
Returns
-------
averaged_ds : xr.Dataset
Dataset with averaged values indexed by day of year.
output_times : list of cftime.datetime
List of output times for the repeated pattern.
"""
# Calculate day of year for each time point
time_values = ds[time_dim].values
doys = np.array([(date - cftime.DatetimeNoLeap(date.year, 1, 1)).days + 1
for date in time_values])
# Get unique days of year
unique_doys = np.unique(doys)
# Add day of year coordinate temporarily
ds_with_doy = ds.assign_coords(day_of_year=(time_dim, doys))
# Group by day of year and compute mean
averaged_ds = ds_with_doy.groupby('day_of_year').mean(dim=time_dim)
# Create output times
output_times = []
output_doys = []
for year_offset in range(output_nyears):
current_year = start_date.year + year_offset
for doy in unique_doys:
date = cftime.DatetimeNoLeap(current_year, 1, 1) + datetime.timedelta(days=int(doy - 1))
output_times.append(date)
output_doys.append(doy)
# Create mapping from output indices to day of year
output_doys_array = xr.DataArray(output_doys, dims=['new_time'])
# Create result by selecting appropriate days
result_ds = averaged_ds.sel(day_of_year=output_doys_array)
result_ds = result_ds.rename({ 'new_time': time_dim })
# Drop day_of_year coordinate
if 'day_of_year' in result_ds.coords:
result_ds = result_ds.drop_vars('day_of_year')
return result_ds, output_times
def _parseStartDate(
start_date: Union[str, int, datetime.datetime, cftime.datetime]) -> cftime.datetime:
"""
Parse start_date into cftime.DatetimeNoLeap.
Parameters
----------
    start_date : str, int, datetime, or cftime.datetime
        Start date in various formats; an int (or 'YYYY' string) is treated
        as a year, starting January 1.
Returns
-------
cftime.DatetimeNoLeap
Parsed start date.
"""
if isinstance(start_date, int):
start_date = str(start_date)
if isinstance(start_date, str):
parts = start_date.split('-')
year = int(parts[0])
month = int(parts[1]) if len(parts) > 1 else 1
day = int(parts[2]) if len(parts) > 2 else 1
return cftime.DatetimeNoLeap(year, month, day)
elif isinstance(start_date, datetime.datetime):
return cftime.DatetimeNoLeap(start_date.year, start_date.month, start_date.day)
else:
return start_date
def computeAverageYear_DataFrame(df: pd.DataFrame,
time_column: str = 'time',
start_date: Union[str, datetime.datetime,
cftime.datetime] = '2020-1-1',
output_nyears: int = 2) -> pd.DataFrame:
"""
Average DataFrame values across years and repeat for specified number of years.
Parameters
----------
df : pandas.DataFrame
Input DataFrame with cftime noleap calendar dates at 1- or 5-day intervals.
time_column : str
Name of the column containing cftime datetime objects.
start_date : str, datetime, or cftime.datetime
Start date for the output time series. If string, should be 'YYYY-MM-DD' format.
output_nyears : int
Number of years to repeat the averaged pattern.
Returns
-------
pandas.DataFrame
DataFrame with averaged values repeated for the specified number of years,
starting from start_date. Only includes the time column and averaged numeric columns.
Raises
------
ValueError
If time_column is not found or contains invalid data.
Notes
-----
The function computes the average value for each day of year (1-365) across all
years in the input data. For 5-day intervals, it averages values at days 1, 6,
11, etc. The resulting pattern is then repeated for output_nyears starting from
start_date.
Missing values (NaN) are ignored in the averaging process.
Non-numeric columns are excluded from the output.
"""
# Parse start date
start_cftime = _parseStartDate(start_date)
# get a dataset
ds = _convertDataFrameToDataset(df, time_column)
# Compute averages using shared function
averaged_ds, output_times = _computeAverageYear(ds, time_column, start_cftime, output_nyears)
# Convert back to DataFrame
return _convertDatasetToDataFrame(averaged_ds, time_column, output_times)
def computeAverageYear_DataArray(da: xr.DataArray,
time_dim: str = 'time',
start_date: Union[str, datetime.datetime,
cftime.datetime] = '2020-1-1',
output_nyears: int = 2,
) -> xr.DataArray:
"""
Average DataArray values across years and repeat for specified number of years.
Parameters
----------
da : xr.DataArray
Input DataArray with cftime noleap calendar dates at 1- or 5-day intervals.
start_date : str, datetime, or cftime.datetime
Start date for the output time series. If string, should be 'YYYY-MM-DD' format.
output_nyears : int
Number of years to repeat the averaged pattern.
time_dim : str, optional
Name of the time dimension. Default is 'time'.
Returns
-------
xr.DataArray
DataArray with averaged values repeated for the specified number of years,
starting from start_date. All attributes are preserved.
Raises
------
ValueError
If time dimension is not found.
Notes
-----
The function computes the average value for each day of year (1-365) across all
years in the input data. The resulting 365-day pattern is then repeated for
output_nyears starting from start_date.
This is particularly useful for creating climatological datasets or for
generating synthetic time series based on historical patterns.
"""
if time_dim not in da.dims:
raise ValueError(f"Data must have a '{time_dim}' dimension")
# Parse start date
start_cftime = _parseStartDate(start_date)
# Convert to Dataset for processing
temp_ds = xr.Dataset({da.name or 'data': da})
# Compute averages using shared function
averaged_ds, output_times = _computeAverageYear(temp_ds, time_dim, start_cftime, output_nyears)
# Extract the DataArray
result = averaged_ds[da.name or 'data']
# Assign the output times
result = result.assign_coords({ time_dim: output_times })
# Preserve attributes
result.attrs = da.attrs.copy()
result.name = da.name
return result
def computeAverageYear_Dataset(ds: xr.Dataset,
time_dim: str = 'time',
start_date: Union[str, datetime.datetime,
cftime.datetime] = '2020-1-1',
output_nyears: int = 2,
variables: Optional[List[str]] = None) -> xr.Dataset:
"""
Average Dataset values across years and repeat for specified number of years.
Parameters
----------
ds : xr.Dataset
Input Dataset with cftime noleap calendar dates at 1- or 5-day intervals.
start_date : str, datetime, or cftime.datetime
Start date for the output time series. If string, should be 'YYYY-MM-DD' format.
output_nyears : int
Number of years to repeat the averaged pattern.
time_dim : str, optional
Name of the time dimension. Default is 'time'.
variables : list of str, optional
List of variables to average. If None, averages all variables with the
time dimension.
Returns
-------
xr.Dataset
Dataset with averaged values repeated for the specified number of years,
starting from start_date. All attributes are preserved.
Raises
------
ValueError
If time dimension is not found or if specified variables don't exist.
Notes
-----
Variables without the time dimension are preserved unchanged in the output.
For each variable with the time dimension, the function computes the average
value for each day of year across all years in the input data.
"""
if time_dim not in ds.dims:
raise ValueError(f"Data must have a '{time_dim}' dimension")
# Parse start date
start_cftime = _parseStartDate(start_date)
# Select variables to process
if variables is None:
vars_with_time = [var for var in ds.data_vars if time_dim in ds[var].dims]
vars_without_time = [var for var in ds.data_vars if time_dim not in ds[var].dims]
else:
# Validate variables
missing = [v for v in variables if v not in ds.data_vars]
if missing:
raise ValueError(f"Variables not found in Dataset: {missing}")
vars_with_time = [v for v in variables if time_dim in ds[v].dims]
vars_without_time = [var for var in ds.data_vars if var not in variables]
# Create Dataset with only time-dependent variables
ds_to_avg = ds[vars_with_time] if vars_with_time else xr.Dataset()
if vars_with_time:
# Compute averages using shared function
averaged_ds, output_times = _computeAverageYear(ds_to_avg, time_dim, start_cftime,
output_nyears)
# Assign the output times
result = averaged_ds.assign_coords({ time_dim: output_times })
else:
# No time-dependent variables to average
result = xr.Dataset()
# Add variables without time dimension
for var in vars_without_time:
result[var] = ds[var]
# Copy over other coordinates
for coord in ds.coords:
if coord != time_dim and coord not in result.coords:
result.coords[coord] = ds.coords[coord]
# Preserve attributes
result.attrs = ds.attrs.copy()
for var in result.data_vars:
if var in ds.data_vars:
result[var].attrs = ds[var].attrs.copy()
return result
@overload
def computeAverageYear(data: xr.Dataset,
time_column: str,
start_year: int,
output_nyears: int) -> xr.Dataset:
...
@overload
def computeAverageYear(data: xr.DataArray,
time_column: str,
start_year: int,
output_nyears: int) -> xr.DataArray:
...
@overload
def computeAverageYear(data: pd.DataFrame,
time_column: str,
start_year: int,
output_nyears: int) -> pd.DataFrame:
...
def computeAverageYear(
data: Union[pd.DataFrame, xr.DataArray, xr.Dataset],
time_column: str,
start_year: int,
output_nyears: int) -> Union[pd.DataFrame, xr.DataArray, xr.Dataset]:
"""
Average data values across years and repeat for specified number of years.
This function automatically selects the appropriate implementation based on the
input data type: DataFrame, DataArray, or Dataset.
Parameters
----------
data : pandas.DataFrame, xr.DataArray, or xr.Dataset
Input data with cftime noleap calendar dates at 1- or 5-day intervals.
    time_column : str
        For DataFrame: Name of the time column.
        For DataArray/Dataset: Name of the time dimension.
start_year : int
Start year for the output time series.
    output_nyears : int
        Number of years to repeat the averaged pattern.
Returns
-------
    pandas.DataFrame, xr.DataArray, or xr.Dataset
        Same type as input with averaged values repeated for the specified number
        of years, starting from January 1 of start_year.
Raises
------
TypeError
If data is not a DataFrame, DataArray, or Dataset.
If DataFrame is provided without time_column.
ValueError
If time column/dimension is not found or contains invalid data.
Notes
-----
    The function computes the average value for each day of year (1-365) across all
    years in the input data. For 5-day intervals, it averages values at days 1, 6,
    11, etc. The resulting pattern is then repeated for output_nyears starting from
    January 1 of start_year.

    Missing values (NaN) are ignored in the averaging process.

    For DataFrames, only numeric columns are included in the output.
    For DataArrays, all attributes are preserved.
    For Datasets:
    - Variables without the time dimension are preserved unchanged.
    - All attributes and encodings are preserved.
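
    Examples
    --------
    A minimal sketch, assuming two years of daily noleap data:

    >>> times = xr.date_range('2018-01-01', '2019-12-31', freq='D', calendar='noleap')
    >>> df = pd.DataFrame({'time': times, 'v': np.arange(730.0)})
    >>> avg = computeAverageYear(df, 'time', start_year=2020, output_nyears=1)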
"""
start_date = cftime.DatetimeNoLeap(start_year, 1, 1)
if isinstance(data, pd.DataFrame):
if time_column is None:
raise TypeError("time_column parameter is required for DataFrame input")
return computeAverageYear_DataFrame(data, time_column, start_date, output_nyears)
elif isinstance(data, xr.DataArray):
return computeAverageYear_DataArray(data, time_column, start_date, output_nyears)
    elif isinstance(data, xr.Dataset):
        return computeAverageYear_Dataset(data, time_column, start_date, output_nyears)
else:
raise TypeError(f"Input data must be a pandas DataFrame, xr DataArray, or xr Dataset. "
f"Got {type(data).__name__}")
#
# Smooth data temporally
#
def smoothTimeSeries_Array(data: np.ndarray,
method: Literal['savgol', 'rolling_mean'] = 'savgol',
axis: int = -1,
**kwargs) -> np.ndarray:
"""
Smooth time series data using specified method.
Parameters
----------
data : numpy.ndarray
Array of data to smooth. Must not contain NaN values.
method : {'savgol', 'rolling_mean'}
Smoothing method to use.
axis : int, optional
Axis along which to smooth. Default is -1 (last axis).
**kwargs : dict
Method-specific parameters.
For 'savgol':
- window_length : int, odd number (default: 7)
- polyorder : int (default: 3)
- mode : str (default: 'interp')
For 'rolling_mean':
- window : int (default: 5)
- center : bool (default: True)
Returns
-------
numpy.ndarray
Smoothed data array.
Raises
------
ValueError
        If method is not recognized or parameters are invalid.
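
    Examples
    --------
    A minimal sketch, smoothing a noisy sine wave with the default
    Savitzky-Golay filter parameters:

    >>> x = np.linspace(0, 10, 50)
    >>> noisy = np.sin(x) + 0.1 * np.random.randn(50)
    >>> smooth = smoothTimeSeries_Array(noisy, method='savgol')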
"""
# Check for NaN values
# if np.any(np.isnan(data)):
# raise ValueError("Data contains NaN values")
if method == 'savgol':
# Extract savgol parameters
window_length = kwargs.get('window_length', 7)
polyorder = kwargs.get('polyorder', 3)
mode = kwargs.get('mode', 'interp')
# Validate parameters
if window_length % 2 == 0:
raise ValueError("window_length must be odd for Savitzky-Golay filter")
if polyorder >= window_length:
raise ValueError("polyorder must be less than window_length")
if data.shape[axis] < window_length:
raise ValueError(
f"Data length along axis {axis} ({data.shape[axis]}) must be >= window_length ({window_length})"
)
# Apply Savitzky-Golay filter along specified axis
return scipy.signal.savgol_filter(data, window_length, polyorder, axis=axis, mode=mode)
elif method == 'rolling_mean':
# Extract rolling mean parameters
window = kwargs.get('window', 5)
center = kwargs.get('center', True)
# Calculate origin for centering
if center:
origin = 0
else:
origin = -(window // 2)
# Apply uniform filter (equivalent to rolling mean)
return scipy.ndimage.uniform_filter1d(data,
size=window,
axis=axis,
mode='reflect',
origin=origin)
else:
raise ValueError(f"Unknown smoothing method: {method}")
def smoothTimeSeries_DataFrame(df: pd.DataFrame,
time_column: str = 'time',
method: Literal['savgol', 'rolling_mean'] = 'savgol',
**kwargs) -> pd.DataFrame:
"""
Smooth time series data in a DataFrame along the time dimension.
Parameters
----------
df : pandas.DataFrame
Input DataFrame with time series data. Must not contain NaN values
in columns to be smoothed.
time_column : str
Name of the time column.
method : {'savgol', 'rolling_mean'}, optional
Smoothing method. Default is 'savgol'.
**kwargs : dict
Method-specific parameters passed to smoothing function.
For 'savgol':
- window_length : int, odd number (default: 7)
- polyorder : int (default: 3)
- mode : {'mirror', 'constant', 'nearest', 'wrap', 'interp'} (default: 'interp')
For 'rolling_mean':
- window : int (default: 5)
- center : bool (default: True)
Returns
-------
pandas.DataFrame
DataFrame with smoothed data.
Raises
------
    ValueError
        If time_column is not found or smoothing parameters are invalid.
Notes
-----
The Savitzky-Golay filter is useful for smoothing noisy data while preserving
important features like peaks. The rolling mean provides simple moving average
smoothing.
Data is sorted by time before smoothing to ensure correct temporal ordering.
"""
if time_column not in df.columns:
raise ValueError(f"Column '{time_column}' not found in DataFrame")
# Make a copy and sort by time
df_sorted = df.sort_values(time_column).copy()
# Identify columns to smooth
numeric_cols = df_sorted.select_dtypes(include=[np.number]).columns.tolist()
columns_to_smooth = [col for col in numeric_cols if col != time_column]
# Apply smoothing to each column
result = df_sorted.copy()
for col in columns_to_smooth:
data = df_sorted[col].values
result[col] = smoothTimeSeries_Array(data, method, axis=0, **kwargs)
return result
def smoothTimeSeries_DataArray(da: xr.DataArray,
time_dim: str = 'time',
method: Literal['savgol', 'rolling_mean'] = 'savgol',
**kwargs) -> xr.DataArray:
"""
Smooth time series data in a DataArray along the time dimension.
Parameters
----------
da : xr.DataArray
Input DataArray with time series data. Must not contain NaN values.
time_dim : str, optional
Name of the time dimension. Default is 'time'.
method : {'savgol', 'rolling_mean'}, optional
Smoothing method. Default is 'savgol'.
**kwargs : dict
Method-specific parameters passed to smoothing function.
Returns
-------
xr.DataArray
DataArray with smoothed data. All attributes and coordinates are preserved.
Raises
------
ValueError
        If time dimension is not found.
Notes
-----
For multidimensional arrays, smoothing is applied along the time dimension
for each combination of other dimensions (e.g., each spatial point).
"""
if time_dim not in da.dims:
raise ValueError(f"Data must have a '{time_dim}' dimension")
# Check for NaN values
# if np.any(np.isnan(da.values)):
# raise ValueError("DataArray contains NaN values")
# Get the axis number for time dimension
time_axis = da.dims.index(time_dim)
# Apply smoothing along time dimension in one pass
smoothed_data = smoothTimeSeries_Array(da.values, method, axis=time_axis, **kwargs)
# Create result DataArray
result = da.copy()
result.values = smoothed_data
return result
def smoothTimeSeries_Dataset(ds: xr.Dataset,
time_dim: str = 'time',
method: Literal['savgol', 'rolling_mean'] = 'savgol',
variables: Optional[List[str]] = None,
**kwargs) -> xr.Dataset:
"""
Smooth time series data in a Dataset along the time dimension.
Parameters
----------
ds : xr.Dataset
Input Dataset with time series data. Variables to be smoothed must
not contain NaN values.
time_dim : str, optional
Name of the time dimension. Default is 'time'.
method : {'savgol', 'rolling_mean'}, optional
Smoothing method. Default is 'savgol'.
variables : list of str, optional
Variables to smooth. If None, smooths all variables with the time dimension.
**kwargs : dict
Method-specific parameters passed to smoothing function.
Returns
-------
xr.Dataset
Dataset with smoothed data. Variables without the time dimension are
preserved unchanged. All attributes are preserved.
Raises
------
ValueError
        If time dimension is not found or specified variables don't exist.
Notes
-----
Variables without the time dimension are copied unchanged to the output.
"""
if time_dim not in ds.dims:
raise ValueError(f"Data must have a '{time_dim}' dimension")
# Select variables to smooth
if variables is None:
vars_to_smooth = [var for var in ds.data_vars if time_dim in ds[var].dims]
else:
# Validate variables
missing = [v for v in variables if v not in ds.data_vars]
if missing:
raise ValueError(f"Variables not found in Dataset: {missing}")
# Only smooth variables that have time dimension
vars_to_smooth = [v for v in variables if time_dim in ds[v].dims]
# Warn about variables without time dimension
no_time = [v for v in variables if time_dim not in ds[v].dims]
if no_time:
warnings.warn(
f"Variables without '{time_dim}' dimension will not be smoothed: {no_time}")
# Create result dataset
result = ds.copy()
# Smooth each variable
for var in vars_to_smooth:
result[var] = smoothTimeSeries_DataArray(ds[var], time_dim, method, **kwargs)
return result
@overload
def smoothTimeSeries(data: xr.DataArray,
time_dim: str = ...,
method: Literal['savgol', 'rolling_mean'] = ...,
**kwargs) -> xr.DataArray:
...
@overload
def smoothTimeSeries(data: xr.Dataset,
time_dim: str = ...,
method: Literal['savgol', 'rolling_mean'] = ...,
**kwargs) -> xr.Dataset:
...
@overload
def smoothTimeSeries(data: pd.DataFrame,
time_dim: str = ...,
method: Literal['savgol', 'rolling_mean'] = ...,
**kwargs) -> pd.DataFrame:
...
def smoothTimeSeries(data: Union[pd.DataFrame, xr.DataArray, xr.Dataset],
time_dim: str = 'time',
method: Literal['savgol', 'rolling_mean'] = 'savgol',
**kwargs) -> Union[pd.DataFrame, xr.DataArray, xr.Dataset]:
"""
Smooth time series data using specified method.
This function automatically selects the appropriate implementation based on the
input data type: DataFrame, DataArray, or Dataset.
Parameters
----------
data : pandas.DataFrame, xr.DataArray, or xr.Dataset
Input data with time series. Must not contain NaN values in data to be smoothed.
    time_dim : str, optional
        For DataFrame: Name of the time column (default: 'time').
        For DataArray/Dataset: Name of the time dimension (default: 'time').
    method : {'savgol', 'rolling_mean'}, optional
        Smoothing method. Default is 'savgol'.
**kwargs : dict
Method-specific parameters:
For 'savgol':
- window_length : int, odd number (default: 7)
- polyorder : int (default: 3)
- mode : {'mirror', 'constant', 'nearest', 'wrap', 'interp'} (default: 'interp')
For 'rolling_mean':
- window : int (default: 5)
- center : bool (default: True)
Returns
-------
pandas.DataFrame, xr.DataArray, or xr.Dataset
Same type as input with smoothed data.
Raises
------
    TypeError
        If data is not a DataFrame, DataArray, or Dataset.
    ValueError
        If time column/dimension is not found.
        If smoothing parameters are invalid.
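
    Examples
    --------
    A minimal sketch, applying a centered 3-point rolling mean to a DataFrame:

    >>> df = pd.DataFrame({'time': pd.date_range('2020-01-01', periods=10),
    ...                    'v': np.random.rand(10)})
    >>> smoothed = smoothTimeSeries(df, method='rolling_mean', window=3)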
"""
if isinstance(data, pd.DataFrame):
return smoothTimeSeries_DataFrame(data, time_dim, method, **kwargs)
elif isinstance(data, xr.DataArray):
# DataArray uses time_dim parameter
return smoothTimeSeries_DataArray(data, time_dim, method, **kwargs)
elif isinstance(data, xr.Dataset):
return smoothTimeSeries_Dataset(data, time_dim, method, **kwargs)
else:
raise TypeError(f"Input data must be a pandas DataFrame, xr DataArray, or xr Dataset. "
f"Got {type(data).__name__}")
#
# 2D smoothing of dataset
#
def _smooth2D_Array(data: np.ndarray,
method: Literal['uniform', 'gaussian', 'box'] = 'gaussian',
**kwargs) -> np.ndarray:
"""
Apply 2D spatial smoothing to data.
Parameters
----------
data : numpy.ndarray
3D or higher array where the last two dimensions are spatial.
Must not contain NaN values.
method : {'uniform', 'gaussian', 'box'}
Smoothing method to use.
**kwargs : dict
Method-specific parameters:
For 'uniform':
- size : int or tuple of int (default: 3)
Filter size in pixels. If int, same size for both dimensions.
For 'gaussian':
- sigma : float or tuple of float (default: 1.0)
Standard deviation of Gaussian kernel. If float, same for both dimensions.
- truncate : float (default: 4.0)
Truncate filter at this many standard deviations.
For 'box':
- kernel_size : int or tuple of int (default: 3)
Size of box filter. If int, same size for both dimensions.
Returns
-------
numpy.ndarray
Smoothed data array with same shape as input.
Raises
------
    ValueError
        If method is not recognized.
"""
# Check for NaN values
# if np.any(np.isnan(data)):
# raise ValueError("Data contains NaN values")
# Ensure we have at least 2D data
if data.ndim < 2:
raise ValueError("Data must be at least 2D for spatial smoothing")
if method == 'uniform':
# Uniform filter (moving average)
size = kwargs.get('size', 3)
if isinstance(size, int):
size = (size, size)
# Apply uniform filter to last two dimensions
axes = (-2, -1)
return scipy.ndimage.uniform_filter(data, size=size, axes=axes, mode='reflect')
elif method == 'gaussian':
# Gaussian filter
sigma = kwargs.get('sigma', 1.0)
truncate = kwargs.get('truncate', 4.0)
if isinstance(sigma, (int, float)):
sigma = (sigma, sigma)
# Apply Gaussian filter to last two dimensions
axes = (-2, -1)
return scipy.ndimage.gaussian_filter(data,
sigma=sigma,
axes=axes,
mode='reflect',
truncate=truncate)
elif method == 'box':
# Box filter (simple averaging)
kernel_size = kwargs.get('kernel_size', 3)
if isinstance(kernel_size, int):
kernel_size = (kernel_size, kernel_size)
# Create box kernel
kernel = np.ones(kernel_size) / np.prod(kernel_size)
# For higher dimensional data, we need to apply 2D convolution along last two axes
if data.ndim == 2:
return scipy.signal.convolve2d(data, kernel, mode='same', boundary='symm')
else:
# Process along last two dimensions for each slice
result = np.empty_like(data)
# Iterate over all but the last two dimensions
for idx in np.ndindex(data.shape[:-2]):
result[idx] = scipy.signal.convolve2d(data[idx],
kernel,
mode='same',
boundary='symm')
return result
else:
raise ValueError(f"Unknown smoothing method: {method}")
def _findSpatialDims(dims: List[str]) -> Tuple[str, str]:
"""
Find spatial dimensions in a list of dimension names.
Parameters
----------
    dims : list of str
        Dimension names to search.
Returns
-------
dim1, dim2 : str
Names of the two spatial dimensions.
Raises
------
ValueError
If suitable spatial dimensions cannot be found.
"""
# First try x and y
if 'x' in dims and 'y' in dims:
return 'x', 'y'
# Then try lon and lat
if 'lon' in dims and 'lat' in dims:
return 'lon', 'lat'
# Also try longitude and latitude
if 'longitude' in dims and 'latitude' in dims:
return 'longitude', 'latitude'
# No standard spatial dimensions found
raise ValueError("Could not find spatial dimensions. Expected 'x' and 'y' or 'lon' and 'lat'. "
f"Available dimensions: {dims}")
def smooth2D_DataArray(da: xr.DataArray,
dim1: Optional[str] = None,
dim2: Optional[str] = None,
method: Literal['uniform', 'gaussian', 'box'] = 'gaussian',
**kwargs) -> xr.DataArray:
"""
Apply 2D spatial smoothing to a DataArray.
Parameters
----------
da : xr.DataArray
Input DataArray with at least 2 spatial dimensions. Must not contain NaN values.
dim1 : str, optional
First spatial dimension. If None, will try to find 'x' or 'lon'.
dim2 : str, optional
Second spatial dimension. If None, will try to find 'y' or 'lat'.
method : {'uniform', 'gaussian', 'box'}, optional
Smoothing method. Default is 'gaussian'.
**kwargs : dict
Method-specific parameters passed to smoothing function.
For 'uniform':
- size : int or tuple of int (default: 3)
For 'gaussian':
- sigma : float or tuple of float (default: 1.0)
- truncate : float (default: 4.0)
For 'box':
- kernel_size : int or tuple of int (default: 3)
Returns
-------
xr.DataArray
DataArray with smoothed spatial data. All attributes and coordinates
are preserved.
Raises
------
ValueError
        If spatial dimensions are not found.
Notes
-----
The smoothing is applied in 2D to each slice along non-spatial dimensions.
For example, if the data has dimensions (time, lat, lon), smoothing is
applied to each time slice independently.
"""
    # Find spatial dimensions if not provided
    if dim1 is None or dim2 is None:
        found_dim1, found_dim2 = _findSpatialDims([str(d) for d in da.dims])
        dim1 = dim1 or found_dim1
        dim2 = dim2 or found_dim2
# Validate dimensions exist
if dim1 not in da.dims:
raise ValueError(f"Dimension '{dim1}' not found in DataArray")
if dim2 not in da.dims:
raise ValueError(f"Dimension '{dim2}' not found in DataArray")
# Check for NaN values
# if np.any(np.isnan(da.values)):
# raise ValueError("DataArray contains NaN values")
    # Transpose data so spatial dimensions are last
    dims_order = [d for d in da.dims if d not in [dim1, dim2]] + [dim1, dim2]
    da_transposed = da.transpose(*dims_order)

    # Apply smoothing
    smoothed_data = _smooth2D_Array(da_transposed.values, method, **kwargs)

    # Create result and transpose back to the original dimension order
    result_transposed = da_transposed.copy()
    result_transposed.values = smoothed_data
    result = result_transposed.transpose(*da.dims)
    return result
def smooth2D_Dataset(ds: xr.Dataset,
dim1: Optional[str] = None,
dim2: Optional[str] = None,
method: Literal['uniform', 'gaussian', 'box'] = 'gaussian',
variables: Optional[list[str]] = None,
**kwargs) -> xr.Dataset:
"""
Apply 2D spatial smoothing to variables in a Dataset.
Parameters
----------
ds : xr.Dataset
Input Dataset with at least 2 spatial dimensions. Variables to be smoothed
must not contain NaN values.
dim1 : str, optional
First spatial dimension. If None, will try to find 'x' or 'lon'.
dim2 : str, optional
Second spatial dimension. If None, will try to find 'y' or 'lat'.
method : {'uniform', 'gaussian', 'box'}, optional
Smoothing method. Default is 'gaussian'.
variables : list of str, optional
Variables to smooth. If None, smooths all variables that have both
spatial dimensions.
**kwargs : dict
Method-specific parameters passed to smoothing function.
Returns
-------
xr.Dataset
Dataset with smoothed spatial data. Variables without both spatial
dimensions are preserved unchanged. All attributes are preserved.
Raises
------
ValueError
If spatial dimensions are not found, specified variables don't exist,
or any variable to be smoothed contains NaN values.
Notes
-----
Only variables that contain both spatial dimensions are smoothed. Other
variables are copied unchanged to the output.
"""
# If no dimensions specified, let the first variable determine them
# This ensures consistent dimension finding across all variables
if dim1 is None or dim2 is None:
# Try to find dimensions from the Dataset's dims
# This is just for early validation and consistency
        try:
            found_dim1, found_dim2 = _findSpatialDims([str(d) for d in ds.dims])
dim1 = dim1 or found_dim1
dim2 = dim2 or found_dim2
except ValueError:
# If we can't find them at Dataset level, the DataArray function
# will try to find them or raise an appropriate error
pass
# Select variables to smooth
if variables is None:
        # Get all data variables - let smooth2D_DataArray handle dimension checking
vars_to_process = list(ds.data_vars)
else:
# Validate specified variables exist
missing = [v for v in variables if v not in ds.data_vars]
if missing:
raise ValueError(f"Variables not found in Dataset: {missing}")
vars_to_process = variables
# Create result dataset
result = ds.copy()
# Process each variable
for var in vars_to_process:
try:
# Try to smooth the variable
result[var] = smooth2D_DataArray(ds[var], dim1, dim2, method, **kwargs)
except ValueError as e:
# If it fails because of missing dimensions, just skip this variable
if "not found in DataArray" in str(e) or "Could not find spatial dimensions" in str(e):
# Variable doesn't have the required dimensions, keep original
if variables is not None and var in variables:
# User specifically requested this variable, so warn them
warnings.warn(
f"Variable '{var}' does not have required spatial dimensions, skipping.")
else:
# Some other error (like NaN values), re-raise
raise
return result
@overload
def smooth2D(data: xr.DataArray,
dim1: Optional[str] = ...,
dim2: Optional[str] = ...,
method: Literal['uniform', 'gaussian', 'box'] = ...,
variables: None = ...,
**kwargs) -> xr.DataArray:
...
@overload
def smooth2D(data: xr.Dataset,
dim1: Optional[str] = ...,
dim2: Optional[str] = ...,
method: Literal['uniform', 'gaussian', 'box'] = ...,
variables: Optional[list[str]] = ...,
**kwargs) -> xr.Dataset:
...
def smooth2D(data: Union[xr.DataArray, xr.Dataset],
dim1: Optional[str] = None,
dim2: Optional[str] = None,
method: Literal['uniform', 'gaussian', 'box'] = 'gaussian',
variables: Optional[list[str]] = None,
**kwargs) -> Union[xr.DataArray, xr.Dataset]:
"""
Apply 2D spatial smoothing to data.
This function automatically selects the appropriate implementation based on the
input data type: DataArray or Dataset.
Parameters
----------
data : xr.DataArray or xr.Dataset
Input data with at least 2 spatial dimensions. Must not contain NaN values
in data to be smoothed.
dim1 : str, optional
First spatial dimension. If None, will try to find 'x' or 'lon'.
dim2 : str, optional
Second spatial dimension. If None, will try to find 'y' or 'lat'.
method : {'uniform', 'gaussian', 'box'}, optional
Smoothing method. Default is 'gaussian'.
variables : list of str, optional
For Dataset: Variables to smooth (default: all with both spatial dims).
For DataArray: Ignored.
**kwargs : dict
Method-specific parameters:
For 'uniform':
- size : int or tuple of int (default: 3)
Filter size in pixels.
For 'gaussian':
- sigma : float or tuple of float (default: 1.0)
Standard deviation of Gaussian kernel.
- truncate : float (default: 4.0)
Truncate filter at this many standard deviations.
For 'box':
- kernel_size : int or tuple of int (default: 3)
Size of box filter.
Returns
-------
xr.DataArray or xr.Dataset
Same type as input with spatially smoothed data.
Raises
------
TypeError
If data is not a DataArray or Dataset.
ValueError
        If spatial dimensions are not found.
Examples
--------
Smooth a DataArray with Gaussian filter:
>>> da = xr.DataArray(data, dims=['time', 'lat', 'lon'])
>>> smoothed = smooth2D(da, method='gaussian', sigma=2.0)
Smooth specific variables in a Dataset:
>>> ds = xr.Dataset({'temp': da1, 'pressure': da2})
>>> smoothed = smooth2D(ds, variables=['temp'], method='uniform', size=5)
Use custom dimension names:
>>> smoothed = smooth2D(data, dim1='x_coord', dim2='y_coord')
"""
if isinstance(data, xr.DataArray):
if variables is not None:
warnings.warn("'variables' parameter is ignored for DataArray input")
return smooth2D_DataArray(data, dim1, dim2, method, **kwargs)
elif isinstance(data, xr.Dataset):
return smooth2D_Dataset(data, dim1, dim2, method, variables, **kwargs)
else:
raise TypeError(f"Input data must be xr DataArray or Dataset. "
f"Got {type(data).__name__}")
def interpolateValues(points: np.ndarray,
points_crs: CRS | None,
data: xr.DataArray,
method: InterpOptions = 'nearest') -> np.ndarray:
"""
Interpolate values from a 2D grid-based DataArray at given x, y or lat, lon points.
Parameters
----------
points : np.ndarray
A (N, 2) array of coordinates. Each row should contain an (x, y) or (lon, lat) pair.
    points_crs : CRS, optional
        Coordinate system of the points. If None, the points are assumed to
        already be in the DataArray's coordinate system.
data : xr.DataArray
A DataArray with coordinates either ('x', 'y') or ('lon', 'lat').
method : {'linear', 'nearest', 'cubic'}
Interpolation method to use.
Returns
-------
np.ndarray
A 1D array of interpolated values with length N.
Raises
------
ValueError
If DataArray does not have suitable coordinates for interpolation.
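
    Examples
    --------
    A minimal sketch with points_crs=None, so no reprojection is attempted
    (this assumes ``from_xarray`` returns None for data without CRS metadata):

    >>> da = xr.DataArray(np.arange(9.0).reshape(3, 3), dims=['y', 'x'],
    ...                   coords={'x': [0., 1., 2.], 'y': [0., 1., 2.]})
    >>> pts = np.array([[0.5, 0.5], [1.5, 1.5]])
    >>> vals = interpolateValues(pts, None, da, method='nearest')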
"""
array_crs = watershed_workflow.crs.from_xarray(data)
if points_crs is not None and array_crs is not None:
x, y = watershed_workflow.warp.xy(points[:, 0], points[:, 1], points_crs, array_crs)
else:
x, y = points[:, 0], points[:, 1]
if { 'x', 'y'}.issubset(data.coords):
coord_names = ('x', 'y')
elif { 'lon', 'lat'}.issubset(data.coords):
coord_names = ('lon', 'lat')
else:
raise ValueError("DataArray must have coordinates ('x', 'y') or ('lon', 'lat')")
coords = xr.Dataset({coord_names[0]: ("points", x), coord_names[1]: ("points", y)})
interpolated = data.interp(coords, method=method)
    return interpolated.to_numpy()
def imputeHoles2D(arr: xr.DataArray, nodata: Any = np.nan, method: str = 'cubic') -> np.ndarray:
"""
Interpolate values for missing data in rasters using scipy griddata.
Parameters
----------
arr : xarray.DataArray
Input raster data with missing values to interpolate.
nodata : Any, optional
Value representing missing data. Default is numpy.nan.
method : str, optional
Interpolation method for scipy.interpolate.griddata.
Valid options: 'linear', 'nearest', 'cubic'. Default is 'cubic'.
Returns
-------
numpy.ndarray
Interpolated array with missing values filled.
Notes
-----
This function may raise an error if there are holes on the boundary.
The interpolation is performed using scipy.interpolate.griddata with
the specified method.
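
    Examples
    --------
    A minimal sketch, filling a single interior hole:

    >>> arr = xr.DataArray(np.arange(16.0).reshape(4, 4))
    >>> arr[2, 2] = np.nan
    >>> filled = imputeHoles2D(arr, method='linear')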
"""
    arr_np = np.asarray(arr)
    if nodata is np.nan:
        mask = np.isnan(arr_np)
    else:
        mask = (arr_np == nodata)

    x = np.arange(0, arr_np.shape[1])
    y = np.arange(0, arr_np.shape[0])
    xx, yy = np.meshgrid(x, y)

    # get only the valid values
    x1 = xx[~mask]
    y1 = yy[~mask]
    newarr = arr_np[~mask]

    res = scipy.interpolate.griddata((x1, y1), newarr.ravel(), (xx, yy), method=method)
    return res
def rasterizeGeoDataFrame(gdf: gpd.GeoDataFrame,
column: str,
resolution: float,
bounds: Optional[Tuple[float, float, float, float]] = None,
nodata: Optional[Union[int, float]] = None) -> xr.DataArray:
"""
Convert a GeoDataFrame to a rasterized DataArray based on a column's values.
Parameters
----------
gdf : geopandas.GeoDataFrame
Input GeoDataFrame containing geometries and data.
column : str
Name of the column containing values to rasterize. Must be a numeric type.
resolution : float
Spatial resolution of the output raster in the units of the GeoDataFrame's CRS.
This defines the size of each pixel.
bounds : tuple of float, optional
Bounding box as (minx, miny, maxx, maxy). If None, bounds are computed
from the GeoDataFrame's total bounds.
nodata : int or float, optional
Value to use for pixels not covered by any geometry. If None, defaults
to NaN for float columns and -999 for integer columns.
Returns
-------
xarray.DataArray
Rasterized data with dimensions ('y', 'x') and coordinates defined by
the spatial extent and resolution. Areas outside geometries are set to
the nodata value. The data type matches the column's data type.
Raises
------
ValueError
If column is not found in the GeoDataFrame.
If column is not numeric type.
If GeoDataFrame is empty.
If resolution is not positive.
Notes
-----
The function uses rasterio's rasterization capabilities to burn geometries
into a raster. When geometries overlap, the value from the last geometry
in the GeoDataFrame is used.
The output DataArray includes the CRS information in its attributes if
the GeoDataFrame has a CRS defined.
The dtype of the output array matches the dtype of the input column.
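
    Examples
    --------
    A minimal sketch, burning a single square polygon into a 2x2 raster (the
    CRS below is illustrative):

    >>> gdf = gpd.GeoDataFrame({'val': [1.0]},
    ...                        geometry=[shapely.geometry.box(0, 0, 2, 2)],
    ...                        crs='EPSG:5070')
    >>> da = rasterizeGeoDataFrame(gdf, 'val', resolution=1.0)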
"""
    # reset the index to make 'index' a column if that is the column
    # requested to be rasterized
    if column == 'index':
        gdf = gdf.reset_index()
# Validate inputs
if column not in gdf.columns:
raise ValueError(f"Column '{column}' not found in GeoDataFrame")
if len(gdf) == 0:
raise ValueError("GeoDataFrame is empty")
if resolution <= 0:
raise ValueError(f"Resolution must be positive, got {resolution}")
# Determine output data type and fill value
out_dtype = gdf[column].dtype
# Set nodata value based on dtype if not provided
fill_value = out_dtype.type()
if nodata is None:
if np.issubdtype(out_dtype, np.integer):
fill_value = -999
elif np.issubdtype(out_dtype, np.floating):
fill_value = np.nan
else:
raise ValueError(f"Column '{column}' must be numeric type, got {out_dtype}")
else:
fill_value = out_dtype.type(nodata)
    # Validate geometry types and create a mask of geometries that are
    # valid and values that are not NA
valid_types = (shapely.geometry.Polygon, shapely.geometry.MultiPolygon)
valid_mask = gdf[column].notna() & \
gdf.geometry.notna() & gdf.geometry.is_valid & \
gdf.geometry.apply(lambda s : isinstance(s, valid_types))
# Get bounds
if bounds is None:
bounds = tuple(gdf.total_bounds) # minx, miny, maxx, maxy
minx, miny, maxx, maxy = bounds
# Calculate raster dimensions
width = int(np.ceil((maxx-minx) / resolution))
height = int(np.ceil((maxy-miny) / resolution))
# Create transform
transform = rasterio.transform.from_bounds(minx, miny, maxx, maxy, width, height)
# Initialize array with nodata value
raster = np.full((height, width), fill_value, dtype=out_dtype)
    if valid_mask.sum() > 0:
        # Create list of (geometry, value) pairs
        shapes = [(geom, value)
                  for geom, value in zip(gdf.geometry[valid_mask], gdf[column][valid_mask])]

        # Burn the geometries into the raster
        rasterio.features.rasterize(shapes,
                                    out=raster,
                                    transform=transform,
                                    fill=fill_value,
                                    dtype=out_dtype)
# Create coordinate arrays
# X coordinates (cell centers)
x_coords = np.arange(minx + resolution/2, maxx + resolution, resolution)[:width]
# Y coordinates (cell centers) - note that y decreases as row index increases
y_coords = np.arange(maxy - resolution/2, miny - resolution, -resolution)[:height]
# Create DataArray
da = xr.DataArray(raster, dims=['y', 'x'], coords={ 'x': x_coords, 'y': y_coords }, name=column)
# Add attributes
da.attrs['resolution'] = resolution
da.attrs['source_column'] = column
da.attrs['nodata'] = fill_value
# Add CRS if available
if gdf.crs is not None:
da.attrs['crs'] = str(gdf.crs)
return da