Source code for watershed_workflow.regions

"""
This namespace hosts functions to add labelsets to define regions
"""
from typing import Optional, List, Dict, Tuple
import logging
import warnings

import numpy as np
import shapely

from watershed_workflow.split_hucs import SplitHUCs
from watershed_workflow.river_tree import River
from watershed_workflow.mesh import Mesh2D

import watershed_workflow.sources.standard_names as names
import watershed_workflow.mesh
import watershed_workflow.utils



[docs] def addSurfaceRegions(m2 : Mesh2D, column : str = 'land_cover', names : Optional[dict[int, str]] = None): """Add labeled sets to a mesh -- one per unique color. Parameters ---------- m2 : mesh.Mesh2D The mesh to label. names : dict, optional Dictionary mapping colors to color name. """ assert m2.cell_data is not None colors = m2.cell_data[column].astype(int) inds = np.unique(colors) if names is None: names = dict((i, str(i)) for i in inds) for ind in inds: ent_ids = list(np.where(colors == ind)[0]) ls = watershed_workflow.mesh.LabeledSet(names[ind], int(ind), 'CELL', ent_ids) m2.labeled_sets.append(ls)
[docs] def addPolygonalRegions(m2 : Mesh2D, polygons : SplitHUCs | List[Tuple[shapely.Polygon, str]], volume : bool = False) -> List[List[int]]: """Label m2 with region(s) for each polygon. Always adds a surface region; if volume also adds a volume region for extrusion in 3D. Parameters ---------- m2 : mesh.Mesh2D The mesh to label. polygons : SplitHUCs | List[shapely.Polygon] The polygons covering each watershed. volume : bool, optional If true, also add the volumetric region below the polygon that will be extruded in a 3D mesh eventually. Returns ------- partitions : List[List[int]] A list of length polygons, each entry of which is the list of cell indices in that polygon. """ if isinstance(polygons, SplitHUCs): polygons = list(zip(polygons.df['geometry'], polygons.df[names.NAME])) logging.info(f"Adding regions for {len(polygons)} polygons") partitions : List[List[int]] = [list() for p in polygons] for c in range(m2.num_cells): cc = m2.centroids[c] cc = shapely.geometry.Point(cc[0], cc[1]) done = False for i, (p,l) in enumerate(polygons): if p.contains(cc): done = True partitions[i].append(c) break for (p,label), part in zip(polygons, partitions): if len(part) > 0: # add a region, denoting this one as "to extrude". This # will become the volume region setid = m2.getNextAvailableLabeledSetID() ls = watershed_workflow.mesh.LabeledSet(label, int(setid), 'CELL', part) m2.labeled_sets.append(ls) ls.to_extrude = True # add a second region, denoting this one as the top surface of faces setid2 = m2.getNextAvailableLabeledSetID() ls2 = watershed_workflow.mesh.LabeledSet(label + ' surface', setid2, 'CELL', part) m2.labeled_sets.append(ls2) return partitions
[docs] def addWatershedAndOutletRegions(m2 : Mesh2D, hucs : SplitHUCs, outlet_width : float = 300, exterior_outlet=True): """Add four labeled sets to m2 for each polygon: - cells in the polygon, to be extruded - cells in the polygon, to be kept as faces upon extrusion - boundary of the polygon (edges, kept as faces) - outlet of the polygon (edges, kept as faces, within outlet width of the outlet) Parameters ---------- m2 : mesh.Mesh2D The mesh to label. hucs : iterable[shapely.Polygon] or SplitHUCs Watershed polygons. outlet_width : float, optional How wide should the outlet region be? Note this should include not just the river width, but more realistically the best resolved floodplain, or 1-2 face-widths, whichever is bigger. exterior_outlet : bool, optional If true, find the outlet point that intersects the boundary and include regions around that outlet as well. """ # this adds sets for all m2 cells in each polygon of hucs partitions = addPolygonalRegions(m2, hucs, volume=True) # add a set for the boundary of each polygon and the outlet # def isInsideBall(outlet, edge): """Helper function -- is edge centroid within outlet_width of outlet?""" n1 = m2.coords[edge[0]] n2 = m2.coords[edge[1]] c = (n1+n2) / 2. close = watershed_workflow.utils.isClose(outlet, tuple(c[0:2]), outlet_width) return close # find a list of faces on the boundary of the partitioned sets created above for label, partition, outlet in zip(hucs.df[names.NAME], partitions, hucs.df[names.OUTLET]): # create a subdomain mesh for the partition. This is not a # valid mesh, but it is sufficient to get the boundary edges subdomain_conn = [list(m2.conn[cell]) for cell in partition] subdomain_nodes = set([c for e in subdomain_conn for c in e]) subdomain_coords = np.array([m2.coords[c] for c in subdomain_nodes]) m2h = Mesh2D(subdomain_coords, subdomain_conn, check_handedness=False, validate=False) partition_boundary_edges = [(int(e[0]), int(e[1])) for e in m2h.boundary_edges] # create the partition boundary region of faces ls = watershed_workflow.mesh.LabeledSet(label + ' boundary', m2.getNextAvailableLabeledSetID(), 'FACE', partition_boundary_edges) ls.to_extrude = True m2.labeled_sets.append(ls) # every polygon now has an outlet -- find the boundary faces near that outlet if outlet is not None: outlet_faces = [e for e in m2h.boundary_edges if isInsideBall(outlet, e)] edges = [(int(e[0]), int(e[1])) for e in outlet_faces] if (len(edges) == 0): warnings.warn(f"Outlet region found 0 faces for polygon {label} near outlet " f"at {outlet}. Probably bad outlet data?") ls = watershed_workflow.mesh.LabeledSet(label + ' outlet', m2.getNextAvailableLabeledSetID(), 'FACE', edges) ls.to_extrude = True m2.labeled_sets.append(ls) # also write one for the full domain if exterior_outlet: if hasattr(hucs, "exterior_outlet"): exterior_outlet_point = hucs.exterior_outlet else: try: boundary = hucs.exterior() except AttributeError: boundary = shapely.ops.unary_union(hucs) exterior_outlet_point = next(outlet for outlet in hucs.df[names.OUTLET] if outlet.buffer(500).intersects(boundary)) outlet_faces = [e for e in m2.boundary_edges if isInsideBall(exterior_outlet_point, e)] edges = [(int(e[0]), int(e[1])) for e in outlet_faces] ls2 = watershed_workflow.mesh.LabeledSet('surface domain outlet', m2.getNextAvailableLabeledSetID(), 'FACE', edges) ls2.to_extrude = True m2.labeled_sets.append(ls2)
[docs] def addRiverCorridorRegions(m2 : Mesh2D, rivers : List[River], labels : Optional[List[str]] = None): """Add labeled sets to m2 for each river corridor. Parameters: ----------- m2: Mesh2D 2D mesh elevated on DEMs rivers: list(watershed_workflow.river_tree.RiverTree) List of rivers used to create the river corridors. labels: list(str), optional List of names, one per river. """ if labels is None: labels = [f'river corridor {i}' for (i,r) in enumerate(rivers)] else: assert (len(labels) == len(rivers)) for label, river in zip(labels, rivers): river_elements = [i for r in river.preOrder() for i in range(r[names.ELEMS_GID_START], r[names.ELEMS_GID_START] + len(r[names.ELEMS]))] if len(river_elements) > 0: setid2 = m2.getNextAvailableLabeledSetID() ls2 = watershed_workflow.mesh.LabeledSet(label + ' surface', setid2, 'CELL', river_elements) m2.labeled_sets.append(ls2)
[docs] def addStreamOrderRegions(m2 : Mesh2D, rivers : List[River]) -> None: """Add labeled sets to m2 for reaches of each stream order . Parameters: ----------- m2: Mesh2D 2D mesh elevated on DEMs rivers : List[watershed_workflow.river_tree.River] River used to create the river corridors. """ from collections import defaultdict regions : Dict[int, List[int]] = defaultdict(list) for river in rivers: for reach in river.preOrder(): order = reach[names.ORDER] regions[order].extend(range(reach[names.ELEMS_GID_START], reach[names.ELEMS_GID_START] + len(reach[names.ELEMS]))) labels = [f'stream order {order}' for order in regions.keys()] partitions = [regions[order] for order in regions.keys()] for label, part in zip(labels, partitions): if len(part) > 0: setid2 = m2.getNextAvailableLabeledSetID() ls2 = watershed_workflow.mesh.LabeledSet(label, setid2, 'CELL', part) m2.labeled_sets.append(ls2)
[docs] def addReachIDRegions(m2 : Mesh2D, river : River): """Add labeled sets to m2 for reaches of each stream order . Parameters: ----------- m2: Mesh2D 2D mesh elevated on DEMs river: watershed_workflow.river_tree.RiverTree List of rivers used to create the river corridors. reaches: list(str) list of NHDID IDs to be labeled. """ from collections import defaultdict regions : Dict[str, List[int]] = defaultdict(list) for reach in river.preOrder(): regions[f'reach {reach[names.ID]}'] = list(range(reach[names.ELEMS_GID_START], reach[names.ELEMS_GID_START] + len(reach[names.ELEMS]))) for label, part in regions.items(): if len(part) > 0: setid2 = m2.getNextAvailableLabeledSetID() ls2 = watershed_workflow.mesh.LabeledSet(label + ' surface', setid2, 'CELL', part) m2.labeled_sets.append(ls2)