"""Shape and geometry utilities for working with fiona and shapely objects.
Note this module contains a lot of other simple functions that are commonly
used by other functions, but are not included in documentation because they are
likely not useful to users.
"""
import calendar
import datetime
import cftime
import logging
import subprocess
import numpy as np
import math
import scipy.interpolate
import shapely.geometry
import shapely.ops
import shapely.wkt
import shapely.prepared
import shapely.affinity
import rasterio
import rasterio.transform
import copy
import watershed_workflow.crs
_tol = 1.e-7
#
# Constructors
#
[docs]
def create_shply(shape, properties=None, flip=False):
"""Converts a fiona style shape to a shapely shape with as much collapsing as possible.
Note this collapses objects -- for instance, fiona MultiPolygons of length
1 are turned into shapely Polygons.
Parameters
----------
shape : fiona shape
Fiona shape to convert to shapely.
properties : dict, optional
A dictionary of parameters to associate with the object. Defaults to
shape['properties'] if it exists, None otherwise.
flip : bool, optional
Flip x,y coordinates while making the translation. This helps if files
provide lat-long ordered coordinates (note that is y,x) as opposed to
long-lat (x,y). Default is False.
Returns
-------
thing : shapely shape
"""
if 'geometry' in shape:
if properties is None:
if 'properties' in shape:
properties = shape['properties']
else:
properties = dict()
shape = shape['geometry']
try:
thing = shapely.geometry.shape(shape)
if type(thing) is shapely.geometry.MultiPoint and len(thing.geoms) == 1:
thing = thing.geoms[0]
elif type(thing) is shapely.geometry.MultiLineString and len(thing.geoms) == 1:
thing = thing.geoms[0]
elif type(thing) is shapely.geometry.MultiPolygon and len(thing.geoms) == 1:
thing = thing.geoms[0]
# first check for latlon instead of lonlat
if flip:
thing = shapely.ops.transform(lambda x, y: (y, x), thing)
thing.properties = properties
thing_2D = remove_third_dimension(thing)
thing_2D.properties = thing.properties
return thing_2D
except ValueError:
raise ValueError(
'Converting to shapely got error: "%s" Maybe you forgot to do shp["geometry"]?')
[docs]
def deepcopy(list_of_shapes):
"""Deals with properties dictionary"""
new_list = [shp.__class__(shp) for shp in list_of_shapes]
for new, old in zip(new_list, list_of_shapes):
if hasattr(old, 'properties'):
new.properties = old.properties
return new_list
[docs]
def create_bounds(f):
"""General bounding box for fiona and shapely types."""
# fiona type
x, y = zip(*list(generate_coords(f)))
# except TypeError:
# # shapely type
# return f.bounds
# else:
return min(x), min(y), max(x), max(y)
[docs]
def create_raster_profile(bounds, crs, resolution, dtype=None, nodata=None, count=1):
"""Creates a profile for a raster.
Parameters
----------
bounds : [x_min, y_min, x_max, y_max]
Bounding box for the raster.
crs : CRS object
Target coordinate system.
resolution : tuple or float
Pixel width, in units of the crs. If a tuple, (dx,dy). If a
float, then dx = dy.
dtype : optional
If provided, sets the data type.
nodata : dtype, optional
If provided, sets the nodata value.
count : int, optional
Note that dx/dy are always used. The bounds are adjusted to make
them an even multiple of dx/dy.
Returns
-------
dict
Dictionary profile, including a transform and all other needed
metadata to create a raster.
"""
try:
dx, dy = resolution
except TypeError:
dx = resolution
dy = resolution
if dtype is None and nodata is not None:
dytpe = type(nodata)
x0 = np.round(bounds[0] - dx/2)
y1 = np.round(bounds[3] + dx/2)
width = int(np.ceil((bounds[2] + dx/2 - x0) / dx))
height = int(np.ceil((y1 - bounds[1] - dx/2) / dx))
out_bounds = [x0, y1 - dy*height, x0 + dx*width, y1]
transform = rasterio.transform.from_origin(x0, y1, dx, dx)
out_profile = {
'height': height,
'width': width,
'count': count,
'dtype': dtype,
'crs': crs,
'transform': transform,
'nodata': nodata
}
return out_profile
[docs]
def create_empty_raster(bounds, crs, resolution, nodata, count=1):
"""Generates a profile and a nodata-filled array."""
profile = create_raster_profile(bounds, crs, resolution, nodata=nodata, count=count)
out = profile['nodata'] * np.ones(
(profile['count'], profile['height'], profile['width']), profile['dtype'])
return profile, out
#
# Generic routines for manipulating shapes and rasters
#
[docs]
def is_empty_shapely(shp):
"""Is shp None or empty"""
return shp is None or shp.is_empty
[docs]
def round_shapes(list_of_things, digits):
"""Rounds coordinates in things or shapes to a given digits."""
for shp in list_of_things:
for ring in generate_rings(shp):
ring[:] = list(np.array(ring).round(digits))
[docs]
def round_shplys(list_of_things, digits):
"""Rounds coordinates in things or shapes to a given digits."""
return [
shapely.wkt.loads(shapely.wkt.dumps(thing, rounding_precision=digits)).simplify(0)
for thing in list_of_things
]
[docs]
def remove_third_dimension(geom):
"""Removes the third dimension of a shapely object."""
if geom.is_empty:
return geom
if isinstance(geom, shapely.geometry.Polygon):
exterior = geom.exterior
new_exterior = remove_third_dimension(exterior)
interiors = geom.interiors
new_interiors = []
for int in interiors:
new_interiors.append(remove_third_dimension(int))
return shapely.geometry.Polygon(new_exterior, new_interiors)
elif isinstance(geom, shapely.geometry.LinearRing):
return shapely.geometry.LinearRing([xy[0:2] for xy in list(geom.coords)])
elif isinstance(geom, shapely.geometry.LineString):
return shapely.geometry.LineString([xy[0:2] for xy in list(geom.coords)])
elif isinstance(geom, shapely.geometry.Point):
return shapely.geometry.Point([xy[0:2] for xy in list(geom.coords)])
elif isinstance(geom, shapely.geometry.MultiPoint):
points = list(geom.geoms)
new_points = []
for point in points:
new_points.append(remove_third_dimension(point))
return shapely.geometry.MultiPoint(new_points)
elif isinstance(geom, shapely.geometry.MultiLineString):
lines = list(geom.geoms)
new_lines = []
for line in lines:
new_lines.append(remove_third_dimension(line))
return shapely.geometry.MultiLineString(new_lines)
elif isinstance(geom, shapely.geometry.MultiPolygon):
pols = list(geom.geoms)
new_pols = []
for pol in pols:
new_pols.append(remove_third_dimension(pol))
return shapely.geometry.MultiPolygon(new_pols)
elif isinstance(geom, shapely.geometry.GeometryCollection):
geoms = list(geom.geoms)
new_geoms = []
for geom in geoms:
new_geoms.append(remove_third_dimension(geom))
return shapely.geometry.GeometryCollection(new_geoms)
else:
raise RuntimeError("Currently this type of geometry is not supported: {}".format(
type(geom)))
[docs]
def flatten(list_of_shps):
"""Flattens a list of shapes, that may contain Multi-objects, into list without multi-objects"""
new_list = []
for shp in list_of_shps:
if isinstance(shp, shapely.geometry.MultiLineString) or \
isinstance(shp, shapely.geometry.MultiPoint) or \
isinstance(shp, shapely.geometry.MultiPolygon):
new_list.extend(list(shp.geoms))
else:
new_list.append(shp)
return new_list
[docs]
def impute_holes2D(arr, nodata=np.nan, method='cubic'):
"""Very simple imputation algorithm to interpolate values for missing data in rasters.
Note, this may throw if there is a hole on the boundary?
Parameters
----------
arr : np.ndarray
2D array, with missing data.
nodata : optional = np.nan
Value to treat as a hole to fill.
method : str, optional = 'cubic'
Algorithm to use (see scipy.interpolate.griddata). Likely
'cubic', 'linear', or 'nearest'.
Returns
-------
np.ndarray
New array with no values of nodata.
"""
if nodata is np.nan:
mask = np.isnan(arr)
else:
mask = (arr == nodata)
x = np.arange(0, arr.shape[1])
y = np.arange(0, arr.shape[0])
xx, yy = np.meshgrid(x, y)
#get only the valid values
x1 = xx[~mask]
y1 = yy[~mask]
newarr = arr[~mask]
res = scipy.interpolate.griddata((x1, y1), newarr.ravel(), (xx, yy), method='cubic')
return res
[docs]
def compute_average_year(data, output_nyears=1, smooth=False, **kwargs):
"""Averages and smooths to form a "typical" year.
Parameters
----------
data : np.ndarray(shape=(NTIMES, ...))
Daily array to average, note that NTIMES > 365.
output_nyears : int, optional
Number of years to repeat the output. Default is 1.
filter : bool, optional
If true, filters the data using a Sav-Gol filter from Scipy
All other parameters are passed to the filter. See
scipy.signal.savgol_filter, but sane default values are used if
these are not provided. Note that if NTIMES % 365 != 0, the data
is truncated.
Returns
-------
np.ndarray(shape=(365*output_nyears, ...))
The averaged data.
"""
nyears = data.shape[0] // 365
if nyears == 0:
raise ValueError('Not enough data to compute average year. Need at least 365 days.')
# reshape the data to (nyears, 365, ...)
data = np.array(data[0:nyears * 365][:])
original_shape = data.shape
new_shape = (nyears, 365)
if len(original_shape) > 1:
new_shape = new_shape + original_shape[1:]
data = data.reshape(new_shape)
data = data.mean(axis=0)
# smooth if requested
if smooth:
data = smooth_array(data, smooth, axis=0, **kwargs)
# repeat the data if requested
if output_nyears != 1:
tiled_data_shape = (output_nyears, )
for i in range(len(original_shape) - 1):
tiled_data_shape = tiled_data_shape + (1, )
data = np.tile(data, tiled_data_shape)
return data
[docs]
def interpolate_in_time_regular(times,
data,
start,
end,
dt=datetime.timedelta(days=1),
axis=0,
**kwargs):
"""Interpolate time-dependent data to a regularly spaced time array.
Parameters
----------
times : np.1darray(dtype=cftime.datetime)
An array of times, of length NTIMES.
data : np.ndarray
Data to interpolate, data.shape[axis] == NTIMES.
start, end : cftime.datetime
Times to begin and end (inclusive) the interpolated array.
dt : datetime.timedelta, optional
Delta to interpolate to. Defaults to 1 day.
axis : int, optional
Axis of data that corresponds to time. Default is 0.
All other parameters are passed to scipy.interpolate.interp1d. Of
use particularly is 'kind' which can be 'linear' (default) or
'quadratic', 'cubic' or others.
Returns
-------
new_times : np.1darray(dtype=datetime.date)
Times of the new array.
new_data : np.ndarray
The data interpolated.
"""
if data.shape[axis] != len(times):
raise ValuerError("Data and times array are not of the expected shape.")
# new_times for interpolation
new_count = int(np.ceil((end-start) / dt))
new_times = np.array([start + i*dt for i in range(new_count + 1)])
# interpolate onto new_times
new_data = interpolate_in_time(times, data, new_times, axis, **kwargs)
return new_times, new_data
[docs]
def interpolate_in_time(times, data, new_times, axis=0, units="days since 2000", **kwargs):
"""Interpolate time-dependent data to an arbitrary other time array.
Parameters
----------
times : np.1darray(dtype=cftime.datetime)
An array of times, of length NTIMES.
data : np.ndarray
Data to interpolate, data.shape[axis] == NTIMES.
new_times : np.1darray(dtype=cftime.datetime)
An array of times to interpolate to.
axis : int, optional
Axis of data that corresponds to time. Default is 0.
units : str, optional
Interpolation must happen in a numeric coordinate -- this unit
is used to convert from dates to numbers using
cftime.date2num. Valid cfunits for time are strings like "days since
2000-1-1", which is the default.
All other parameters are passed to scipy.interpolate.interp1d. Of
use particularly is 'kind' which can be 'linear' (default) or
'quadratic', 'cubic' or others.
Returns
-------
new_data : np.ndarray
The data interpolated.
"""
if data.shape[axis] != len(times):
raise ValueError("Data and times array are not of the expected shape.")
if times[0].calendar != new_times[0].calendar:
raise ValueError("times and new_times must have the same calendar.")
# create an interpolator in a modified coordinate system
x = cftime.date2num(times, units)
interp = scipy.interpolate.interp1d(x, data, axis=axis, assume_sorted=True, **kwargs)
# interpolate at new_times in the modified coordinate system
new_x = cftime.date2num(new_times, units)
new_data = interp(new_x)
return new_data
[docs]
def smooth_array(data, method, axis=0, **kwargs):
"""Smooths fixed-interval time-series data using a Sav-Gol filter from scipy.
Note that this wrapper just sets some sane default values for
daily data -- one could just as easily call
scipy.signal.savgol_filter themselves.
Parameters
----------
data : np.ndarray
The data to smooth.
window_length : int, optional
Length of the moving window over which to fit the polynomial.
Default is 61.
polyorder : int, optional
Order of the fitting polynomial. Default is 2.
axis : int, optional
Time axis over which to smooth. Default is 0.
mode : str, optional
See scipy.signal.savgol_filter documentation, but 'wrap' is the
best bet for data in multiples of years. Default is 'wrap.'
Any additional kwargs are passed to scipy.signal.savgol_filter
Returns
-------
np.ndarray
Smoothed data in the same shape as data
"""
if method is True:
method = 'savgol_filter'
if method == 'savgol_filter':
if 'window_length' not in kwargs:
kwargs['window_length'] = 61
if 'polyorder' not in kwargs:
kwargs['polyorder'] = 2
if 'mode' not in kwargs:
kwargs['mode'] = 'wrap'
return scipy.signal.savgol_filter(data, axis=axis, **kwargs)
elif method == 'convolve':
if 'window' not in kwargs:
kwargs['window'] = 'hann'
if 'Nx' not in kwargs:
kwargs['Nx'] = 50
win = scipy.signal.windows.get_window(**kwargs)
win = win / win.sum()
assert (len(data.shape) == 3 and axis == 0)
data_new = np.empty_like(data)
for i in range(data.shape[1]):
for j in range(data.shape[2]):
data_new[:, i, j] = scipy.signal.convolve(data[:, i, j],
win)[len(win) // 2:-len(win) // 2 + 1]
return data_new
else:
raise ValueError(f'Invalid smooth method {method}')
[docs]
def generate_rings(obj):
"""Generator for a fiona shape's coordinates object and yield rings.
As long as the input is conforming, the type of the geometry doesn't matter.
Parameter
---------
obj : fiona shape
Returns
-------
rings : iterator
Iterates over rings, each of which is a list of coordinate tuples.
"""
def _generate_rings(coords):
for e in coords:
if isinstance(e[0], (float, int)):
yield coords
break
else:
for r in _generate_rings(e):
yield r
if 'geometry' in obj:
obj = obj['geometry']
for r in _generate_rings(obj['coordinates']):
yield r
[docs]
def generate_coords(obj):
"""Generator for a fiona geometry's coordinates.
As long as the input is conforming, the type of the geometry doesn't
matter.
Parameter
---------
obj : fiona shape
Returns
-------
coord : iterator
Iterates over coordinate tuples.
"""
if 'geometry' in obj:
obj = obj['geometry']
if obj['type'] == 'Point':
yield obj['coordinates']
else:
for ring in generate_rings(obj):
for c in ring:
yield c
#
# Geometry
#
[docs]
def close(s1, s2, tol=_tol):
"""Are two shapely shapes topologically equivalent and geometrically close?
Note this deals with things like rotations of polygons (clock-rotating the
coordinates of the same shape are still close) and other gotchas that keep
you from just comparing coordinates.
Parameters
----------
s1, s2 : shapely shapes
Objects to compare.
tol : double
Distance to compare geometric closeness.
Returns
-------
close : bool
Is close?
"""
# deal with Multi* or list objects
def is_multi(thing):
if isinstance(thing, shapely.geometry.MultiPoint):
return True
if isinstance(thing, shapely.geometry.MultiLineString):
return True
if isinstance(thing, shapely.geometry.MultiPolygon):
return True
if isinstance(thing, list):
return True
return False
def local_len(thing):
try:
return len(thing)
except AttributeError:
return len(thing.geoms)
def iter(thing):
assert (is_multi(thing))
if isinstance(thing, list):
for t in thing:
yield t
else:
for t in thing.geoms:
yield t
if is_multi(s1):
if local_len(s1) == 1:
return close(next(iter(s1)), s2, tol)
if is_multi(s2):
if local_len(s2) == 1:
return close(s1, next(iter(s2)), tol)
if is_multi(s1) and is_multi(s2):
if local_len(s1) != local_len(s2):
return False
return all(close(i1, i2, tol) for (i1, i2) in zip(s1, s2))
# points get compared as tuples
if isinstance(s1, shapely.geometry.Point):
return close(s1.coords[0], s2, tol)
if isinstance(s2, shapely.geometry.Point):
return close(s1, s2.coords[0], tol)
# types should be the same now
if type(s1) != type(s2):
return False
# compare tuples
if isinstance(s1, tuple):
if len(s1) != len(s2):
return False
return sum((p1 - p2)**2 for p1, p2 in zip(s1, s2)) < tol**2
# compare lines
elif isinstance(s1, shapely.geometry.LineString):
if len(s1.coords) != len(s2.coords):
return False
if np.allclose(np.array(s1.coords), np.array(s2.coords), tol, tol):
return True
if np.allclose(np.array(s1.coords), np.array(list(reversed(s2.coords))), tol, tol):
return True
return False
# compare polygons
elif type(s1) is shapely.geometry.Polygon:
# note, this does not correctly deal with nonequal holes...
if len(s1.boundary.coords) != len(s2.boundary.coords):
return False
ls1 = s1.boundary.coords[:-1]
ls2 = np.array(s2.boundary.coords[:-1])
ls2f = np.flipud(ls2)
return any(np.allclose(ls1, np.roll(ls2, i, 0), tol, tol) for i in range(len(ls2))) or \
any(np.allclose(ls1, np.roll(ls2f, i, 0), tol, tol) for i in range(len(ls2)))
# compare multi-shapes by checking if each one has a match in the
# other and lengths are the same
elif isinstance(s1, (shapely.geometry.MultiPoint, shapely.geometry.MultiLineString,
shapely.geometry.MultiPolygon)):
if len(s1) != len(s2):
return False
good2 = [False, ] * len(s2)
for l1 in s1:
found = False
for i, l2 in enumerate(s2):
if not good2[i]:
if close(l1, l2, tol):
good2[i] = True
found = True
break
if not found:
return False
return True
else:
raise NotImplementedError("Not implemented for type '%r'" % type(s1))
[docs]
def contains(s1, s2, tol=_tol):
"""A contains algorithm that deals with close/roundoff issues"""
return s1.buffer(tol, 2).contains(s2)
[docs]
class CutError(Exception):
def __init__(self, message, line, seg, cutline):
super(Exception, self).__init__(message)
self.line = line
self.seg = seg
self.cutline = cutline
[docs]
def cut(line, cutline, tol=1.e-5):
"""Cuts a line at all intersections with cutline. If an existing
point in line is within tol of the cutline, do not add an additional
coordinate, just move that coordinate. Otherwise, add a new
coordinate."""
def plot():
from matplotlib import pyplot as plt
plt.plot(cutline.xy[0], cutline.xy[1], 'k-x', linewidth=3)
plt.plot(line.xy[0], line.xy[1], 'g-+', linewidth=3)
assert (type(line) is shapely.geometry.LineString)
assert (type(cutline) is shapely.geometry.LineString)
assert (line.intersects(cutline))
segs = []
coords = list(line.coords)
segcoords = [coords[0], ]
i = 0
while i < len(coords) - 1:
seg = shapely.geometry.LineString(coords[i:i + 2])
#logging.debug("Intersecting seg %d"%i)
point = seg.intersection(cutline)
if type(point) is shapely.geometry.LineString and len(point.coords) == 0:
#logging.debug("Cut seg no intersection")
segcoords.append(seg.coords[-1])
i += 1
elif type(point) is shapely.geometry.Point:
#logging.debug("Cut intersected at point")
#logging.debug(" inter point: %r"%list(point.coords[0]))
#logging.debug(" seg final point: %r"%list(seg.coords[-1]))
#logging.debug(" close? = %r"%(close(point, seg.coords[-1], tol)))
if close(point, seg.coords[-1], tol):
# intersects at the far point
segs.append(shapely.geometry.LineString(segcoords + [point, ]))
#logging.debug(" appended full segment: %r"%(list(segs[-1].coords)))
if (i < len(coords) - 2):
#logging.debug(" (not the end)")
segcoords = [point, coords[i + 2]]
else:
#logging.debug(" (the end)")
segcoords = [point, ]
i += 2 # also skip the next seg, which would also
# intersect at that seg's start point
elif close(point, seg.coords[0], tol):
# intersects at the near point
if i != 0:
assert (len(segcoords) > 1)
segs.append(shapely.geometry.LineString(segcoords[:-1] + [point, ]))
segcoords = [point, ]
else:
assert (len(segcoords) == 1)
segcoords[0] = point
segcoords.append(seg.coords[-1])
i += 1
else:
# intersects in the middle
segs.append(shapely.geometry.LineString(segcoords + [point, ]))
#logging.debug(" appended partial segment: %r"%(list(segs[-1].coords)))
segcoords = [point, seg.coords[-1]]
i += 1
else:
print("Dual/multiple section: type = {}".format(type(point)))
print(" point = {}".format(point))
raise CutError(
"Dual/multiple intersection in a single seg... ugh! "
+ "Intersection is of type '{}'".format(type(point)), line, seg, cutline)
if len(segcoords) > 1:
segs.append(shapely.geometry.LineString(segcoords))
return segs
[docs]
def distance(p1, p2):
"""Distance between two points in tuple form"""
return np.sqrt((p1[0] - p2[0])**2 + (p1[1] - p2[1])**2)
[docs]
def in_neighborhood(obj1, obj2, tol=0.1):
"""Determines if two objects can possibly intersect by performing a
quick check of their bounding boxes.
"""
minx1, miny1, maxx1, maxy1 = obj1.bounds
minx2, miny2, maxx2, maxy2 = obj2.bounds
if maxx2 < minx1 - tol or \
maxy2 < miny1 - tol or \
minx2 > maxx1 + tol or \
miny2 > maxy1 + tol:
return False
return True
[docs]
def intersect_point_to_segment(point, line_start, line_end):
"""Finds the nearest point on a line segment to a point"""
line_magnitude = line_end.distance(line_start)
assert (line_magnitude > _tol)
u = ((point.x - line_start.x) * (line_end.x - line_start.x) +
(point.y - line_start.y) * (line_end.y - line_start.y)) \
/ (line_magnitude ** 2)
# closest point does not fall within the line segment,
# take the shorter distance to an endpoint
if u < 0.:
return line_start
elif u > 1.:
return line_end
else:
ix = line_start.x + u * (line_end.x - line_start.x)
iy = line_start.y + u * (line_end.y - line_start.y)
return shapely.geometry.Point([ix, iy])
[docs]
def nearest_point(line, point):
"""Returns the nearest coordinate on the line to point.
Note point is expected as coordinates."""
if isinstance(point, tuple):
point = shapely.geometry.Point(point)
return shapely.ops.nearest_points(point, line)[1].coords[0]
[docs]
def triangle_area(vertices):
"""Area of a triangle in 2D"""
xy1 = vertices[0]
xy2 = vertices[1]
xy3 = vertices[2]
A = 0.5 * (xy2[0] * xy3[1] - xy3[0] * xy2[1] - xy1[0] * xy3[1] + xy3[0] * xy1[1]
+ xy1[0] * xy2[1] - xy2[0] * xy1[1])
return A
[docs]
def is_collinear(p0, p1, p2, tol=1e-6):
"""this function checks if three points are collinear for given tolerance value"""
x1, y1 = p1[0] - p0[0], p1[1] - p0[1]
x2, y2 = p2[0] - p0[0], p2[1] - p0[1]
return abs(x1*y2 - x2*y1) < tol
[docs]
def area(vertices):
"""Area of polygons in 2D"""
area = shapely.geometry.Polygon(vertices).area
return area
[docs]
def angle(v1, v2):
"""Given two 2D vectors represented as len 2 arrays or tuples, find the angle
of 2 relative to 1 in a clockwise notion."""
x1 = v1[0]
y1 = v1[1]
x2 = v2[0]
y2 = v2[1]
numer = x1*x2 + y1*y2
denom = np.sqrt(x1*x1 + y1*y1) * np.sqrt(x2*x2 + y2*y2)
assert (denom > 0)
arg = numer / denom
assert (arg < 1.1 and arg > -1.1) # roundoff problems
arg = min(max(numer / denom, -1), 1)
mag = 180. / np.pi * np.arccos(arg)
sign = x1*y2 - x2*y1
if sign < 0:
return -mag
else:
return mag
[docs]
def midpoint(p1, p2):
"""Returns the midpoint of two points"""
if isinstance(p1, shapely.geometry.Point):
return midpoint(p1.coords[0], p2)
if isinstance(p2, shapely.geometry.Point):
return midpoint(p1, p2.coords[0])
return ((p1[0] + p2[0]) / 2., (p1[1] + p2[1]) / 2.)
[docs]
def closest_point_ind(point, points):
"""Returns the index of closest point from an array of points"""
points = np.asarray(points)
dist_2 = np.sum((points - point)**2, axis=1)
return np.argmin(dist_2)
[docs]
def center(objects, centering=True):
"""Centers a collection of objects by removing their collective centroid"""
if type(centering) is shapely.geometry.Point:
centroid = centering
elif centering is True or centering == 'geometric':
union = shapely.ops.unary_union(objects)
centroid = shapely.geometry.Point([(union.bounds[0] + union.bounds[2]) / 2.,
(union.bounds[1] + union.bounds[3]) / 2.])
elif centering == 'mass':
union = shapely.ops.unary_union(objects)
centroid = union.centroid
else:
raise ValueError('Centering: option centering = "{}" unknown'.format(centering))
new_objs = [
shapely.affinity.translate(obj, -centroid.coords[0][0], -centroid.coords[0][1])
for obj in objects
]
for new, old in zip(new_objs, objects):
if hasattr(old, 'properties'):
new.properties = old.properties
return new_objs, centroid
[docs]
def orientation(p1, p2, p3):
"""to find the orientation of an ordered triplet (p1,p2,p3) function returns the following values:
0 : Collinear points
1 : Clockwise points
2 : Counterclockwise """
val = (float(p2.y - p1.y) * (p3.x - p2.x)) - \
(float(p2.x - p1.x) * (p3.y - p2.y))
if (val > 0):
# Clockwise orientation
return 1
elif (val < 0):
# Counterclockwise orientation
return 2
else:
# Collinear orientation
return 0
[docs]
def intersects(shp1, shp2):
"""Checks whether an intersection exists.
Note that intersection being empty and intersects are not always reliably
the same... we avoid using shapely.intersects() for this reason.
"""
inter = shp1.intersection(shp2)
return not is_empty_shapely(inter)
[docs]
def non_point_intersection(shp1, shp2):
"""Checks whether an intersection is larger than a point.
Note that intersection being empty and intersects are not always reliably
the same... we avoid using intersects() for this reason.
"""
inter = shp1.intersection(shp2)
return not (is_empty_shapely(inter) or \
isinstance(inter, shapely.geometry.Point))
[docs]
def volumetric_intersection(shp1, shp2):
"""Checks whether an intersection includes volume and not just points and lines."""
inter = shp1.intersection(shp2)
return inter.area > 0
[docs]
def filter_to_shape(shape, to_filter, tol=None, algorithm='intersects'):
"""Filters out reaches (or reaches in rivers) not inside the HUCs provided.
algorithm is one of 'contains' or 'intersects' to indicate whether
to include things entirely in shape or partially in shape,
respectively.
"""
if tol is None: tol = _tol
if algorithm == 'contains':
op = shape.contains
shape = shapely.prepared.prep(shape.buffer(2 * tol))
elif algorithm == 'intersects':
op = shape.intersects
shape = shapely.prepared.prep(shape.buffer(2 * tol))
elif algorithm == 'non_point_intersection':
op = lambda a: non_point_intersection(shape, a)
shape = shape.buffer(2 * tol)
else:
raise ValueError("algorithm must be one of 'intersects' or 'contains'")
return [s for s in to_filter if op(s)]
def is_convex(points):
poly = shapely.geometry.Polygon(points)
return math.isclose(poly.area, poly.convex_hull.area, rel_tol=1e-4)
[docs]
def cluster(points, tol):
"""Given a list of points, determine a list of clusters.
Each cluster is within tol of each other.
Returns (cluster_index, cluster_centroid)
"""
import scipy.cluster.hierarchy as hcluster
if type(points) is list:
points = np.array(points)
indices = hcluster.fclusterdata(points, tol, criterion='distance')
centroids = [points[indices == (i + 1)].mean(axis=0) for i in range(indices.max())]
return indices - 1, centroids
[docs]
def treat_segment_collinearity(segment_coords, tol=1e-5):
"""This functions removes collinearity from a node segment by making small pertubations orthogonal to the segment"""
col_checks = []
for i in range(0,
len(segment_coords)
- 2): # traversing along the segment, checking 3 consecutive points at a time
p0 = segment_coords[i]
p1 = segment_coords[i + 1]
p2 = segment_coords[i + 2]
if watershed_workflow.utils.is_collinear(
p0, p1, p2, tol=tol): # treating collinearity through a small pertubation
del_ortho = 10 * tol # shift in the middle point
if (p2[0] - p0[0]) == 0:
m = 1e6
else:
m = (p2[1] - p0[1]) / (p2[0] - p0[0])
del_y = del_ortho / (1 + m**2)**0.5
del_x = -1 * del_ortho * m / (1 + m**2)**0.5
p1 = (p1[0] + del_x, p1[1] + del_y)
segment_coords[i + 1] = p1
col_checks.append(is_collinear(p0, p1, p2))
assert (sum(col_checks) == 0)
return segment_coords