"""Module for working with tree data structures, built on watershed_workflow.tinytree
Note that this class knows how to work with the following properties,
which are expected to be spelled this way if they exist. Only index
and geometry MUST exist.
index
Must be the index of the DataFrame
geometry : shapely.LineString
the river reach line
catchment : shapely.Polygon
the local contributing area to this reach
area : double
area [m^2] of catchment, the local contributing area
hydroseq : int
See documentation for NHDPlus
dnhydroseq : int
See documentation for NHDPlus
"""
from __future__ import annotations
from typing import List, Optional, Any, Tuple, Callable, Literal, Generator
import logging
import numpy as np
import copy
from scipy.spatial import cKDTree
from matplotlib import pyplot as plt
import itertools
import sortedcontainers
import folium
import folium.plugins
import shapely.geometry
import shapely.ops
import geopandas as gpd
import pandas as pd
import pandas.api.types
import watershed_workflow.utils
import watershed_workflow.tinytree
from watershed_workflow.crs import CRS
import watershed_workflow.plot
import watershed_workflow.sources.standard_names as names
_tol = 1.e-7
class _RowView:
"""A helper class, this is what is returned in a call to node.properties.
This behaves like a dictionary and represents a reference/view to
the row of the pandas DataFrame that stores the underlying data in
the tree. We can't just return a row is because of pandas
Copy-on-Write mechanism -- we really want a reference, not a copy.
"""
def __init__(self, df: gpd.GeoDataFrame, index: int | str):
self._df = df
self._index = index
def __len__(self) -> int:
return len(self._df.keys())
def __getitem__(self, k: str) -> Any:
if k == 'index':
return self._index
else:
return self._df.at[self._index, k]
def __setitem__(self, k: str, v: Any) -> None:
self._df.loc[self._index, k] = v
def __iter__(self) -> Any:
return self._df.keys()
def __contains__(self, k: str) -> bool:
return k in self._df.keys()
def keys(self) -> Any:
return self._df.keys()
def __repr__(self) -> str:
return repr(self._df.loc[self._index])
class _MySortedList(sortedcontainers.SortedKeyList):
def __init__(self, iterable=None):
# note, SortedKeyList requires a total ordering. Since angle
# can be equal for two different linestrings (if they share
# the same first segment angle), we also order (secondarily)
# by id to make an arbitrary but now total ordering of nodes.
return super(_MySortedList, self).__init__(iterable,
key=lambda node: (node.angle, id(node)))
[docs]
class River(watershed_workflow.tinytree.Tree):
"""A tree structure whose node data is stored in a pandas DataFrame, accessed by an index."""
ListType = _MySortedList
def __init__(self,
index: int | str,
df: gpd.GeoDataFrame,
children: Optional[List[River]] = None):
"""Do not call me. Instead use the class factory methods, one of:
- construct_rivers_by_geometry() # generic data
- construct_rivers_by_hydroseq() # NHDPlus data
This method initializes a single node in the River,
representing one reach and its upstream children.
"""
self.index = index
self.df = df
super(River, self).__init__(children)
def __iter__(self):
return self.preOrder()
[docs]
def addChild(self, child_or_index: River | str | int) -> River:
"""Append a child (upstream) reach to this reach."""
if isinstance(child_or_index, River):
super(River, self).addChild(child_or_index)
else:
super(River, self).addChild(type(self)(child_or_index, self.df))
return self.children[-1]
@property
def linestring(self) -> shapely.geometry.LineString:
"""Returns the linestring geometry."""
return self.df.at[self.index, 'geometry']
@linestring.setter
def linestring(self, value: shapely.geometry.LineString):
self.df.loc[self.index, 'geometry'] = value
@property
def crs(self) -> CRS:
return self.df.crs
@property
def properties(self) -> _RowView:
return _RowView(self.df, self.index)
@properties.setter
def properties(self, value) -> None:
self.df.loc[self.index, value.keys()] = tuple(value.values())
def __getitem__(self, name: str) -> Any:
"""Faster/preferred getter for properties"""
if name == 'index':
return self.index
else:
return self.df.at[self.index, name]
def __setitem__(self, name: str, value: Any) -> None:
"""Faster/preferred setter for properties"""
self.df.loc[self.index, name] = value
def __contains__(self, name: str) -> bool:
return self.df.__contains__(name)
@property
def angle(self) -> float:
"""Returns the angle, in radians, from node.parent.linestring to node.linestring, in a clockwise sense."""
if self.parent is None:
return 0.0
else:
return watershed_workflow.utils.computeAngle(self.parent.linestring, self.linestring)
[docs]
def plot(self, *args, **kwargs):
"""Plot the rivers.
Parameters
----------
*args
Positional arguments passed to watershed_workflow.plot.linestringsWithCoords.
**kwargs
Keyword arguments passed to watershed_workflow.plot.linestringsWithCoords.
See that function for available parameters.
Returns
-------
matplotlib figure or axes
The plotting result from linestringsWithCoords.
"""
inds = [r.index for r in self]
return watershed_workflow.plot.linestringsWithCoords(self.df.loc[inds], *args, **kwargs)
[docs]
def explore(self, column=names.ID, m=None, marker=None, name=None, **kwargs):
"""Open an interactive map using Folium.
Parameters
----------
column : str, optional
Column name to use for coloring/styling the rivers. Defaults to the ID column.
m : folium.Map, optional
Existing Folium map to add rivers to. If None, creates a new map.
marker : bool, optional
Whether to add coordinate markers for each vertex. Defaults to None (no markers).
name : str, optional
Name for the layer in the map. If None, attempts to use NAME or ID property.
**kwargs
Keyword arguments passed to geopandas.GeoDataFrame.explore().
See that function for available parameters like color, cmap, tooltip, popup, etc.
Returns
-------
folium.Map
Interactive map with the river network displayed.
"""
# get a name
if column == names.ID and names.ID not in self.df:
self.df[names.ID + "_as_column"] = self.df.index.astype('string')
column = names.ID + "_as_column"
if name is None:
try:
name = self[names.NAME]
except KeyError:
try:
name = self[names.ID]
except KeyError:
name = self.index
kwargs.setdefault('tooltip', False)
default_props = [
pname for pname in [
names.NAME, 'gnis_name', 'ftype', names.ORDER, names.DIVERGENCE, names.HYDROSEQ,
names.DOWNSTREAM_HYDROSEQ, names.UPSTREAM_HYDROSEQ, names.LENGTH,
names.CATCHMENT_AREA,
] if pname in self.df
]
for p in self.df.keys():
if len(default_props) >= 8:
break
if p not in default_props and p != 'geometry':
default_props.append(p)
kwargs.setdefault('popup', [names.ID, ] + default_props)
if 'color' not in kwargs:
kwargs.setdefault('cmap', watershed_workflow.colors.xkcd_bolds)
kwargs.setdefault('legend', False)
# style
style_kwds = kwargs.setdefault('style_kwds', dict())
style_kwds.setdefault('weight', 5)
# default explore
m = self.df.explore(column=column, m=m, name=name, **kwargs)
if marker:
# don't reuse -- some versions keep the various *_kwds
# dictionaries by reference
kwargs2 = copy.deepcopy(kwargs)
# explore the coordinates too!
marker_kwds = kwargs2.setdefault('marker_kwds', dict())
marker_kwds.setdefault('radius', 8)
kwargs2['style_kwds']['fillOpacity'] = 1
if names.ID in self.df:
new_id_name = names.ID + '-copy'
else:
new_id_name = names.ID
marker_df = self.df.set_geometry([shapely.geometry.MultiPoint(ls.coords) for ls in self.df.geometry]) \
.explode(index_parts=True).reset_index(names=[new_id_name, 'coord'])
for disp_mode in ['tooltip', 'popup']:
if disp_mode in kwargs2 and isinstance(kwargs2[disp_mode], list):
if names.ID in kwargs2[disp_mode]:
kwargs2[disp_mode].remove(names.ID)
kwargs2[disp_mode].insert(0, 'coord')
kwargs2[disp_mode].insert(0, names.ID)
m = marker_df.explore(column=column,
m=m,
name=self[names.NAME] + ' coordinates',
**kwargs2)
return m
#
# methods that act on topology and geometry -- high level API
#
[docs]
def split(self, i: int) -> Tuple[River, River]:
"""Split the reach at the ith coordinate of the linestring.
Note that this does not split the catchment!
self becomes the downstream node, and is modified in-place to
preserve the full tree if the trunk is the one being split.
Returns upstream_node, downstream_node.
"""
if i < 0:
i = len(self.linestring.coords) + i
assert (i > 0 and i < len(self.linestring.coords) - 1)
linestring = self.linestring
upstream_linestring = shapely.geometry.LineString(list(linestring.coords)[0:i + 1])
downstream_linestring = shapely.geometry.LineString(list(linestring.coords)[i:])
upstream_area_frac = upstream_linestring.length / linestring.length
# fix properties
upstream_props = dict(self.properties)
if names.CATCHMENT_AREA in upstream_props:
upstream_props[names.CATCHMENT_AREA] = self[names.CATCHMENT_AREA] * upstream_area_frac
self[names.CATCHMENT_AREA] = self[names.CATCHMENT_AREA] * (1-upstream_area_frac)
if names.DRAINAGE_AREA in upstream_props:
if names.CATCHMENT_AREA in self.properties:
upstream_props[
names.DRAINAGE_AREA] = self[names.DRAINAGE_AREA] - self[names.CATCHMENT_AREA]
if names.HYDROSEQ in upstream_props:
upstream_props[names.HYDROSEQ] = (self[names.HYDROSEQ]
+ self.parent[names.HYDROSEQ]) / 2.0
if names.UPSTREAM_HYDROSEQ in upstream_props:
self[names.UPSTREAM_HYDROSEQ] = upstream_props[names.HYDROSEQ]
if names.DOWNSTREAM_HYDROSEQ in upstream_props:
upstream_props[names.DOWNSTREAM_HYDROSEQ] = self[names.HYDROSEQ]
if names.ID in self.properties:
ID = self[names.ID]
upstream_props[names.ID] = str(ID) + 'a'
self[names.ID] = str(ID) + 'b'
if names.DIVERGENCE in upstream_props:
self[names.DIVERGENCE] = 0
if names.CATCHMENT in upstream_props:
upstream_props[names.CATCHMENT] = shapely.geometry.Point()
upstream_props['geometry'] = upstream_linestring
# new node
# -- create a valid index and add the row to the dataframe
if pandas.api.types.is_integer_dtype(self.df.index.dtype):
new_index = pd.Series(max(self.df.index) + 1).astype(self.df.index.dtype)[0]
elif isinstance(self.index, str):
if self.index[-1].isalpha():
new_index = self.index[:-1] + chr(ord(self.index[-1]) + 1)
else:
new_index = str(self.index) + 'a'
else:
new_index = str(self.index) + 'a'
assert new_index not in self.df.index
crs = self.df.crs
self.df.loc[new_index] = upstream_props
# clean up -- see geopandas/geopandas#3119
# -- geometry seems to be correct, but catchment is not, and the crs gets dropped
if names.CATCHMENT in self.df:
self.df[names.CATCHMENT] = self.df[names.CATCHMENT].astype(gpd.array.GeometryDtype())
if crs is not None:
self.df.set_crs(crs, inplace=True)
# end clean up -- hopefully this gets fixed sometime in upstream (pandas or geopandas)
# -- construct the new upstream node inject it into the tree
new_node = self.__class__(new_index, self.df)
self.giveChildren(new_node)
self.linestring = downstream_linestring
self.addChild(new_node)
return new_node, self
[docs]
def splitAtArclen(self, s: float) -> Tuple[River, River]:
"""Inserts a coordinate at arclen s, then splits at that coordinate.
Parameters
----------
s : float
Arc length distance from the downstream end of the reach at which
to split the reach. Must be between 0 and the total reach length.
Returns
-------
Tuple[River, River]
Tuple of (upstream_node, downstream_node) after splitting.
"""
i = self.insertCoordinateByArclen(s)
return self.split(i)
[docs]
def merge(self, merge_reach: bool = True) -> None:
"""Merges this node with its parent.
Parameters
----------
merge_reach : bool, optional
Whether to merge the linestring geometries. If True, combines the
linestrings. If False, only merges properties. Defaults to True.
"""
parent = self.parent
if merge_reach:
assert (len(list(self.siblings)) == 0)
new_seg = shapely.geometry.LineString(
list(self.linestring.coords)[0:-1] + list(parent.linestring.coords))
parent.linestring = new_seg
# fix properties
if names.CATCHMENT_AREA in self:
parent[names.CATCHMENT_AREA] += self[names.CATCHMENT_AREA]
if names.CATCHMENT in self and self[names.CATCHMENT] is not None:
if parent[names.CATCHMENT] is None:
parent[names.CATCHMENT] = self[names.CATCHMENT]
else:
parent[names.CATCHMENT] = shapely.ops.unary_union(
[self[names.CATCHMENT], parent[names.CATCHMENT]])
if names.DIVERGENCE in self:
parent[names.DIVERGENCE] = self[names.DIVERGENCE]
# set topology
self.remove()
self.linestring = shapely.geometry.LineString()
for child in self.children:
parent.addChild(child)
[docs]
def prune(self) -> None:
"""Removes this node and all below it, merging properties.
This method removes the entire subtree rooted at this node, but first
merges all properties (like catchment areas) up to the parent node.
Raises
------
ValueError
If called on a node with no parent (cannot prune the root).
"""
if self.parent is None:
raise ValueError("Cannot prune a branch with no parent.")
for node in self.postOrder():
node.merge(False)
#
# methods that act on coordinates only
#
[docs]
def moveCoordinate(self, i: int, xy: Tuple[float, float] | Tuple[float, float, float]) -> None:
"""Moves the ith coordinate of self.linestring to a new location."""
if i < 0:
i = len(self.linestring.coords) + i
coords = list(self.linestring.coords)
coords[i] = xy
self.linestring = shapely.geometry.LineString(coords)
[docs]
def insertCoordinate(self, i: int, xy: Tuple[float, float]) -> int:
"""If it doesn't already exist, inserts a new coordinate before the ith coordinate.
Returns the index of the new (or preexisting) coordinate.
"""
if i < 0:
i = len(self.linestring.coords) + i
coords = list(self.linestring.coords)
if watershed_workflow.utils.isClose(xy, coords[i - 1]):
# don't insert and existing point
return i - 1
elif i < len(coords) and watershed_workflow.utils.isClose(xy, coords[i]):
return i
else:
coords.insert(i, xy)
self.linestring = shapely.geometry.LineString(coords)
return i
[docs]
def insertCoordinateByArclen(self, s: float) -> int:
"""Inserts a new coordinate at a given arclen, returning the index of that coordinate.
Parameters
----------
s : float
Arc length distance from the downstream end of the reach at which
to insert the new coordinate. Must be between 0 and the total
reach length.
Returns
-------
int
Index of the newly inserted coordinate in the linestring.
Note
----
Arc length is measured from the downstream end of the reach.
"""
sp = self.linestring.length - s
coords = np.array(self.linestring.coords)
dcoords = coords[1:] - coords[:-1]
ds = np.linalg.norm(dcoords, axis=1)
point_arclens = np.cumsum(ds)
i = np.where(point_arclens > sp)[0][0]
p = self.linestring.interpolate(sp)
return self.insertCoordinate(i + 1, p)
[docs]
def appendCoordinate(self, xy: Tuple[float, float]) -> None:
"""Appends a coordinate at the end (downstream) of the linestring."""
coords = list(self.linestring.coords) + [xy, ]
self.linestring = shapely.geometry.LineString(coords)
[docs]
def extendCoordinates(self, xys: List[Tuple[float, float]]) -> None:
"""Appends multiple coordinates at the end (downstream) of the linestring."""
coords = list(self.linestring.coords) + xys
self.linestring = shapely.geometry.LineString(coords)
[docs]
def prependCoordinates(self, xys: List[Tuple[float, float]]) -> None:
"""Prepends multiple coordinates at the beginning (upstream) of the linestring."""
coords = xys + list(self.linestring.coords)
self.linestring = shapely.geometry.LineString(coords)
[docs]
def popCoordinate(self, i: int) -> Tuple[float, float]:
"""Removes the ith coordinate and returns its value."""
coords = list(self.linestring.coords)
c = coords.pop(i)
self.linestring = shapely.geometry.LineString(coords)
return c
#
# Methods that act on the network and its properties
#
[docs]
def accumulate(self, to_accumulate: str, to_save: Optional[str] = None, op: Callable = sum):
"""Accumulates a property across the river tree.
Parameters
----------
to_accumulate : str
Name of the property to accumulate from child nodes.
to_save : str, optional
Name of the property to store the accumulated result in.
If None, the result is not saved to the node.
op : Callable, optional
Operation to use for accumulation. Defaults to sum.
Returns
-------
Any
The accumulated value for this node and all its children.
"""
val = op(child.accumulate(to_accumulate, to_save, op) for child in self.children)
val = op([val, self[to_accumulate]])
if to_save is not None:
self[to_save] = val
return val
[docs]
def getNode(self, index: int | str) -> River | None:
"""return node for a given index"""
try:
node = next(node for node in self if node.index == index)
except StopIteration:
node = None
return node
[docs]
def findNode(self, lambd: Callable) -> River | None:
"""Find a node, returning the first whose lambda application is true, or None"""
try:
return next(n for n in self.preOrder() if lambd(n))
except StopIteration:
return None
[docs]
def assignOrder(self) -> None:
"""Working from leaves to trunk, assign stream order property.
This method assigns stream order values to all reaches in the river network
following the Strahler stream ordering system. Orders are calculated from
leaf nodes (order 1) toward the trunk, where confluences of streams of
equal order increment the order by 1.
"""
self.df[names.ORDER] = -1
for leaf in self.leaf_nodes:
leaf[names.ORDER] = 1
node = leaf
while node.parent[names.ORDER] == -1 and all(c[names.ORDER] > 0 for c in node.siblings):
node = node.parent
order = max(c[names.ORDER] for c in node.children)
if len(node.children) > 1:
order += 1
node[names.ORDER] = order
def _isContinuous(self, child, tol: float = _tol) -> bool:
"""Is a given child continuous with self?"""
return watershed_workflow.utils.isClose(child.linestring.coords[-1],
self.linestring.coords[0], tol)
[docs]
def isLocallyContinuous(self, tol: float = _tol) -> bool:
"""Is this node continuous with its parent and children?"""
res = all(self._isContinuous(child, tol=_tol) for child in self.children)
if self.parent is not None:
res = res and self.parent._isContinuous(self, tol=_tol)
return res
[docs]
def isContinuous(self, tol: float = _tol) -> bool:
"""Checks geometric continuity of the river.
Confirms that all upstream children's downstream coordinate
coincides with self's upstream coordinate.
"""
return all(self._isContinuous(child, tol) for child in self.children) and \
all(child.isContinuous(tol) for child in self.children)
def _makeContinuous(self, child: River) -> None:
child_coords = list(child.linestring.coords)
child_coords[-1] = list(self.linestring.coords)[0]
child.linestring = shapely.geometry.LineString(child_coords)
[docs]
def makeContinuous(self, tol: float = _tol) -> None:
"""Sometimes there can be small gaps between linestrings of river
tree if river is constructed using hydroseq and Snap
option is not used. Here we make them consistent.
"""
for node in self:
for child in node.children:
if not node._isContinuous(child, tol):
node._makeContinuous(child)
assert (self.isContinuous())
[docs]
def isLocallyMonotonic(self) -> bool:
"""Checks for monotonically decreasing elevation as we march downstream in this reach."""
coords = np.array(self.linestring.coords)
if max(coords[1:, 2] - coords[:-1, 2]) > 0:
return False
for child in self.children:
if self.linestring.coords[0][2] > child.linestring.coords[-1][2]:
return False
return True
def isMonotonic(self, known_depressions=None) -> bool:
if known_depressions is None:
known_depressions = []
return all(reach.isLocallyMonotonic() for reach in self
if reach.index not in known_depressions)
[docs]
def isHydroseqConsistent(self) -> bool:
"""Confirms that hydrosequence is valid."""
if len(self.children) == 0:
return True
self.children = sorted(self.children, key=lambda c: c[names.HYDROSEQ])
return self[names.HYDROSEQ] < self.children[0][names.HYDROSEQ] and \
all(child.isHydroseqConsistent() for child in self.children)
[docs]
def isConsistent(self, tol: float = _tol) -> bool:
"""Validity checking of the tree."""
good = self.isContinuous(tol)
if names.HYDROSEQ in self:
good &= self.isHydroseqConsistent()
return good
[docs]
def pathToRoot(self) -> Generator:
"""A generator for the nodes on the path to root, including this."""
yield self
if self.parent is not None:
for n in self.parent.pathToRoot():
yield n
[docs]
def resetDataFrame(self, force=False) -> None:
"""Resets the data frame for the river rooted at self, and
reindexes the tree to a simple integer-based, preOrdered
indexing.
This restricts the (shared) DataFrame to a subset of rows that
are all in the river rooted at self.
"""
# this should only be called on a root!
if self.parent is not None:
raise ValueError("Only call resetDataFrame on a root of the river tree!")
# collect the indices in this river
ids = [reach.index for reach in self]
# subset the dataframe to just this river
new_df = self.df.loc[ids]
# reindex to a preOrdered listing
# -- save the old index as ID
if names.ID not in new_df.columns:
new_df[names.ID] = new_df.index
# -- create a new index
assert 'new_preorder_index' not in new_df.columns
new_df['new_preorder_index'] = -np.ones((len(new_df), ), 'int64')
new_preorder_indices = dict((n.index, i) for (i, n) in enumerate(self))
new_df.loc[list(new_preorder_indices.keys()),
'new_preorder_index'] = pd.Series(new_preorder_indices)
# -- assign the new index as the index, then sort by this index
new_df = new_df.set_index('new_preorder_index', drop=True).sort_index()
# -- pass out to all reaches to make sure all have a reference
# to the same dataframe, and update the index of the reach
for i, reach in enumerate(self):
reach.index = i
reach.df = new_df
#
# methods that convert this to another object
#
[docs]
def to_crs(self, crs: CRS) -> None:
"""Warp the coordinate system."""
self.df.to_crs(crs, inplace=True)
[docs]
def to_dataframe(self) -> gpd.GeoDataFrame:
"""Represent as GeoDataFrame, useful for pickling."""
# reset the dataframe to be tight on this tree, and we can
# rely on rows being in preOrder
self.resetDataFrame()
# move the parent into the dataframe
def _parent_index(n):
if n.parent is None:
return None
else:
return n.parent[names.ID]
self.df[names.PARENT] = [_parent_index(n) for n in self]
self.df[names.PARENT] = self.df[names.PARENT].convert_dtypes()
# move the children into the dataframe
self.df[names.CHILDREN] = [[c.index for c in n.children] for n in self]
self.df[names.CHILDREN] = self.df[names.CHILDREN].convert_dtypes()
return self.df
[docs]
def to_mls(self) -> shapely.geometry.MultiLineString:
"""Represent this as a shapely.geometry.MultiLineString"""
return shapely.geometry.MultiLineString([r.linestring for r in self])
[docs]
def to_file(self, filename: str, **kwargs) -> None:
"""Save the network for this river only to a geopandas file.
Note this file can be reloaded via:
$> watershed_workflow.river_tree.River.constructRiversByDataFrame(gpd.read_file(filename))
"""
self.to_dataframe().to_file(filename, **kwargs)
[docs]
def copy(self, df: gpd.GeoDataFrame) -> River:
"""Shallow copy using a provided DataFrame"""
if df is None:
df = self.df
copy_children = [child.copy(df) for child in self.children]
return self.__class__(self.index, df, copy_children)
[docs]
def deepcopy(self) -> River:
"""Creates a deep copy of self"""
df_copy = self.df.copy()
return self.copy(df_copy)
[docs]
def copySubtree(self) -> River:
"""Returns a deep copy rooted at self."""
inds = [r.index for r in self.preOrder()]
df_copy = self.df.loc[inds]
return self.copy(df_copy)
#
# Factory functions
#
[docs]
@classmethod
def constructRiversByGeometry(cls, df, tol: float = _tol) -> List[River]:
"""Forms a list of River trees from a list of reaches by looking for
close endpoints of those reaches.
Parameters
----------
df : gpd.GeoDataFrame
GeoDataFrame containing reach linestrings. Must have a 'geometry'
column with LineString geometries.
tol : float, optional
Geometric tolerance for matching reach endpoints to beginpoints.
Defaults to _tol (1e-7).
Returns
-------
list[River]
List of River objects, each representing a river network tree.
Note
----
This expects that endpoints of a reach coincide with beginpoints of
their downstream reach, and does not work for cases where the junction
is at a midpoint of a reach.
"""
logging.debug("Generating Rivers")
if len(df) == 0:
return list()
# make a kdtree of beginpoints
coords = np.array([r.coords[0] for r in df['geometry']])
kdtree = cKDTree(coords)
# make a node for each linestring
nodes = [cls(i, df) for i in df.index]
# match nodes to their parent through the kdtree
rivers = []
divergence = []
divergence_matches = []
for j, n in enumerate(nodes):
# find the closest beginpoint the this node's endpoint
closest = kdtree.query_ball_point(n.linestring.coords[-1], tol)
if len(closest) > 1:
logging.debug("Bad multi linestring:")
logging.debug(" connected to %d: %r" % (j, list(n.linestring.coords[-1])))
divergence.append(j)
divergence_matches.append(closest)
# end at the same point, pick the min angle deviation
my_tan = np.array(n.linestring.coords[-1]) - np.array(n.linestring.coords[-2])
my_tan = my_tan / np.linalg.norm(my_tan)
other_tans = [
np.array(df.geometry[c].coords[1]) - np.array(df.geometry[c].coords[0])
for c in closest
]
other_tans = [ot / np.linalg.norm(ot) for ot in other_tans]
dots = [np.inner(ot, my_tan) for ot in other_tans]
for i, c in enumerate(closest):
logging.debug(" %d: %r --> %r with dot product = %g" %
(c, coords[c], df.geometry[c].coords[-1], dots[i]))
c = closest[np.argmax(dots)]
nodes[c].addChild(n)
elif len(closest) == 0:
rivers.append(n)
else:
assert (len(closest) == 1)
nodes[closest[0]].addChild(n)
assert (len(rivers) > 0)
return rivers
[docs]
@classmethod
def constructRiversByHydroseq(cls, df) -> List[River]:
"""Given a list of linestrings, create a list of rivers using the
HydroSeq maps provided in NHDPlus datasets.
Parameters
----------
df : gpd.GeoDataFrame
GeoDataFrame containing reach linestrings with NHDPlus attributes.
Must contain columns for HYDROSEQ and DOWNSTREAM_HYDROSEQ as defined
in watershed_workflow.sources.standard_names.
Returns
-------
list[River]
List of River objects, each representing a river network tree.
"""
# create a map from hydroseq to node
hydro_seq_ids = dict(zip(df[names.HYDROSEQ], (cls(i, df) for i in df.index)))
roots = []
for hs_id, node in hydro_seq_ids.items():
down_hs_id = node[names.DOWNSTREAM_HYDROSEQ]
try:
hydro_seq_ids[down_hs_id].addChild(node)
except KeyError:
roots.append(node)
return roots
[docs]
@classmethod
def constructRiversByDataFrame(cls, df) -> List[River]:
"""Create a list of rivers from a dataframe that includes a 'parent' column.
Parameters
----------
df : gpd.GeoDataFrame
GeoDataFrame containing reach linestrings with parent-child relationships.
Must contain PARENT and ID columns as defined in standard_names.
Returns
-------
list[River]
List of River objects, each representing a river network tree.
"""
assert names.PARENT in df
assert names.ID in df
# create a dictionary from ID --> node (NOT index --> node!)
nodes = dict((df.at[index, names.ID], cls(index, df)) for index in df.index)
roots = []
for ID, node in nodes.items():
parent = df.at[node.index, names.PARENT]
if pd.isna(parent):
roots.append(node)
else:
nodes[parent].addChild(node)
return roots
#
# Helper functions
#
[docs]
def getNode(rivers, index) -> Optional[River]:
"""Finds the node, by index, in a list of rivers.
Parameters
----------
rivers : list[River]
List of River objects to search through.
index : int or str
Index of the node to find.
Returns
-------
River or None
The River node with the specified index, or None if not found.
"""
for river in rivers:
n = river.getNode(index)
if n is not None:
return n
return None
[docs]
def combineSiblings(n1: River,
n2: River,
new_ls: Optional[shapely.geometry.LineString] = None,
ds: Optional[float] = None) -> River:
"""Combines two sibling nodes, merging catchments and metadata.
Parameters
----------
n1 : River
First sibling node to combine.
n2 : River
Second sibling node to combine.
new_ls : shapely.geometry.LineString, optional
Linestring geometry for the combined reach. If None, the geometry
is computed by interpolating discrete nodes every ds meters.
ds : float, optional
Distance between interpolated points when computing new geometry.
Required if new_ls is None.
Returns
-------
River
The combined river node (n1 is modified and returned).
Note
----
The resulting reach is either provided (by new_ls) or is
computed by interpolating discrete nodes every ds.
"""
assert (n1.isSiblingOf(n2))
if new_ls is None:
assert ds is not None
avg_length = (n1.linestring.length + n2.linestring.length) / 2
npoints = int(avg_length // ds) + 2
ds1 = np.linspace(0, n1.linestring.length, npoints)
points1 = n1.linestring.interpolate(ds1)
ds2 = np.linspace(0, n2.linestring.length, npoints)
points2 = n2.linestring.interpolate(ds2)
points = [
watershed_workflow.utils.computeMidpoint(p1.coords[0], p2.coords[0])
for (p1, p2) in zip(points1, points2)
]
new_ls = shapely.geometry.LineString(points)
n1.linestring = new_ls
if names.CATCHMENT_AREA in n1:
n1[names.CATCHMENT_AREA] += n2[names.CATCHMENT_AREA]
if 'catchment' in n2 and n2['catchment'] is not None:
if n1['catchment'] is None:
n1['catchment'] = n2['catchment']
else:
n1['catchment'] = shapely.ops.unary_union([n1['catchment'], n2['catchment']])
for child in n1.children:
if not watershed_workflow.utils.isClose(child.linestring.coords[-1], new_ls.coords[0]):
child.appendCoordinate(new_ls.coords[0])
for child in n2.children:
if not watershed_workflow.utils.isClose(child.linestring.coords[-1], new_ls.coords[0]):
child.appendCoordinate(new_ls.coords[0])
if names.DOWNSTREAM_HYDROSEQ in child:
child[names.DOWNSTREAM_HYDROSEQ] = n1[names.HYDROSEQ]
n1.addChild(child)
n2.children = []
n2.remove()
n2.linestring = shapely.geometry.LineString()
return n1
#
# Construction method
#
[docs]
def createRivers(reaches: gpd.GeoDataFrame,
method: Literal['geometry', 'hydroseq', 'native'] = 'geometry',
tol: float = _tol) -> List[River]:
"""Constructs River objects from a list of reaches.
Parameters
----------
reaches : gpd.GeoDataFrame
The reaches to turn into rivers.
method : str, optional
Provide the method for constructing rivers. Valid are:
- 'geometry' looks at coincident coordinates
- 'hydroseq' Valid only for NHDPlus data, this uses the
NHDPlus VAA tables Hydrologic Sequence. If using this
method, get_reaches() must have been called with both
'hydroseq' and 'dnhydroseq' properties requested (or properties=True).
- 'native' Reads a natively dumped list of rivers.
tol : float, optional
Defines what close is in the case of method == 'geometry'. Defaults to _tol.
"""
if method == 'hydroseq':
rivers = watershed_workflow.river_tree.River.constructRiversByHydroseq(reaches)
elif method == 'geometry':
rivers = watershed_workflow.river_tree.River.constructRiversByGeometry(reaches, tol)
elif method == 'native':
rivers = watershed_workflow.river_tree.River.constructRiversByDataFrame(reaches)
else:
raise ValueError(
"Invalid method for making Rivers, must be one of 'hydroseq' or 'geometry'")
# reset the data frame to be unique for each river
for river in rivers:
river.resetDataFrame()
return rivers
#
# Helper functions on lists of rivers
#
[docs]
def determineOutletToReachMap(rivers: List[River],
outlets: gpd.GeoDataFrame,
reach_ID_column: str = 'reach_ID',
measure_tol: float = 15) -> gpd.GeoDataFrame:
"""Given a list of rivers and a set of gages, find the reach in
rivers and mark where on the reach to put the effective gage.
Parameters
----------
rivers : list[River]
Rivers from which outlet reaches are potentially from
outlets : gpd.GeoDataFrame
GeoDataFrame containing at least the following columns
- reach_ID_column : ID of the reach on which the outlet lives
- measure : (if algorithm == 'measure tol') the % up the reach
from downstream of the true location of the outlet
reach_ID_column : str
Name of the column containing the reach ID.
Returns
-------
geopandas.GeoDataFrame
An updated outlets GeoDataFrame including additionally:
- river_index : index into rivers indicating which river the outlet is on
- reach_index : index of the reach holding the outlet
- location_on_reach : an indicator function, 0 if the outlet
should be approximated on the downstream end of the reach, 1
if it is on the upstream end.
- true_geometry : the old geometry
- geometry : the new location of the outlet
"""
assert reach_ID_column in outlets
# find the index in rivers and reaches
river_indices = []
reach_indices = []
locations_on_reach = []
new_geometry = []
for index in outlets.index:
reach_ID = outlets.loc[index, reach_ID_column]
measure = outlets.loc[index, 'measure']
for ri, river in enumerate(rivers):
node = river.findNode(lambda n: n[names.ID] == reach_ID)
if node is not None:
river_indices.append(ri)
reach_indices.append(node.index)
loc = 1
if node.parent is None:
# river outlet, put at downstream end
loc = 0
elif measure < measure_tol and len(list(node.siblings)) == 0:
# close to downstream end, no siblings, ok to put downstream
loc = 0
# upstream to downstream coordinates
if loc == 0:
new_geometry.append(shapely.geometry.Point(node.linestring.coords[-1]))
elif loc == 1:
new_geometry.append(shapely.geometry.Point(node.linestring.coords[0]))
locations_on_reach.append(loc)
break
outlets['river_index'] = river_indices
outlets['reach_index'] = reach_indices
outlets['location_on_reach'] = locations_on_reach
outlets['true_geometry'] = outlets['geometry']
outlets['geometry'] = new_geometry
outlets = outlets.set_geometry('geometry')
return outlets
[docs]
def accumulateCatchments(rivers: List[River],
outlets: gpd.GeoDataFrame,
reach_ID_column: str = 'reach_ID') -> gpd.GeoDataFrame:
"""Given a dataframe of outlets, compute contributing areas for each one.
Parameters
----------
rivers : list[River]
Rivers from which outlet reaches are potentially from
outlets : gpd.GeoDataFrame
GeoDataFrame containing at least the following columns
- river_index : index into rivers indicating which river the outlet is on
- reach_index : index of the reach holding the outlet
- location_on_reach : indicator (0,1) of where on the reach is
the outlet
Likely this is satisfied by calling determineOutletToReachMap()
reach_ID_column : str, optional
Name of the column containing the reach ID. Defaults to 'reach_ID'.
Returns
-------
geopandas.GeoDataFrame
An updated outlets GeoDataFrame including additionally:
- catchment : polygon geometry of the contributing area to the outlet
"""
assert 'river_index' in outlets
assert 'reach_index' in outlets
assert 'location_on_reach' in outlets
# find the index in rivers and reaches
catchments = []
for index in outlets.index:
river_index = outlets.loc[index, 'river_index']
reach_index = outlets.loc[index, 'reach_index']
node = rivers[river_index].getNode(reach_index)
assert node is not None
loc = outlets.loc[index, 'location_on_reach']
if loc == 0:
print('computing from downstream')
ca = shapely.unary_union(
[n[names.CATCHMENT] for n in node.preOrder() if n[names.CATCHMENT] is not None])
else:
print('computing from upstream')
ca = shapely.unary_union([n[names.CATCHMENT] for child in node.children \
for n in child.preOrder() if n[names.CATCHMENT] is not None])
catchments.append(ca)
outlets[names.CATCHMENT] = gpd.GeoSeries(catchments, outlets.index)
return outlets
[docs]
def accumulateIncrementalCatchments(rivers: List[River],
outlets: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
"""Given a list of outlet_indices, form the incremental contributing areas.
Parameters
----------
rivers : list[River]
Rivers from which outlet reaches are potentially from
outlets : gpd.GeoDataFrame
GeoDataFrame containing at least the following columns
- river_index : index into rivers indicating which river the outlet is on
- reach_index : index of the reach holding the outlet
- location_on_reach : indicator (0,1) of where on the reach is
the outlet
Returns
-------
geopandas.GeoDataFrame
An updated outlets GeoDataFrame including additionally:
- incremental_catchment : polygon geometry of the contributing
area to the outlet
"""
# sort by number of total children, decreasing. This ensures that we can work down the tree
roots = [
rivers[outlets.loc[index, 'river_index']].getNode(outlets.loc[index, 'reach_index'])
for index in outlets.index
]
assert all(root is not None for root in roots)
outlets['num_iterated_children'] = [len(root) for root in roots]
outlets = outlets.sort_values(by='num_iterated_children', ascending=False)
outlets.pop('num_iterated_children')
# recompute roots in the new order
roots = [
rivers[outlets.loc[index, 'river_index']].getNode(outlets.loc[index, 'reach_index'])
for index in outlets.index
]
assert all(root is not None for root in roots)
stopping_ids = [root.index for root in roots]
def _truncated_tree_iter(n):
if n.index in stopping_ids:
print(f'stoppping at {n["comid"]}')
matches = outlets[outlets['reach_index'] == n.index]
assert len(matches) == 1
if matches['location_on_reach'].values[0] == 1:
print(f'...but including it {n["comid"]}')
yield n
else:
yield n
for c in n.children:
for nn in _truncated_tree_iter(c):
yield nn
def truncated_tree_iter(n):
print(f'calling trunc_tree on {n["comid"]}')
for i in _truncated_tree_iter(n):
yield i
incremental_cas = []
for root, index in zip(roots, outlets.index):
print()
print(f'Computing CA for {index}')
print('-' * 40)
loc = outlets.loc[index, 'location_on_reach']
catches = []
if loc == 0:
catches.append(root[names.CATCHMENT])
for child in root.children:
catches.extend([n[names.CATCHMENT] for n in truncated_tree_iter(child)])
incremental_cas.append(shapely.unary_union([ca for ca in catches if ca is not None]))
outlets['incremental_catchment'] = gpd.GeoSeries(incremental_cas, outlets.index)
return outlets
#
# Cleanup methods -- merge and prune
#
[docs]
def mergeShortReaches(river: River, tol: Optional[float]) -> None:
"""Remove inner branches that are short, combining branchpoints as needed.
This function merges the "short" linestring into the child
linestring if it is a junction tributary with one child or into
the parent linestring otherwise.
Parameters
----------
river : River
The river network to process.
tol : float, optional
Length threshold below which reaches will be merged. If None, the
tolerance is taken from the reach property TARGET_SEGMENT_LENGTH.
"""
# do-not-merge flag:
# 1 -> do not merge upstream of this reach
# -1 -> do not merge downstream of this reach
if 'do-not-merge' not in river.df.keys():
river.df['do-not-merge'] = [0, ] * len(river.df)
for node in list(river):
if tol is None:
ltol = node[names.TARGET_SEGMENT_LENGTH]
else:
ltol = tol
if node.linestring.length < ltol and node.parent is not None:
nname = node[names.ID] if names.ID in node.properties else node.index
logging.info(" ...cleaned inner linestring of length %g at centroid %r with id %r" %
(node.linestring.length, node.linestring.centroid.coords[0], nname))
if len(list(node.siblings)) > 0 and len(
node.children) == 1 and node['do-not-merge'] != 1:
# junction tributary with one child
node.children[0].merge()
elif len(node.children) == 0 and node['do-not-merge'] != -1:
# if the leaf node is too small
node.remove()
node.linestring = shapely.geometry.LineString()
elif node['do-not-merge'] != -1:
for sibling in list(node.siblings):
sibling.moveCoordinate(-1, node.linestring.coords[0])
sibling.remove()
node.addChild(sibling)
assert (len(list(node.siblings)) == 0)
node.merge()
[docs]
def pruneByLineStringLength(river: River, prune_tol: Optional[float] = None) -> int:
"""Removes any leaf linestrings that are shorter than prune_tol.
Parameters
----------
river : River
The river network to prune.
prune_tol : float, optional
Length threshold below which leaf reaches will be removed.
If None, uses the TARGET_SEGMENT_LENGTH property from each leaf.
Returns
-------
int
Number of reaches that were pruned.
"""
count = 0
iter_count = 1
while iter_count > 0:
iter_count = 0
for leaf in river.leaf_nodes:
if prune_tol is None:
lprune_tol = leaf[names.TARGET_SEGMENT_LENGTH]
else:
lprune_tol = prune_tol
if leaf.linestring.length < lprune_tol:
count += 1
logging.info(" ...cleaned leaf linestring of length: %g at centroid %r" %
(leaf.linestring.length, leaf.linestring.centroid.coords[0]))
leaf.prune()
count += iter_count
return count
[docs]
def pruneByArea(river: River, area: float, prop: str = names.DRAINAGE_AREA) -> int:
"""Removes, IN PLACE, reaches whose total contributing area is less than area km^2.
Parameters
----------
river : River
The river network to prune.
area : float
Area threshold in km^2. Reaches with contributing area below this
value will be removed.
prop : str, optional
Name of the property containing drainage area values.
Defaults to DRAINAGE_AREA from standard_names.
Returns
-------
int
Number of reaches that were pruned.
Note
----
This requires NHDPlus data to have been used and the drainage area
property to have been set.
"""
count = 0
for node in river.preOrder():
# note, we only ever prune children, to avoid unneeded recursive pruning
#
# make a copy of the children, as this list will be modified by potential prune calls
children = node.children[:]
for child in children:
if child[prop] < area:
logging.debug(f"... removing trib with {len(child)}"
f" reaches of area: {child[prop]}")
count += len(child)
child.prune()
return count
[docs]
def pruneRiversByArea(rivers: List[River],
area: float,
prop: str = names.DRAINAGE_AREA) -> List[River]:
"""Both prunes reaches and filters rivers whose contributing area is less than area."""
num_reaches = sum(len(river) for river in rivers)
count = 0
sufficiently_big_rivers = []
for river in rivers:
if river[prop] >= area:
count += pruneByArea(river, area, prop)
sufficiently_big_rivers.append(river)
else:
count += len(river)
logging.info(f"... pruned {count} of {num_reaches}")
return sufficiently_big_rivers
[docs]
def filterDiversions(rivers: List[River]) -> List[River]:
"""Filteres diversions, but not braids."""
logging.info("Remove diversions...")
non_diversions = []
for river in rivers:
keep_river = True
count_tribs = 0
count_reaches = 0
for leaf in river.leaf_nodes:
if leaf[names.DIVERGENCE] == 2:
# is a braid or a diversion
if river.getNode(leaf[names.UPSTREAM_HYDROSEQ]) is None:
# diversion!
try:
joiner = next(n for n in leaf.pathToRoot()
if n.parent is not None and len(n.parent.children) > 1)
except StopIteration:
# no joiner means kill the whole river
logging.info(f' ... remove diversion river with {len(river)} reaches.')
keep_river = False
break
else:
count_tribs += 1
count_reaches += len(joiner)
joiner.prune()
if keep_river:
logging.info(
f' ... removed {count_tribs} diversion tributaries with {count_reaches} total reaches.'
)
non_diversions.append(river)
return non_diversions
[docs]
def removeBraids(rivers: List[River]) -> None:
"""Remove braids, but not diversions."""
logging.debug("Removing braided sections...")
for river in rivers:
count_tribs = 0
count_reaches = 0
for leaf in river.leaf_nodes:
if leaf[names.DIVERGENCE] == 2:
# is a braid or a diversion?
logging.info(f" Found a braid with upstream = {leaf[names.UPSTREAM_HYDROSEQ]}")
upstream_hydroseq = leaf[names.UPSTREAM_HYDROSEQ]
if river.findNode(lambda n: n[names.HYDROSEQ] == upstream_hydroseq) is not None:
# braid!
try:
joiner = next(n for n in leaf.pathToRoot()
if n.parent is not None and len(n.parent.children) > 1)
except StopIteration:
assert (False)
# this should not be possible, because our braid must come back somewhere
else:
count_tribs += 1
count_reaches += len(joiner)
joiner.prune()
logging.debug(f'... removed {count_tribs} braids with {count_reaches} reaches'
f' from a river of length {len(river)}')
[docs]
def filterDivergences(rivers: List[River]) -> List[River]:
"""Removes both diversions and braids.
Braids are divergences that return to the river network, and so
look like branches of a river tree whose upstream entity is in the
river (in another branch).
Diversions are divergences that do not return to the stream
network, and so their upstream entity is in another river.
"""
logging.info("Removing divergent sections...")
non_divergences = []
for river in rivers:
keep_river = True
count_tribs = 0
count_reaches = 0
for leaf in river.leaf_nodes:
if leaf[names.DIVERGENCE] == 2:
# diversion!
try:
joiner = next(n for n in leaf.pathToRoot()
if n.parent is not None and len(n.parent.children) > 1)
except StopIteration:
# no joiner means kill the whole river
logging.info(f' ... remove divergence river with {len(river)} reaches.')
keep_river = False
break
else:
count_tribs += 1
count_reaches += len(joiner)
joiner.prune()
if keep_river:
logging.info(
f' ... removed {count_tribs} divergence tributaries with {count_reaches} total reaches.'
)
non_divergences.append(river)
return non_divergences
[docs]
def filterSmallRivers(rivers: List[River], count: int) -> List[River]:
"""Remove any rivers with fewer than count reaches."""
logging.info(f"Removing rivers with fewer than {count} reaches.")
new_rivers = []
for river in rivers:
ltree = len(river)
if ltree < count:
logging.debug(f" ...removing river with {ltree} reaches")
else:
new_rivers.append(river)
logging.debug(f" ...keeping river with {ltree} reaches")
logging.info(f'... removed {len(rivers) - len(new_rivers)} rivers')
return new_rivers
[docs]
def simplify(rivers: List[River], tol: float) -> None:
"""Simplify, IN PLACE, all reaches."""
if len(rivers) == 0:
return
rivers[0].df.simplify(tol)
for river in rivers[1:]:
if river.df is not rivers[0].df:
river.df.simplify(tol)
[docs]
def isClose(river1: River, river2: River, tol: float) -> bool:
"""Equivalence of rivers.
Parameters
----------
river1 : River
First river to compare.
river2 : River
Second river to compare.
tol : float
Tolerance for geometric comparison.
Returns
-------
bool
True if the rivers are equivalent within the given tolerance.
"""
return all((watershed_workflow.utils.isClose(r1.linestring, r2.linestring, tol)
and len(r1.children) == len(r2.children))
for (r1, r2) in zip(river1.preOrder(), river2.preOrder()))