Source code for watershed_workflow.river_mesh

"""creates river mesh using quad, pentagon and hexagon elements"""
from typing import Callable, List, Tuple, Dict, Optional

import numpy as np
import pandas as pd
import logging

from matplotlib import pyplot as plt
import matplotlib.axes

import geopandas as gpd

import shapely.geometry
import shapely.ops

import watershed_workflow.utils
import watershed_workflow.tinytree
import watershed_workflow.angles
import watershed_workflow.plot
from watershed_workflow.river_tree import River
from watershed_workflow.split_hucs import SplitHUCs
import watershed_workflow.sources.standard_names as names


def _isNonoverlapping(points: np.ndarray, elems: List[List[int]], tol: float = 1) -> bool:
    """Check if a set of polygon shapes are nonoverlapping.

    Parameters
    ----------
    points : np.ndarray
        Array of coordinate points.
    elems : List[List[int]]
        List of element connectivity, each element is a list of point indices.
    tol : float, optional
        Tolerance for area comparison, by default 1.

    Returns
    -------
    bool
        True if shapes are nonoverlapping within tolerance.
    """
    shps = [shapely.geometry.Polygon(points[e]) for e in elems]
    total_area = shapely.unary_union(shps).area
    summed_area = sum(shp.area for shp in shps)
    return abs(total_area - summed_area) < tol


def _computeExpectedNumCoords(river: River) -> int:
    """Compute the number of expected coordinates for river mesh.

    Parameters
    ----------
    river : River
        River network to compute coordinates for.

    Returns
    -------
    int
        Expected number of coordinates in the river mesh.
    """
    # two outlet points
    n = 2

    # internal points
    n += sum(2 * (len(reach.linestring.coords) - 2) for reach in river)

    # endpoints
    n += sum(len(reach.children) + 1 for reach in river)
    return n


def _computeExpectedNumElems(river: River) -> int:
    """Compute the number of expected elements for river mesh.

    Parameters
    ----------
    river : River
        River network to compute elements for.

    Returns
    -------
    int
        Expected number of elements in the river mesh.
    """
    return sum(len(reach.linestring.coords) - 1 for reach in river)


[docs] def createWidthFunction( arg: Dict[int, float] | Callable[[River, ], float] | float) -> Callable[[River, ], float]: """Create a width function from various argument types. Parameters ---------- arg : Dict[int, float] | Callable[[int], float] | float Width specification - can be a constant, dictionary mapping stream order to width, or callable function. Returns ------- Callable[[int], float] Function that computes width for a given reach. """ if isinstance(arg, dict): def func(reach): return arg[reach[names.ORDER]] elif callable(arg): func = arg else: def func(reach): return arg return func
def _plotRiver(river: River, coords: np.ndarray, ax: matplotlib.axes.Axes) -> None: """Plot the river and elements for a debugging plot. Parameters ---------- river : River River network to plot. coords : np.ndarray Array of mesh coordinates. ax : matplotlib.axes.Axes Axes to plot on. """ river.plot(color='b', marker='+', ax=ax) elems = gpd.GeoDataFrame(geometry=[ shapely.geometry.Polygon(coords[elem]) for reach in river for elem in reach[names.ELEMS] ]) watershed_workflow.plot.linestringsWithCoords(elems.boundary, color='g', marker='x', ax=ax)
[docs] def createRiversMesh(hucs : SplitHUCs, rivers : List[River], computeWidth : Callable[[River,], float], ax : Optional[matplotlib.axes.Axes] = None) -> \ Tuple[np.ndarray, List[List[int]], List[shapely.geometry.Polygon], List[shapely.geometry.Point], gpd.GeoDataFrame | None, ]: """Create meshes for each river and merge them. Parameters ---------- hucs : SplitHUCs Split HUCs object for mesh adjustment. rivers : List[River] List of river networks to mesh. computeWidth : Callable[[River], float] Function to compute river width. ax : matplotlib.axes.Axes, optional Axes for debugging plots, by default None. Returns ------- Tuple[np.ndarray, List[List[int]], List[shapely.geometry.Polygon], List[shapely.geometry.Point], gpd.GeoDataFrame | None] Tuple containing coordinates, elements, corridors, hole points, and intersections dataframe. """ elems: List[List[int]] = [] coords: List[np.ndarray] = [] corridors: List[shapely.geometry.Polygon] = [] hole_points: List[shapely.geometry.Point] = [] i = 0 elems_gid_start = 0 for river in rivers: # create the mesh lcoords, lelems = createRiverMesh(river, computeWidth, elems_gid_start) elems_gid_start += len(lelems) # adjust the HUC linestrings to include the small cross-stream # segment adjustHUCsToRiverMesh(hucs, river, lcoords) if ax is not None: logging.info('Plotting the river mesh') _plotRiver(river, lcoords, ax) # hole point is the centroid of the outlet element hole_points.append(lcoords[lelems[-1]].mean(axis=0)) # corridor is the trace of the outside corridors.append(shapely.geometry.Polygon(lcoords)) # shift to get a global ordering if i != 0: lelems = [[j + i for j in e] for e in lelems] elems.extend(lelems) coords.append(lcoords) i += len(lcoords) if ax is not None: hucs.plotAsLinestrings(color='k', marker='x', ax=ax) all_coords = np.concatenate(coords) if not _isNonoverlapping(all_coords, elems): logging.warning( f'Found at least one intersection overlapping elements in the river mesh... searching for the first intersection now' ) # find overlaps # -- create reach polygons reach_polys = [ shapely.unary_union([ shapely.geometry.Polygon([all_coords[e] for e in elem]) for elem in reach[names.ELEMS] ]) for river in rivers for reach in river ] reach_ids = [reach[names.ID] for river in rivers for reach in river] # -- find pairwise intersections intersection_i = [] intersection_j = [] intersection_p = [] for i in range(0, len(reach_polys) - 1): for j in range(i + 1, len(reach_polys)): if reach_polys[i].intersection(reach_polys[j]).area > 0: intersection_i.append(reach_ids[i]) intersection_j.append(reach_ids[j]) intersection_p.append(reach_polys[i].intersection(reach_polys[j])) intersections_df = gpd.GeoDataFrame(data={ 'i': intersection_i, 'j': intersection_j, }, geometry=intersection_p, crs=hucs.crs) if ax is not None: intersections_df.plot(color='k', ax=ax) else: intersections_df = None return all_coords, elems, corridors, hole_points, intersections_df
[docs] def createRiverMesh(river: River, computeWidth: Callable[[River, ], float], elems_gid_start: int = 0): """Returns list of elems and river corridor polygons for a given list of river trees Parameters: ----------- rivers: list(River object) List of river tree along which river meshes are to be created widths: float or dict or callable or boolean Width of the quads, either a float or a dictionary providing a {StreamOrder : width} mapping. Or a function (callable) that computer width using node properties Or boolean, where True means, width for each reach is explicitely provided properties as "width" enforce_convexity: boolean If true, enforce convexity of the pentagons/hexagons at the junctions. ax : matplotlib Axes object, optional For debugging -- plots troublesome reaches as quad elements are generated to find tricky areas. label : bool, optional = True If true and ax is provided, animates the debugging plot with reach ID labels as the user hovers over the plot. Requires a widget backend for matplotlib. Returns ------- corrs: list(shapely.geometry.Polygon) List of river corridor polygons, one per river, storing the coordinates used in elems. elems: list(list) List of river elements, each element a list of indices into corr.coords. """ coords = np.nan * np.ones((_computeExpectedNumCoords(river), 2), 'd') river.df[names.ELEMS] = pd.Series([[list() for i in range(len(ls.coords) - 1)] for ls in river.df.geometry], index=river.df.index) # project the starting point # k tracks the index of the point/coordinate k = 0 debug: Tuple[int | None, int | None] = None, None # reach index, coordinate index if debug[0] != None and debug[1] != None: node = river.getNode(debug[0]) if node is not None: logging.info( f"Debugging reach {debug[0]}, coordinate {debug[1]}, at {node.linestring.coords[debug[1]]}" ) for touch, reach in river.prePostInBetweenOrder(): halfwidth = computeWidth(reach) / 2. reach_elems = reach[names.ELEMS] if debug[0] == reach.index: logging.info(f'PRE: reach = {reach.index}, touch = {touch}, elems = {reach_elems}') if touch == 0: # add paddler's right downstream point if reach.parent is None: # A simple projection orthogonal to the downstream segment # TODO -- follow the HUC boundary? coords[k] = projectOne(reach.linestring.coords[-2], reach.linestring.coords[-1], halfwidth) if reach.index == debug[0] and (-1 == debug[1] or len(reach.linestring.coords) - 1 == debug[1]): logging.info( f" -- adding coord {k} = {coords[k]} as {reach.index} outlet right") reach_elems[-1].append(k) k += 1 # add paddler's right internal points by two-touches projection for i in reversed(range(1, len(reach.linestring.coords) - 1)): coords[k] = projectTwo(reach.linestring.coords[i - 1], reach.linestring.coords[i], reach.linestring.coords[i + 1], halfwidth, halfwidth, reach.index == debug[0] and i == debug[1]) if reach.index == debug[0] and i == debug[1]: logging.info( f" -- adding coord {k} = {coords[k]} as {reach.index} internal right") reach_elems[i].append(k) reach_elems[i - 1].append(k) k += 1 # add the upstream point if len(reach.children) == 0: # add an upstream triangle tip at stream midpoint coords[k] = reach.linestring.coords[0] if reach.index == debug[0] and 0 == debug[1]: logging.info(f" -- adding coord {k} = {coords[k]} as {reach.index} leaf tip") reach_elems[0].append(k) k += 1 elif len(reach.children) == 1: # add an upstream, paddler's right point based on inline junction of two reaches child_halfwidth = computeWidth(reach.children[0]) / 2. coords[k] = projectTwo(reach.children[0].linestring.coords[-2], reach.linestring.coords[0], reach.linestring.coords[1], child_halfwidth, halfwidth, reach.index == debug[0] and 0 == debug[1]) if reach.index == debug[0] and 0 == debug[1]: logging.info( f" -- adding coord {k} = {coords[k]} as {reach.index} inline child upstream right" ) reach_elems[0].append(k) child_elems = reach.children[0][names.ELEMS] child_elems[-1].append(k) k += 1 else: # add an upstream, paddler's right point based on junction of multiple reaches coords[k] = projectJunction(reach, touch, computeWidth, reach.index == debug[0] and 0 == debug[1]) if reach.index == debug[0] and 0 == debug[1]: logging.info( f" -- adding coord {k} = {coords[k]} as {reach.index} junction child upstream right" ) reach_elems[0].append(k) reach.children[0][names.ELEMS][-1].append(k) k += 1 if touch == len(reach.children): if len(reach.children) == 0: pass # no second point elif len(reach.children) == 1: # add an upstream, paddler's left pont based on inline junction of two reaches child_halfwidth = computeWidth(reach.children[-1]) / 2. coords[k] = projectTwo(reach.linestring.coords[1], reach.linestring.coords[0], reach.children[-1].linestring.coords[-2], halfwidth, child_halfwidth, reach.index == debug[0] and 0 == debug[1]) if reach.index == debug[0] and 0 == debug[1]: logging.info( f" -- adding coord {k} = {coords[k]} as {reach.index} inline child upstream left" ) reach_elems[0].append(k) reach.children[-1][names.ELEMS][-1].append(k) k += 1 else: # add an upstream, paddler's left point based on junction of multiple reaches coords[k] = projectJunction(reach, touch, computeWidth, reach.index == debug[0] and 0 == debug[1]) if reach.index == debug[0] and 0 == debug[1]: logging.info( f" -- adding coord {k} = {coords[k]} as {reach.index} junction child upstream left" ) reach_elems[0].append(k) reach.children[-1][names.ELEMS][-1].append(k) k += 1 # add a paddler's left internal point for i in range(1, len(reach.linestring.coords) - 1): coords[k] = projectTwo(reach.linestring.coords[i + 1], reach.linestring.coords[i], reach.linestring.coords[i - 1], halfwidth, halfwidth, (reach.index == debug[0] and i == debug[1])) if reach.index == debug[0] and i == debug[1]: logging.info( f" -- adding coord {k} = {coords[k]} as {reach.index} internal left") reach_elems[i - 1].append(k) reach_elems[i].append(k) k += 1 # add paddler's left downstream point if reach.parent is None: # A simple projection orthogonal to the downstream segment # TODO -- follow the HUC boundary? coords[k] = projectOne(reach.linestring.coords[-2], reach.linestring.coords[-1], -halfwidth) if reach.index == debug[0] and (-1 == debug[1] or len(reach.linestring.coords) - 1 == debug[1]): logging.info(f" -- adding coord {k} = {coords[k]} as {reach.index} outlet left") reach_elems[-1].append(k) k += 1 if touch != 0 and touch != len(reach.children): # add a mid-tributary junction point coords[k] = projectJunction(reach, touch, computeWidth, reach.index == debug[0] and 0 == debug[1]) if reach.index == debug[0] and 0 == debug[1]: logging.info( f" -- adding coord {k} = {coords[k]} as {reach.index} junction midpoint") reach_elems[0].append(k) reach.children[touch - 1][names.ELEMS][-1].append(k) reach.children[touch][names.ELEMS][-1].append(k) k += 1 if debug[0] == reach.index: logging.info(f'POST: reach = {reach.index}, touch = {touch}, elems = {reach_elems}') assert k == len(coords) # check convexity for reach in river: for k, elem in enumerate(reach[names.ELEMS]): e_coords = coords[elem] if not watershed_workflow.utils.isConvex(e_coords): assert k == 0 new_e_coords = fixConvexity(reach, e_coords, computeWidth) for c_index, coord in zip(elem, new_e_coords): coords[c_index] = coord # gather elems elems = [e for reach in river.postOrder() for e in reach[names.ELEMS]] assert len(elems) == _computeExpectedNumElems(river) # assign GID to each elem start # note this must be done in the same order as above elems if names.ELEMS_GID_START not in river.df.columns: river.df[names.ELEMS_GID_START] = -np.ones(len(river.df), 'i') for reach in river.postOrder(): reach[names.ELEMS_GID_START] = elems_gid_start elems_gid_start += len(reach[names.ELEMS]) return coords, elems
[docs] def adjustHUCsToRiverMesh(hucs: SplitHUCs, river: River, coords: np.ndarray) -> None: """Adjust HUC segments that touch reach endpoints to match the corridor coordinates. Parameters ---------- hucs : SplitHUCs Split HUCs object to adjust. river : River River network with mesh coordinates. coords : np.ndarray Array of mesh coordinates. """ # downstream the river outlet remerge, touches = watershed_workflow.angles._getOutletLinestrings(hucs, river) if len(touches) > 0: assert len(touches) == 3 logging.info(f"Adjusting HUC to match reaches at outlet") # touches[1] is paddler's left left_old_coords = touches[1][1].coords left_new_coord = coords[river[names.ELEMS][-1][-1]] left_new_ls = shapely.geometry.LineString(left_old_coords[:-1] + [left_new_coord, ]) right_old_coords = touches[2][1].coords right_new_coord = coords[river[names.ELEMS][-1][0]] right_new_ls = shapely.geometry.LineString(right_old_coords[:-1] + [right_new_coord, ]) if remerge: new_ls = shapely.geometry.LineString( list(reversed(left_new_ls.coords)) + list(right_new_ls.coords[1:])) hucs.linestrings[touches[1][0]] = new_ls else: hucs.linestrings[touches[1][0]] = watershed_workflow.utils.reverseLineString( left_new_ls) if touches[1][2] else left_new_ls hucs.linestrings[touches[2][0]] = watershed_workflow.utils.reverseLineString( right_new_ls) if touches[2][2] else right_new_ls # adjust all upstream endpoints for reach in river: touches = watershed_workflow.angles._getUpstreamLinestrings(hucs, reach) if len(touches) > len(reach.children) + 1: logging.info( f"Adjusting HUC to match reaches at reach {reach.index} and coordinate {reach.linestring.coords[0]}" ) # yes, there are junctions involved... point_i = 1 touch_i = 1 while touch_i < len(touches): if touches[touch_i][0] >= 0: # make sure touches before and after are reaches # # This will fail if there are two successive HUC # strings. I'm not sure that should ever happen, # but if it does, we would have to choose which # point to put on the reach junction element # coordinate, and wierd stuff would probably # happen in triangulation anyway. assert touches[touch_i-1][0] is None or touches[touch_i-1][0] < 0, \ f"Neighboring touch at reach {reach.index} coords {reach.linestring.coords[0]} is wierd" assert touches[(touch_i+1)%len(touches)][0] is None or touches[(touch_i+1)%len(touches)][0] < 0, \ f"Neighboring touch at reach {reach.index} coords {reach.linestring.coords[0]} is wierd" # it is a HUC, insert the point new_coord = coords[reach[names.ELEMS][0][point_i]] old_coords = touches[touch_i][1].coords new_ls = shapely.geometry.LineString(old_coords[:-1] + [new_coord, ]) hucs.linestrings[touches[touch_i][0]] = new_ls else: point_i += 1 touch_i += 1
[docs] def computeLine(p1: np.ndarray, p2: np.ndarray) -> Tuple[float, float, float]: """Compute line coefficients (Ax + By + C = 0) for a line defined by two points. Parameters ---------- p1 : np.ndarray First point coordinates. p2 : np.ndarray Second point coordinates. Returns ------- Tuple[float, float, float] Line coefficients (A, B, C) such that Ax + By + C = 0. """ A = p2[1] - p1[1] B = p1[0] - p2[0] C = p2[0] * p1[1] - p1[0] * p2[1] return A, B, C
[docs] def translateLinePerpendicular(line: Tuple[float, float, float], distance: float) -> Tuple[float, float, float]: """Translate a line by a specified distance in the direction perpendicular to the line. Parameters ---------- line : Tuple[float, float, float] Tuple of line coefficients (A, B, C). distance : float Scalar distance to translate the line. Returns ------- Tuple[float, float, float] Tuple of new line coefficients (A, B, C). """ A, B, C = line # Normalize A and B to get the unit normal vector normal_length = np.hypot(A, B) if normal_length == 0: raise ValueError("Invalid line coefficients.") # Translate C by the perpendicular distance C_new = C - distance*normal_length return A, B, C_new
[docs] def findIntersection(line1: Tuple[float, float, float], line2: Tuple[float, float, float], debug: bool = False) -> np.ndarray | None: """Find the intersection point of two lines given by coefficients. Parameters ---------- line1 : Tuple[float, float, float] First line coefficients (A, B, C). line2 : Tuple[float, float, float] Second line coefficients (A, B, C). debug : bool, optional Whether to print debug information, by default False. Returns ------- np.ndarray | None Intersection point coordinates, or None if lines are parallel. """ A1, B1, C1 = line1 A2, B2, C2 = line2 # shift the intercept to near the origin determinant = A1*B2 - A2*B1 eps = 1.e-8 * max(abs(A1), abs(A2), abs(B1), abs(B2), abs(C1), abs(C2)) if debug: logging.info(f" Parallel? det = {determinant} relative {eps}") if abs(determinant) < eps: return None # Lines are parallel or coincident x = (B1*C2 - B2*C1) / determinant y = (A2*C1 - A1*C2) / determinant return np.array([x,y])
[docs] def projectOne(p_up: np.ndarray, p: np.ndarray, width: float) -> np.ndarray: """Find a point p_out that is width away from p and such that p_up --> p is right-perpendicular to p --> p_out. Parameters ---------- p_up : np.ndarray Upstream point coordinates. p : np.ndarray Reference point coordinates. width : float Distance to project perpendicular to the line. Returns ------- np.ndarray Projected point coordinates. """ c_up = np.array(p_up) c = np.array(p) dp = (c - c_up) dp /= np.linalg.norm(dp) perp = np.array([dp[1], -dp[0]]) return p + width*perp
[docs] def projectTwo(p_up: np.ndarray, p: np.ndarray, p_dn: np.ndarray, width1: float, width2: float, debug: bool = False) -> np.ndarray: """Find a point that is perpendicular to the linestring p_up --> p --> p_dn. Projects by intersecting two lines, one parallel to p_up --> p and width1 away, and one parallel to p --> p_dn and width2 away. Parameters ---------- p_up : np.ndarray Upstream point coordinates. p : np.ndarray Middle point coordinates. p_dn : np.ndarray Downstream point coordinates. width1 : float Width for upstream segment. width2 : float Width for downstream segment. debug : bool, optional Whether to print debug information, by default False. Returns ------- np.ndarray Projected point coordinates. """ l1 = computeLine(p_up, p) if debug: logging.info(f"line defined by: {p_up} --> {p} = {l1}") l1 = translateLinePerpendicular(l1, width1) if debug: logging.info(f" translated by {width1} = {l1}") l2 = computeLine(p, p_dn) if debug: logging.info(f"line defined by: {p} --> {p_dn} = {l2}") l2 = translateLinePerpendicular(l2, width2) if debug: logging.info(f" translated by {width2} = {l2}") intersection = findIntersection(l1, l2, debug) if intersection is None: new_p = projectOne(p_up, p, (width1+width2) / 2.) if debug: logging.info(f"parallel! results in intersection = {new_p}") return new_p elif np.linalg.norm(intersection - p) \ > 4 * max(abs(width1), abs(width2)): # a degenerative point often caused by differing widths and nearly parallel lines new_p1 = projectOne(p_up, p, width1) new_p2 = projectOne(p_dn, p, -width2) new_p = np.array(watershed_workflow.utils.computeMidpoint(new_p1, new_p2)) if debug: logging.info(f"results in failure of intersection, computing midpoint = {new_p}") return new_p else: if debug: logging.info(f"results in intersection = {intersection}") return intersection
[docs] def projectJunction(reach: River, child_idx: int, computeWidth: Callable[[River], float], debug: bool = False) -> np.ndarray: """Find points around the junction between reach and its children. Parameters ---------- reach : River Parent reach containing the junction. child_idx : int Index of the child reach at the junction. computeWidth : Callable[[River], float] Function to compute river width. debug : bool, optional Whether to print debug information, by default False. Returns ------- np.ndarray Junction point coordinates. """ if child_idx == 0: return projectTwo(reach.children[0].linestring.coords[-2], reach.linestring.coords[0], reach.linestring.coords[1], computeWidth(reach.children[0]) / 2., computeWidth(reach) / 2., debug) elif child_idx == len(reach.children): return projectTwo(reach.linestring.coords[1], reach.linestring.coords[0], reach.children[-1].linestring.coords[-2], computeWidth(reach) / 2., computeWidth(reach.children[-1]) / 2., debug) else: return projectTwo(reach.children[child_idx].linestring.coords[-2], reach.linestring.coords[0], reach.children[child_idx - 1].linestring.coords[-2], computeWidth(reach.children[child_idx]) / 2., computeWidth(reach.children[child_idx - 1]) / 2., debug)
[docs] def fixConvexity(reach: River, e_coords: np.ndarray, computeWidth: Callable[[River], float]) -> np.ndarray: """Snap element coordinates onto the convex hull while respecting upstream stream width. Parameters ---------- reach : River River reach containing the element. e_coords : np.ndarray Element coordinates to fix. computeWidth : Callable[[River], float] Function to compute river width. Returns ------- np.ndarray Fixed element coordinates. """ e_poly = shapely.geometry.Polygon(e_coords) e_poly_hull = e_poly.convex_hull # snap to convex hull fix_points = [] for i, coord in enumerate(e_coords): closest_p = watershed_workflow.utils.findNearestPoint(shapely.geometry.Point(coord), e_poly_hull.boundary) if not watershed_workflow.utils.isClose(coord, closest_p, 1.e-4): fix_points.append(i) if i == 1 or i == len(e_coords) - 2: # intersect the shifted line with the convex hull to find the new point if i == 1: child = reach.children[0] sign = 1 elif i == len(e_coords) - 2: child = reach.children[-1] sign = -1 p0 = child.linestring.coords[-1] p1 = child.linestring.coords[-2] halfwidth = computeWidth(child) / 2. p0p = projectOne(p1, p0, sign * halfwidth) p1p = projectOne(p0, p1, -sign * halfwidth) ls = shapely.geometry.LineString([p0p, p1p]) if not ls.intersects(e_poly_hull.boundary): fig, ax = plt.subplots(1, 1) reaches = [reach, ] if reach.parent is not None: reaches.append(reach.parent) reaches = reaches + list(reach.children) for r in reaches: ax.plot(r.linestring.xy[0], r.linestring.xy[1], 'b-x') poly = shapely.geometry.Polygon(e_coords) ax.plot(poly.exterior.xy[0], poly.exterior.xy[1], 'g-x') ax.plot(ls.xy[0], ls.xy[1], 'r-x') ax.plot(e_poly_hull.boundary.xy[0], e_poly_hull.boundary.xy[1], 'k-x') ax.set_aspect('equal', adjustable='box') plt.show() assert False, "No intersection point with convex hull?" new_c_p = ls.intersection(e_poly_hull.boundary) if isinstance(new_c_p, shapely.geometry.MultiPoint): # two intersections... fig, ax = plt.subplots(1, 1) reaches = [reach, ] if reach.parent is not None: reaches.append(reach.parent) reaches = reaches + list(reach.children) for r in reaches: ax.plot(r.linestring.xy[0], r.linestring.xy[1], 'b-x') poly = shapely.geometry.Polygon(e_coords) ax.plot(poly.exterior.xy[0], poly.exterior.xy[1], 'g-x') ax.plot(ls.xy[0], ls.xy[1], 'r-x') ax.plot(e_poly_hull.boundary.xy[0], e_poly_hull.boundary.xy[1], 'k-x') ax.set_aspect('equal', adjustable='box') plt.show() assert False, "Dual intersection points with convex hull?" assert isinstance(new_c_p, shapely.geometry.Point) new_c = new_c_p.coords[0] elif i > 1 and i < len(e_coords) - 2: # snap it to the nearest point? new_c = closest_p else: # a 0th or last point should never be non-convex? fig, ax = plt.subplots(1, 1) reaches = [reach, reach.parent] + list(reach.children) for r in reaches: ax.plot(r.linestring.xy[0], r.linestring.xy[1], 'b-x') poly = shapely.geometry.Polygon(e_coords) ax.plot(poly.exterior.xy[0], poly.exterior.xy[1], 'g-x') ax.scatter([coord[0], ], [coord[1], ], color='g', marker='o') ax.plot(e_poly_hull.boundary.xy[0], e_poly_hull.boundary.xy[1], 'k-x') ax.set_aspect('equal', adjustable='box') plt.show() assert False, "Do not know how to deal with non-convexity that doesn't start at the outermost children." e_coords[i] = new_c if not watershed_workflow.utils.isConvex(e_coords): # a 0th or last point should never be non-convex? fig, ax = plt.subplots(1, 1) reaches = [reach, reach.parent] + list(reach.children) for r in reaches: ax.plot(r.linestring.xy[0], r.linestring.xy[1], 'b-x') poly = shapely.geometry.Polygon(e_coords) ax.plot(poly.exterior.xy[0], poly.exterior.xy[1], 'g-x') for i in fix_points: ax.scatter([e_coords[i, 0], ], [e_coords[i, 1], ], color='g', marker='o') ax.plot(e_poly_hull.boundary.xy[0], e_poly_hull.boundary.xy[1], 'k-x') ax.set_aspect('equal', adjustable='box') plt.show() assert False, "Cannot fix nonconvexity?" return e_coords