Source code for watershed_workflow.sources.manager_nhd

from typing import List, Optional
from shapely.geometry.base import BaseGeometry
import geopandas as gpd
import pandas as pd

from watershed_workflow.crs import CRS
import watershed_workflow.crs

from . import standard_names as names
from . import manager_hyriver


waterdata_ids = {'nhdflowline_network' : 'comid',
                  'catchmentsp' : 'featureid',
                  }

waterdata_renames = {'gnis_name' : names.NAME,
                     'lengthkm' : names.LENGTH,
                     'areasqkm' : names.CATCHMENT_AREA,
                     'streamorde' : names.ORDER,
                     'totdasqkm' : names.DRAINAGE_AREA,
                     'geometry_ca' : names.CATCHMENT,
                     'hydroseq' : names.HYDROSEQ,
                     'uphydroseq' : names.UPSTREAM_HYDROSEQ,
                     'dnhydroseq' : names.DOWNSTREAM_HYDROSEQ,
                     'divergence' : names.DIVERGENCE,
                     }

hr_ids = {'flowline' : 'nhdplusid',
          'catchment' : 'nhdplusid',
          }
hr_renames = { 'gnis_name' : names.NAME,
               'lengthkm' : names.LENGTH,
               'areasqkm' : names.CATCHMENT_AREA,
               'streamorde' : names.ORDER,
               'totdasqkm' : names.DRAINAGE_AREA,
               'geometry_ca' : names.CATCHMENT,
               'hydroseq' : names.HYDROSEQ,
               'uphydroseq' : names.UPSTREAM_HYDROSEQ,
               'dnhydroseq' : names.DOWNSTREAM_HYDROSEQ,
               'divergence' : names.DIVERGENCE,
              }

mr_ids = {'flowline_mr' : 'COMID', }
mr_renames = { 'GNIS_NAME' : names.NAME,
               'LENGTHKM' : names.LENGTH,
               'AreaSqKM' : names.CATCHMENT_AREA,
               'StreamOrde' : names.ORDER,
               'TotDASqKM' : names.DRAINAGE_AREA,
               'Hydroseq' : names.HYDROSEQ,
               'UpHydroseq' : names.UPSTREAM_HYDROSEQ,
               'DnHydroseq' : names.DOWNSTREAM_HYDROSEQ,
               'Divergence' : names.DIVERGENCE,
              }


def _tryRename(df, old, new):
    try:
        df[new] = df.pop(old)
        return new
    except KeyError:
        return None


[docs] class ManagerNHD(manager_hyriver.ManagerHyRiver): """Leverages pynhd to download NHD data and its supporting shapes.""" lowest_level = 12 def __init__(self, dataset_name: str, layer: Optional[str] = None, catchments: Optional[bool] = True, fewer_columns : Optional[bool] = True, ): """Initialize NHD manager. Parameters ---------- dataset_name : str NHD dataset name ('NHDPlus MR v2.1', 'NHDPlus HR', 'NHD MR'). layer : str, optional Layer name, defaults to protocol-specific default. catchments : bool, optional Whether to fetch catchments with flowlines, defaults to True. fewer_columns : bool, optional Whether to remove some of the QA/QC columns from the dataframe, defaults to True. """ self._catchment_layer = None self._fewer_columns = fewer_columns if dataset_name == 'NHDPlus MR v2.1': self._protocol_name = 'WaterData' self._ids = waterdata_ids self._renames = waterdata_renames if layer is None: layer = 'nhdflowline_network' if layer == 'nhdflowline_network': self._catchment_layer = 'catchmentsp' elif dataset_name == 'NHDPlus HR': self._protocol_name = 'NHDPlusHR' self._ids = hr_ids self._renames = hr_renames if layer is None: layer = 'flowline' if layer == 'flowline': self._catchment_layer = 'catchment' elif dataset_name == 'NHD MR': self._protocol_name = 'NHD' self._ids = mr_ids self._renames = mr_renames if layer is None: layer = 'flowline_mr' else: raise ValueError(f'Invalid ManagerNHD dataset_name {dataset_name}') # NHD data is typically in lat/lon coordinates native_crs_in = watershed_workflow.crs.latlon_crs # Rough resolution estimate for degree-based data native_resolution = 0.001 # ~100m at mid-latitudes # Get ID name for this layer if layer in self._ids: id_name = self._ids[layer] else: id_name = layer super().__init__(self._protocol_name, native_crs_in, native_resolution, layer, id_name) self.name = dataset_name self._catchments = catchments
[docs] def getCatchments(self, df : gpd.GeoDataFrame) -> gpd.GeoDataFrame: """Add catchment data to flowline data. Parameters ---------- df : gpd.GeoDataFrame GeoDataFrame with flowline data and ID column. Returns ------- gpd.GeoDataFrame GeoDataFrame with catchment data merged in. """ if self._catchment_layer is not None: # Save current layer and switch to catchment layer old_layer = self._layer old_id_name = self._id_name # Set catchment layer properties self._layer = self._catchment_layer if self._catchment_layer in self._ids: self._id_name = self._ids[self._catchment_layer] else: self._id_name = self._catchment_layer # Get catchments using HyRiver directly (no recursive catchment fetching) ids = df[old_id_name].tolist() cas_raw = manager_hyriver.ManagerHyRiver._getShapesByID(self, ids) # Apply standard names to catchments cas_raw = self._addStandardNames(cas_raw, False) # Merge catchments with flowlines df = pd.merge(df, cas_raw, how='outer', left_on=old_id_name, right_on=self._id_name, suffixes=(None, '_ca')) # Restore original layer properties self._layer = old_layer self._id_name = old_id_name return df
def _addStandardNames(self, df: gpd.GeoDataFrame, fewer_columns : Optional[bool] = None, ) -> gpd.GeoDataFrame: """Convert native column names to standard names. Parameters ---------- df : gpd.GeoDataFrame GeoDataFrame with native column names. Returns ------- gpd.GeoDataFrame GeoDataFrame with standard column names added. """ # Add ID column from native ID field if self.native_id_field in df.columns: df[names.ID] = df[self.native_id_field].astype('string') # Add other standard name mappings renames = [] for k, v in self._renames.items(): res = _tryRename(df, k, v) if res is not None: renames.append(res) # remove QA/QC codes if fewer_columns is None: fewer_columns = self._fewer_columns if fewer_columns: renames.extend(['geometry', names.ID, self.native_id_field]) df = df[renames] return df def _getShapesByGeometry(self, geometry_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame: """Fetch NHD shapes by geometry, including catchments if requested. Parameters ---------- geometry_gdf : gpd.GeoDataFrame GeoDataFrame with geometries in native_crs_in to search for shapes. Returns ------- gpd.GeoDataFrame Raw GeoDataFrame with native column names and catchments if requested. """ # Get base shapes from HyRiver df = super()._getShapesByGeometry(geometry_gdf) # Add catchments if requested if self._catchments: df = self.getCatchments(df) return df def _getShapesByID(self, ids) -> gpd.GeoDataFrame: """Fetch NHD shapes by ID, including catchments if requested. Parameters ---------- ids : List[str] List of IDs to retrieve. Returns ------- gpd.GeoDataFrame Raw GeoDataFrame with native column names and catchments if requested. """ # Get base shapes from HyRiver df = super()._getShapesByID(ids) # Add catchments if requested if self._catchments: df = self.getCatchments(df) return df