Source code for watershed_workflow

"""These high-level functions attempt to "do what the user means."
They group other, lower-level functions into common sets of
operations.  The "default workflow" is nearly entirely built from
calls to this namespace.  If the user needs to deviate from the
default workflow, they can then call the lower level functions.

This top level module provides functionality for getting shapes and
rasters representing watershed boundaries, river networks, digital
elevation models, and other GIS datasets and then processing those
data sets for use in simulations.

Functions here that get shapes and rasters should be used instead of
directly using the source managers, because they additionally do
things like convert coordinate systems, get the data into common
data structures, etc.

"""
from __future__ import annotations

from . import _version
__version__ = _version.get_versions()['version']

from typing import Any, Optional, Iterable, List, Tuple, Union
import logging
import math
import numpy as np
import geopandas as gpd
import shapely.geometry
import xarray as xr
from matplotlib import pyplot as plt
import folium
import folium.plugins

import watershed_workflow.crs
import watershed_workflow.utils
import watershed_workflow.sources
import watershed_workflow.sources.standard_names as names

import watershed_workflow.river_tree
from watershed_workflow.river_tree import River
import watershed_workflow.split_hucs
from watershed_workflow.split_hucs import SplitHUCs

import watershed_workflow.hydrography
import watershed_workflow.resampling
import watershed_workflow.angles
import watershed_workflow.triangulation
import watershed_workflow.river_mesh
import watershed_workflow.condition
import watershed_workflow.data


#
# functions for relating objects
# -----------------------------------------------------------------------------
[docs] def findHUC(source : Any, shape : shapely.geometry.Polygon, in_crs : watershed_workflow.crs.CRS, hint : str, shrink_factor : float = 1.e-5) -> str: """Finds the smallest HUC containing shape. Parameters ---------- source : source-type Source object providing a getShapes() method that gets HUCs by ID. shape : shapely.geometry.Polygon Find this shape in a HUC. in_crs : watershed_workflow.crs.CRS CRS of shape. hint : str HUC in which to start searching. This should be at least as long as the indexing file size -- HUC 2 or longer for WBD, 4 or longer for NHD Plus, or 8 or longer for NHD. shrink_factor : float, optional A fraction of the radius of shape to shrink prior for checking containment within HUCs. This fixes cases where shape is on a HUC boundary with potentially some numerical error. Returns ------- out : str The smallest containing HUC. """ def _in_huc(shply, huc_shply): """Checks whether shply is in huc_shply""" if huc_shply.contains(shply): return 2 elif huc_shply.intersects(shply): return 1 else: return 0 def _findHUC(source, shply, crs, hint): """Searches in hint to find shp.""" logging.info('searching: %s' % hint) hint_level = len(hint) search_level = hint_level + 2 if search_level > source.lowest_level: return hint source.setLevel(search_level) subhus = source.getShapesByID(hint, out_crs=crs) for ID, subhu in zip(subhus[names.ID], subhus.geometry): inhuc = _in_huc(shply, subhu) if inhuc == 2: # fully contained in try_huc, recurse logging.info(f' subhuc: {ID} contains') return _findHUC(source, shply, crs, ID) elif inhuc == 1: logging.info(f' subhuc: {ID} partially contains') # partially contained in try_huc, return this return hint else: logging.info(f' subhuc: {ID} does not contain') assert False # must shrink the poly a bit in case it is close to or on a boundary radius = math.sqrt(shape.area / math.pi) shape_s = shape.buffer(-shrink_factor * radius) hint_hu = source.getShapesByID(hint, out_crs=in_crs) inhuc = _in_huc(shape_s, hint_hu.geometry.iloc[0]) if inhuc != 2: raise RuntimeError(f"{source.__class__}: shape not found in hinted HUC '{hint}'") result = _findHUC(source, shape_s, in_crs, hint) return result
[docs] def reduceRivers(rivers : List[River], ignore_small_rivers : int = 0, keep_n_rivers : int = -1, prune_by_area : float = 0.0, area_property : str = names.DRAINAGE_AREA, remove_diversions : bool = False, remove_braided_divergences : bool = False, tol : Optional[float] = 0.1) -> List[River]: """Reduce the extent of the river network through a variety of methods. Parameters ---------- rivers : list(river_tree.River) A list of rivers to reduce. ignore_small_rivers : int, optional If provided and positive, removes rivers whose number of reaches is less than this value. If negative, keeps the N biggest (in number of reaches) rivers, where N is the negative of the provided value (e.g. -2 keeps the biggest 2 rivers). prune_by_area : float, optional If provided, remove reaches whose total contributing area is less than this tol. NOTE: only valid for reaches that include a contributing area property (e.g. NHDPlus). area_property : str, optional='DivergenceRoutedDrainAreaSqKm' Name of the area property to use for determining reach CA. Note that this defines the units of prune_by_area value. remove_diversions : bool, optional=False If true, remove diversions (see documentation of modify_rivers_remove_divergences()). remove_braided_divergences : bool, optional=False If true, remove braided divergences (see documentation of modify_rivers_remove_divergences()). tol : float, optional=0.1 Defines what close is in the case of method == 'geometry' Returns ------- out : list(river_tree.River) A list of rivers, as River objects. """ if keep_n_rivers > 0: logging.info(f"Removing all but the biggest {keep_n_rivers} rivers") rivers = sorted(rivers, key=lambda a: len(a), reverse=True) rivers = rivers[0:keep_n_rivers] if ignore_small_rivers > 0: rivers = watershed_workflow.river_tree.filterSmallRivers(rivers, ignore_small_rivers) if prune_by_area > 0.0: logging.info(f"Removing rivers with area < {prune_by_area}") rivers = [r for r in rivers if r.properties[area_property] > prune_by_area] if remove_diversions and remove_braided_divergences: rivers = watershed_workflow.river_tree.filterDivergences(rivers) elif remove_diversions: rivers = watershed_workflow.river_tree.filterDiversions(rivers) elif remove_braided_divergences: watershed_workflow.river_tree.removeBraids(rivers) if prune_by_area is not None: rivers = watershed_workflow.river_tree.pruneRiversByArea(rivers, prune_by_area, area_property) if ignore_small_rivers > 0: rivers = watershed_workflow.river_tree.filterSmallRivers(rivers, ignore_small_rivers) return rivers
[docs] def simplify(hucs : SplitHUCs, rivers : List[River], reach_segment_target_length : float, huc_segment_target_length : Optional[float] = None, river_close_distance : float = 100.0, river_far_distance : float = 500.0, resample_by_reach_property : bool = False, min_angle : float = 20, junction_min_angle : float = 20, snap_triple_junctions_tol : Optional[float] = None, plot_diagnostics : bool = False, keep_points : bool = False) -> None: """Simplifies, in place, the HUC and river shapes to create constrained, discrete segments. Parameters ---------- hucs : SplitHUCs A split-form HUC object containing all reaches. rivers : list(River) A list of river objects. huc_segment_target_length : float Target length of a typical triangle edge away from the river. reach_segment_target_length : float Target length of a typical triangle edge at the river. waterbodies : list(shply), optional A list of waterbodies. simplify_hucs : float, optional If provided, simply the hucs by moving points at most this many units (see also shapely.simplify). Units are that of the CRS of shapes. simplify_rivers : float, optional If provided, simply the rivers by moving points at most this many units (see also shapely.simplify). Units are that of the CRS of shapes. If not provided, use the simplify_hucs value. Provide 0 to make no changes to the rivers. simplify_waterbodies : float, optional Simplify the waterbodies. If not provided, uses the simplify_hucs value. Provide 0 to make no changes to the waterbodies. prune_tol : float, optional = simplify_rivers Prune leaf reaches that are smaller than this tolerance. If not provided, uses simplify_rivers value. Provide 0 to not do this step. merge_tol : float, optional = simplify_rivers Merges reaches that are smaller than this tolerance with their downstream parent reach. Note that if there is a branchpoint at the downstream node of the small reach, it will get moved to the upstream node. If not provided, uses simplify_rivers value. Provide 0 to not do this step. snap_tol : float, optional = 0.75 * simplify_rivers Tolerance used for snapping rivers to nearby huc boundaries. Provide 0 to not snap. snap_triple_junctions_tol : float, optional = 3 * snap_tol Tolerance used for snapping river triple junctions to huc triple junctions. snap_reach_endpoints_tol : float, optional = 2 * snap_tol Tolerance used for snapping reach junctions to huc boundaries. snap_waterbodies_tol : float, optional = snap_tol If not 0, snaps waterbody and HUC nodes that are nearly coincident to be discretely coincident. Note this is not recommended; prefer to include major waterbodies in the HUC network. cut_intersections : bool, optional = False If true, force intersections of the river network and the HUC boundary to occur at a coincident node by adding nodes as needed. """ logging.info("") logging.info("Simplifying") logging.info("-" * 30) assert len(rivers) > 0 logging.info(rivers[0].df.crs) logging.info(f"Presimplify to remove colinear, coincident points.") presimplify = 1.e-4 * reach_segment_target_length watershed_workflow.river_tree.simplify(rivers, presimplify) watershed_workflow.split_hucs.simplify(hucs, presimplify) logging.info(rivers[0].df.crs) logging.info(f"Pruning leaf reaches < {reach_segment_target_length}") for river in rivers: watershed_workflow.river_tree.pruneByLineStringLength(river, reach_segment_target_length) logging.info(rivers[0].df.crs) logging.info(f"Merging internal reaches < {reach_segment_target_length}") for river in rivers: watershed_workflow.river_tree.mergeShortReaches(river, 0.75*reach_segment_target_length) logging.info(rivers[0].df.crs) watershed_workflow.utils.logMinMaxMedianSegment((r.linestring for river in rivers for r in river), "reach") watershed_workflow.utils.logMinMaxMedianSegment(hucs.linestrings, "HUC ") logging.info("Snapping discrete points to make rivers and HUCs discretely consistent.") logging.info(" -- snapping HUC triple junctions to reaches") if snap_triple_junctions_tol is None: snap_triple_junctions_tol = 3*reach_segment_target_length watershed_workflow.hydrography.snapHUCsJunctions(hucs, rivers, snap_triple_junctions_tol) watershed_workflow.utils.logMinMaxMedianSegment((r.linestring for river in rivers for r in river), "reach") watershed_workflow.utils.logMinMaxMedianSegment(hucs.linestrings, "HUC ") logging.info(rivers[0].df.crs) logging.info(" -- snapping reach endpoints to HUC boundaries") for river in rivers: watershed_workflow.hydrography.snapReachEndpoints(hucs, river, reach_segment_target_length) assert river.isContinuous() watershed_workflow.utils.logMinMaxMedianSegment((r.linestring for river in rivers for r in river), "reach") watershed_workflow.utils.logMinMaxMedianSegment(hucs.linestrings, "HUC ") logging.info(rivers[0].df.crs) logging.info(" -- cutting reaches at HUC boundaries") watershed_workflow.hydrography.cutAndSnapCrossings(hucs, rivers, reach_segment_target_length) for river in rivers: assert river.isContinuous() watershed_workflow.utils.logMinMaxMedianSegment((r.linestring for river in rivers for r in river), "reach") watershed_workflow.utils.logMinMaxMedianSegment(hucs.linestrings, "HUC ") logging.info(rivers[0].df.crs) logging.info("") logging.info("Simplification Diagnostics") logging.info("-" * 30) watershed_workflow.utils.logMinMaxMedianSegment((r.linestring for river in rivers for r in river), "reach") watershed_workflow.utils.logMinMaxMedianSegment(hucs.linestrings, "HUC ") logging.info(rivers[0].df.crs) # resample logging.info("") logging.info("Resampling HUC and river") logging.info("-" * 30) if huc_segment_target_length is not None: dfunc = (river_close_distance, reach_segment_target_length, river_far_distance, huc_segment_target_length) river_mls = shapely.ops.unary_union([river.to_mls() for river in rivers]) logging.info(f" -- resampling HUCs based on distance function {dfunc}") watershed_workflow.resampling.resampleSplitHUCs(hucs, dfunc, river_mls, keep_points=keep_points) else: logging.info(f" -- resampling HUCs based on uniform target {reach_segment_target_length}") watershed_workflow.resampling.resampleSplitHUCs(hucs, reach_segment_target_length, keep_points=keep_points) logging.info(rivers[0].df.crs) if resample_by_reach_property: logging.info(f" -- resampling reaches based on TARGET_SEGMENT_LENGTH property") watershed_workflow.resampling.resampleRivers(rivers, keep_points=keep_points) else: logging.info(f" -- resampling reaches based on uniform target {reach_segment_target_length}") watershed_workflow.resampling.resampleRivers(rivers, reach_segment_target_length, keep_points=keep_points) logging.info(rivers[0].df.crs) logging.info("") logging.info("Resampling Diagnostics") logging.info("-" * 30) if plot_diagnostics: fig, ax = plt.subplots(1,1) watershed_workflow.utils.logMinMaxMedianSegment((r.linestring for river in rivers for r in river), "reach", ax=ax, color='b') watershed_workflow.utils.logMinMaxMedianSegment(hucs.linestrings, "HUC ", ax=ax, color='orange') else: watershed_workflow.utils.logMinMaxMedianSegment((r.linestring for river in rivers for r in river), "reach") watershed_workflow.utils.logMinMaxMedianSegment(hucs.linestrings, "HUC ") logging.info(rivers[0].df.crs) # fix bad angles logging.info("") logging.info("Clean up sharp angles, both internally and at junctions.") logging.info("-" * 30) count = watershed_workflow.angles.smoothSharpAngles(hucs, rivers, min_angle, junction_min_angle) logging.info(f"Cleaned up {count} sharp angles.") watershed_workflow.utils.logMinMaxMedianSegment((r.linestring for river in rivers for r in river), "reach") watershed_workflow.utils.logMinMaxMedianSegment(hucs.linestrings, "HUC ") logging.info(rivers[0].df.crs)
[docs] def triangulate(hucs : SplitHUCs, rivers : Optional[List[River]] = None, internal_boundaries : Optional[List[shapely.geometry.BaseGeometry | River]] = None, hole_points : Optional[List[shapely.geometry.Point]] = None, diagnostics : bool = False, verbosity : int = 1, tol : float = 1.0, refine_max_area : Optional[float] = None, refine_distance : Optional[Tuple[float,float,float,float]] = None, refine_polygons : Optional[Tuple[List[shapely.geometry.Polygon], List[float]]] = None, refine_max_edge_length : Optional[float] = None, refine_min_angle : Optional[float] = None, enforce_delaunay : bool = False, as_mesh : bool = True) -> \ Tuple[np.ndarray, np.ndarray] | \ watershed_workflow.mesh.Mesh2D | \ Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray] | \ Tuple[watershed_workflow.mesh.Mesh2D, np.ndarray, np.ndarray]: """Triangulates HUCs and rivers. Note, refinement of a given triangle is done if any of the provided criteria is met. Parameters ---------- hucs : SplitHUCs A split-form HUC object from, e.g., get_split_form_hucs() rivers : list[watershed_workflow.river_tree.River], optional List of rivers, used to refine the triangulation in conjunction with refine_distance. internal_boundaries : list[shapely.geometry.Polygon, watershed_workflow.river_tree.River], optional List of objects, whose boundary (in the case of polygons/waterbodies) or reaches (in the case of River) will be present in the edges of the triangulation. hole_points : list(shapely.Point), optional List of points inside the polygons to be left as holes/voids (excluded from mesh) diagnostics : bool, optional Plot diagnostics graphs of the triangle refinement. tol : float, optional Set tolerance for minimum distance between two nodes. The unit is the same as that of the watershed's CRS. The default is 1. refine_max_area : float, optional Refine a triangle if its area is greater than this area. refine_polygons : [list(shapely.geometry.Polygon), list(float)], optional Refine a triangle if it falls within the polygons and its area is greater than the area limit for the polygon refine_distance : list(float), optional Refine a triangle if its area is greater than a function of its centroid's distance from the nearest point on the river network. The argument is given by: [near_distance, near_area, far_distance, far_area] Defining d as the distance from triangle centroid to the nearest point on the river network and area as the area of the triangle in question, refinement occurs if: * d < near_distance and area > near_area * d > far_distance and area > far_area * otherwise, defining d' = (d - near_distance) / (far_distance - near_distance), refining occurs if area > near_area + (far_area - near_area) * d' Effectively this simply writes a piecewise linear function of triangle distance from centroid and uses that as a max area criteria. refine_max_edge_length : float, optional Refine a triangle if its max edge length is greater than this length. refine_min_angle : float, optional Try to ensure that all triangles have a minimum edge length greater than this value. enforce_delaunay : bool,optional, experimental Attempt to ensure all triangles are proper Delaunay triangles. .. note: This requires a hacked version of meshpy.triangle that supports this option. See the patch available at workflow_tpls/meshpy_triangle.patch Returns ------- vertices : np.array((n_points, 2), 'd') Array of triangle vertices. triangles : np.array((n_tris, 3), 'i') For each triangle, a list of 3 indices into the vertex array that make up that triangle. areas : _only if diagnostics=True_, np.array((n_tris), 'd') Array of triangle areas. distances : _only if diagnostics=True_, np.array((n_tris), 'd') Array of triangle distances from the river network. """ verbose = verbosity > 2 logging.info("") logging.info("Triangulation") logging.info("-" * 30) refine_funcs = [] if refine_max_area != None: refine_funcs.append(watershed_workflow.triangulation.refineByMaxArea(refine_max_area)) if refine_distance != None: refine_funcs.append( watershed_workflow.triangulation.refineByRiverDistance(*refine_distance, rivers)) if refine_max_edge_length != None: refine_funcs.append( watershed_workflow.triangulation.refineByMaxEdgeLength(refine_max_edge_length)) if refine_polygons != None: refine_funcs.append(watershed_workflow.triangulation.refineByPolygons(refine_polygons[0], refine_polygons[1])) def my_refine_func(*args): return any(rf(*args) for rf in refine_funcs) vertices, triangles = watershed_workflow.triangulation.triangulate( hucs, internal_boundaries=internal_boundaries, hole_points=hole_points, tol=tol, verbose=verbose, refinement_func=my_refine_func, min_angle=refine_min_angle, enforce_delaunay=enforce_delaunay, allow_boundary_steiner=False) if diagnostics: logging.info("Plotting triangulation diagnostics") if rivers is not None: river_multiline = shapely.ops.unary_union([river.to_mls() for river in rivers]) distances_l = [] areas_l = [] needs_refine = [] for tri in triangles: verts = vertices[tri] bary = np.sum(np.array(verts), axis=0) / 3 bary_p = shapely.geometry.Point(bary[0], bary[1]) distances_l.append(bary_p.distance(river_multiline)) areas_l.append(watershed_workflow.utils.computeTriangleArea(*verts)) needs_refine.append(my_refine_func(verts, areas_l[-1])) distances = np.array(distances_l) areas = np.array(areas_l) logging.info(" min area = {}".format(areas.min())) logging.info(" max area = {}".format(areas.max())) if verbosity > 0: plt.figure() plt.subplot(121) plt.hist(distances) plt.xlabel("distance from river of triangle centroids [m]") plt.ylabel("count [-]") plt.subplot(122) plt.scatter(distances, areas, c=needs_refine, marker='x') plt.xlabel("distance [m]") plt.ylabel("triangle area [m^2]") else: areas = np.array([watershed_workflow.utils.computeTriangleArea(*vertices[tri]) for tri in triangles]) logging.info(" min area = {}".format(areas.min())) logging.info(" max area = {}".format(areas.max())) if verbosity > 0: plt.figure() plt.subplot(111) plt.hist(areas) plt.xlabel("area [m]") plt.ylabel("count [-]") if as_mesh: m2 = watershed_workflow.mesh.Mesh2D(vertices, triangles, crs=hucs.crs) return m2, areas, distances else: return vertices, triangles, areas, distances if as_mesh: return watershed_workflow.mesh.Mesh2D(vertices, triangles, crs=hucs.crs) else: return vertices, triangles
[docs] def tessalateRiverAligned(hucs : SplitHUCs, rivers : List[River], river_width : Any, internal_boundaries : Optional[List[River | shapely.geometry.base.BaseGeometry]] = None, as_mesh : bool = True, debug : bool = False, **kwargs) -> \ Tuple[np.ndarray, List[List[int]]] | \ watershed_workflow.mesh.Mesh2D | \ Tuple[np.ndarray, List[List[int]], gpd.GeoDataFrame] | \ Tuple[np.ndarray, List[List[int]], np.ndarray, np.ndarray] | \ Tuple[watershed_workflow.mesh.Mesh2D, np.ndarray, np.ndarray]: """Tessalate HUCs using river-aligned quads along the corridor and triangles away from it. Parameters ---------- hucs : SplitHUCs The huc geometry to tessalate. Note this will be adjusted if required by the river corridor. rivers : list[River] The rivers to mesh with quads river_width : float or dict or callable or boolean Width of the quads, either a float or a dictionary providing a {StreamOrder : width} mapping. Or a function (callable) that computer width using node properties Or boolean, where True means, width for each reach is explicitely provided properties as "width" river_n_quads : int, optional Number of quads across the river. Currently only 1 is supported (the default). hole_points : list(shapely.Point), optional List of points inside the polygons to be left as holes/voids (excluded from mesh) internal_boundaries : list[shapely.Polygon], optional List of internal boundaries to embed in the domain, e.g. waterbodies. diagnostics : bool, optional If true, prints extra diagnostic info. ax : matplotlib Axes object, optional For debugging -- plots troublesome reaches as quad elements are generated to find tricky areas. kwargs : All other arguments are passed to the triangulation function for refinement. Returns ------- vertices : np.array((n_vertices, 2), 'd') Array of triangle vertices. elems : list[list[int]] For each element, an ordered list of indices into the vertices array that make up that cell. areas : _only if diagnostics=True_, np.array((n_cell_vertices), 'd') Array of areas. """ logging.info("") logging.info("Stream-aligned Meshing") logging.info("-" * 30) # generate the quads logging.info('Creating stream-aligned mesh...') computeWidth = watershed_workflow.river_mesh.createWidthFunction(river_width) if debug: fig, ax = plt.subplots(1,1) else: ax = None river_coords, river_elems, river_corridors, hole_points, intersections = \ watershed_workflow.river_mesh.createRiversMesh(hucs, rivers, computeWidth, ax=ax) if debug: plt.show() if intersections is not None: return river_coords, river_elems, intersections # triangulate the rest if internal_boundaries is None: internal_boundaries = river_corridors else: internal_boundaries = river_corridors + internal_boundaries tri_res = watershed_workflow.triangulate(hucs, rivers, internal_boundaries, hole_points, as_mesh=False, **kwargs) assert not isinstance(tri_res, watershed_workflow.mesh.Mesh2D) assert not isinstance(tri_res[0], watershed_workflow.mesh.Mesh2D) tri_coords = tri_res[0] tri_elems : List[List[int]] = [tri.tolist() for tri in tri_res[1]] # merge elements into a single output elems = tri_elems + river_elems # note, all river verts are in the tri_verts, listed first, and in the same order! coords = tri_coords # We could now recover the polygon linestrings in SplitHUCs, but don't... TBD --ETC if as_mesh: m2 = watershed_workflow.mesh.Mesh2D(coords, elems, crs=hucs.crs) if len(tri_res) > 3: return (m2, tri_res[2], tri_res[3]) else: assert len(tri_res) == 2 return m2 else: if len(tri_res) > 3: return (coords, elems, tri_res[2], tri_res[3]) else: assert len(tri_res) == 2 return (coords, elems)
[docs] def elevate(m2 : watershed_workflow.mesh.Mesh2D, dem : xr.DataArray, **kwargs) -> None: """Elevate a mesh onto the provided dem, in place. Parameters ---------- mesh_crs : crs-type Mesh coordinate system. dem : xr.DataArray 2D array forming an elevation raster. algorithm : str, optional Algorithm used for interpolation. One of: * "nearest" for nearest-neighbor pixels * "piecewise bilinear" for interpolation (default) Returns ------- out : np.array((n_points, 3), 'd') Array of triangle vertices, including a z-dimension. """ mesh_points = m2.coords # index the i,j of the points, pick the elevations elev = watershed_workflow.data.interpolateValues(mesh_points, m2.crs, dem, **kwargs) # create the 3D points if mesh_points.shape[1] == 3: mesh_points[:,2] = elev.values else: new_points = np.zeros((len(mesh_points), 3), 'd') new_points[:, 0:2] = mesh_points new_points[:, 2] = elev.values m2.coords = new_points
[docs] def getDatasetOnMesh(m2 : watershed_workflow.mesh.Mesh2D, data : xr.DataArray, **kwargs) -> np.ndarray: """Interpolate xarray data onto cell centroids of a mesh.""" mesh_points = m2.centroids interpolated_data = watershed_workflow.data.interpolateValues(mesh_points, m2.crs, data, **kwargs) # Ensure the data type of the interpolated data matches the input data if not np.issubdtype(interpolated_data.dtype, data.dtype): interpolated_data = interpolated_data.astype(data.dtype) return interpolated_data
[docs] def getShapePropertiesOnMesh(m2 : watershed_workflow.mesh.Mesh2D, df : gpd.GeoDataFrame, column : str, resolution : float, nodata: Optional[Union[int, float]] = None, **kwargs ) -> np.ndarray: """Intepolate shape data onto cell centroids of a mesh.""" dataarray = watershed_workflow.data.rasterizeGeoDataFrame(df, column, resolution, nodata=nodata) return getDatasetOnMesh(m2, dataarray, **kwargs)
def makeMap(m): folium.LayerControl().add_to(m) folium.plugins.Fullscreen().add_to(m) folium.plugins.MeasureControl().add_to(m) return m