Source code for watershed_workflow.mesh

"""Tools for turning data into meshes, then writing them to file.

Works with and assumes all polyhedra cells (and polygon faces).

Requires building a reasonably recent version of Exodus to get
the associated exodus.py wrappers in python3.

Note that this is typically done in your standard ATS installation,
assuming you have built your Amanzi TPLs with shared libraries (the
default through bootstrap).

In that case, simply ensure that ${AMANZI_TPLS_DIR}/SEACAS/lib is in
your PYTHONPATH.

"""

import os
import numpy as np
import collections
import logging
import attr
import scipy.optimize
import shapely
import warnings
import functools

import watershed_workflow.vtk_io
import watershed_workflow.utils
import watershed_workflow.plot
import watershed_workflow.colors

try:
    import exodus
except Exception:
    try:
        import exodus3 as exodus
    except Exception:
        exodus = None


#
# A caching decorator, like functools.cache, but works in this case?
#
# Note, this can only work with __dict__ classes, not slotted classes.
#
def cache(func):
    @functools.wraps(func)
    def cache_func(self):
        cname = '_' + func.__name__
        if not hasattr(self, cname):
            self.__setattr__(cname, func(self))
        return self.__getattribute__(cname)

    return cache_func


[docs] @attr.define class SideSet: """A collection of faces in cells.""" name: str setid: int cell_list: list[int] side_list: list[int] def validate(self, cell_faces): ncells = len(cell_faces) for c, s in zip(self.cell_list, self.side_list): assert (0 <= c < ncells) assert (0 <= s < len(cell_faces[c]))
[docs] @attr.define(slots=False) class LabeledSet: """A generic collection of entities.""" name: str setid: int entity: str ent_ids: list[int] to_extrude: bool = False def validate(self, size, is_tuple=False): if is_tuple: # edges self.ent_ids = [(int(e[0]), int(e[1])) for e in self.ent_ids] for e in self.ent_ids: assert (-1 < e[0] < size) assert (-1 < e[1] < size) else: self.ent_ids = [int(i) for i in self.ent_ids] for i in self.ent_ids: assert (-1 < i < size)
@attr.define class _ExtrusionHelper: """Helper class for extruding 2D --> 3D""" ncells_tall: int ncells_2D: list[int] def col_to_id(self, column, z_cell): """Maps 2D cell ID and index in the vertical to a 3D cell ID""" return z_cell + column * self.ncells_tall def node_to_id(self, node, z_node): """Maps 2D node ID and index in the vertical to a 3D node ID""" return z_node + node * (self.ncells_tall + 1) def edge_to_id(self, edge, z_cell): """Maps 2D edge hash and index in the vertical to a 3D face ID of a vertical face""" return (self.ncells_tall + 1) * self.ncells_2D + z_cell + edge * self.ncells_tall
[docs] @attr.define(slots=False) class Mesh2D: """A 2D mesh class. Parameters ---------- coords : np.ndarray(NCOORDS,NDIMS) Array of coordinates of the 2D mesh. conn : list(lists) List of lists of indices into coords that form the cells. labeled_sets : list(LabeledSet), optional List of labeled sets to add to the mesh. crs : CRS Keep this as a property for future reference. eps : float, optional=0.01 A small measure of length between coords. check_handedness : bool, optional=True If true, ensure all cells are oriented so that right-hand rule ordering of the nodes points up. validate : bool, optional=False If true, validate coordinates and connections post-construction. Note: (coords, conn) may be output provided by a watershed_workflow.triangulation.triangulate() call. """ coords = attr.ib(validator=attr.validators.instance_of(np.ndarray)) _conn = attr.ib() labeled_sets = attr.ib(factory=list) crs = attr.ib(default=None) eps = attr.ib(default=0.001) _check_handedness = attr.ib(default=True) _validate = attr.ib(default=False) def __attrs_post_init__(self): if self._check_handedness: self.check_handedness() if self._validate: self.validate() del self._validate @property def conn(self): """Note that conn is immutable because changing this breaks all the other properties, which may be cached. To change topology, one must construct a new mesh!""" return self._conn @property def dim(self): return self.coords.shape[1] @property def num_cells(self): return len(self.conn) @property def num_nodes(self): return self.coords.shape[0] @property def num_edges(self): return len(self.edges) @property def edges(self): return self.edges_to_cells.keys()
[docs] @staticmethod def edge_hash(i, j): """Hashes edges in a direction-independent way.""" return tuple(sorted((i, j)))
@property @cache def edge_counts(self): items = self.edges_to_cells.items() return dict((e, len(v)) for (e, v) in items) @property @cache def edges_to_cells(self): e2c = collections.defaultdict(list) for i, c in enumerate(self.conn): for e in self.cell_edges(c): e2c[e].append(i) return e2c def cell_edges(self, conn): for i in range(len(conn)): yield self.edge_hash(conn[i], conn[(i+1) % len(conn)]) @property @cache def boundary_edges(self): """Return edges in the boundary of the mesh, ordered around the boundary.""" be = sorted([k for (k, count) in self.edge_counts.items() if count == 1], key=lambda a: a[0]) seed = be.pop(0) be_ordered = [seed, ] done = False while not done: try: new_e = next(e for e in be if e[0] == be_ordered[-1][1]) be.remove(new_e) except StopIteration: new_e = next(e for e in be if e[1] == be_ordered[-1][1]) be.remove(new_e) new_e = tuple(reversed(new_e)) be_ordered.append(new_e) done = len(be) == 0 assert (be_ordered[-1][-1] == be_ordered[0][0]) return be_ordered @property @cache def boundary_nodes(self): return [e[0] for e in self.boundary_edges]
[docs] def check_handedness(self): """Ensures all cells are oriented via the right-hand-rule, i.e. in the +z direction.""" for conn in self._conn: points = np.array([self.coords[c] for c in conn]) cross = 0 for i in range(len(points)): im = i - 1 ip = i + 1 if ip == len(points): ip = 0 p = points[ip] - points[i] m = points[i] - points[im] cross = cross + p[1] * m[0] - p[0] * m[1] if cross < 0: conn.reverse()
def validate(self): # validate coords assert (isinstance(self.coords, np.ndarray)) assert (len(self.coords.shape) == 2) assert (self.coords.shape[1] in [2, 3]) # validate conn assert (min(c for conn in self.conn for c in conn) >= 0) assert (max(c for conn in self.conn for c in conn) < self.coords.shape[0]) # validate labeled_sets assert (len(set(ls.setid for ls in self.labeled_sets)) == len(self.labeled_sets)) for ls in self.labeled_sets: is_tuple = False if ls.entity == 'CELL': size = self.num_cells elif ls.entity == 'FACE': size = self.num_nodes # note there are no faces, edges are tuples of nodes is_tuple = True elif ls.entity == 'NODE': size = self.num_nodes ls.validate(size, is_tuple)
[docs] def next_available_labeled_setid(self): """Returns next available LS id.""" i = 10000 while any(i == ls.setid for ls in self.labeled_sets): i += 1 return int(i)
[docs] def compute_centroid(self, c): """Computes, based on coords, the centroid of a cell with ID c. Note this ALWAYS recomputes the value, not using the cache. """ points = np.array([self.coords[i] for i in self.conn[c]]) return points.mean(axis=0)
@property @cache def centroids(self): """Calculate surface mesh centroids.""" return np.array([self.compute_centroid(c) for c in range(self.num_cells)])
[docs] def clear_geometry_cache(self): """If coordinates are changed, any computed, cached geometry must be recomputed. It is the USER's responsibility to call this function if any coords are changed! """ # toss geometry cache if hasattr(self, '_centroids'): del self._centroids
[docs] def plot(self, color=None, ax=None): """Plot the flattened 2D mesh.""" if color is None: cm = watershed_workflow.colors.cm_mapper(0, self.num_cells - 1) colors = [cm(i) for i in range(self.num_cells)] else: colors = color verts = [[self.coords[i, 0:2] for i in f] for f in self.conn] from matplotlib import collections gons = collections.PolyCollection(verts, facecolors=colors) from matplotlib import pyplot as plt if ax is None: fig, ax = plt.subplots(1, 1) ax.add_collection(gons) ax.autoscale_view()
[docs] def write_vtk(self, filename): """Writes to VTK.""" assert (all(len(c) == 3 for c in self.conn)) watershed_workflow.vtk_io.write(filename, self.coords, { 'triangle': np.array(self.conn) })
[docs] def transform(self, mat=None, shift=None): """Transform a 2D mesh""" if mat is None: mat = np.array([[1, 0, 0], [0, 1, 0], [0, 0, 1]]) if shift is None: shift = np.array([0, 0, 0]) new_coords = [] for c in self.coords: assert (c.shape == (3, )) tc = mat@c + shift assert (tc.shape == (3, )) new_coords.append(tc) new_coords = np.array(new_coords) assert (new_coords.shape == self.coords.shape) self.coords = new_coords # toss geometry cache self.clear_geometry_cache()
[docs] @classmethod def read_VTK(cls, filename): """Constructor from a VTK file.""" try: return cls.read_VTK_Simplices(filename) except AssertionError: return cls.read_VTK_Unstructured(filename)
[docs] @classmethod def read_VTK_Unstructured(cls, filename): """Constructor from an unstructured VTK file.""" with open(filename, 'rb') as fid: points_found = False polygons_found = False while True: line = fid.readline().decode('utf-8') if not line: # EOF break line = line.strip() if len(line) == 0: continue split = line.split() section = split[0] if section == 'POINTS': ncoords = int(split[1]) points = np.fromfile(fid, count=ncoords * 3, sep=' ', dtype='d') points = points.reshape(ncoords, 3) points_found = True elif section == 'POLYGONS': ncells = int(split[1]) n_to_read = int(split[2]) gons = [] data = np.fromfile(fid, count=n_to_read, sep=' ', dtype='i') idx = 0 for i in range(ncells): n_in_gon = data[idx] gon = list(data[idx + 1:idx + 1 + n_in_gon]) # check handedness -- need normals to point up! cross = [] for i in range(len(gon)): if i == len(gon) - 1: ip = 0 ipp = 1 elif i == len(gon) - 2: ip = i + 1 ipp = 0 else: ip = i + 1 ipp = i + 2 d2 = points[gon[ipp]] - points[gon[ip]] d1 = points[gon[i]] - points[gon[ip]] cross.append(np.cross(d2, d1)) if (np.array([c[2] for c in cross]).mean() < 0): gon.reverse() gons.append(gon) idx += n_in_gon + 1 assert (idx == n_to_read) polygons_found = True if not points_found: raise RuntimeError("Unstructured VTK must contain sections 'POINTS'") if not polygons_found: raise RuntimeError("Unstructured VTK must contain sections 'POLYGONS'") return cls(points, gons)
[docs] @classmethod def read_VTK_Simplices(cls, filename): """Constructor from an structured VTK file. """ with open(filename, 'rb') as fid: data = watershed_workflow.vtk_io.read_buffer(fid) points = data[0] if len(data[1]) != 1: raise RuntimeError( "Simplex VTK file is readable by vtk_io but not by meshing_ats. Includes: %r" % data[1].keys()) gons = next(v for v in data[1].values()) gons = gons.tolist() # check handedness for gon in gons: cross = [] for i in range(len(gon)): if i == len(gon) - 1: ip = 0 ipp = 1 elif i == len(gon) - 2: ip = i + 1 ipp = 0 else: ip = i + 1 ipp = i + 2 d2 = points[gon[ipp]] - points[gon[ip]] d1 = points[gon[i]] - points[gon[ip]] cross.append(np.cross(d2, d1)) if (np.array([c[2] for c in cross]).mean() < 0): gon.reverse() return cls(points, gons)
[docs] @classmethod def from_Transect(cls, x, z, width=1, **kwargs): """Creates a 2D surface strip mesh from transect data""" # coordinates if (type(width) is list or type(width) is np.ndarray): variable_width = True y = np.array([0, 1]) else: variable_width = False y = np.array([-width / 2, width / 2]) Xc, Yc = np.meshgrid(x, y) if variable_width: assert (Yc.shape[0] == 2) assert (len(width) == Yc.shape[1]) assert (min(width) > 0.) Yc[0, :] = -width / 2. Yc[1, :] = width / 2. Xc = Xc.flatten() Yc = Yc.flatten() Zc = np.concatenate([z, z]) # connectivity nsurf_cells = len(x) - 1 conn = [] for i in range(nsurf_cells): conn.append([i, i + 1, nsurf_cells + i + 2, nsurf_cells + i + 1]) points = np.array([Xc, Yc, Zc]) return cls(points.transpose(), conn, **kwargs)
[docs] def to_dual(self): """Creates a 2D surface mesh from a primal 2D triangular surface mesh. Returns ------- dual_nodes : np.ndarray dual_conn : list(lists) The nodes and cell_to_node_conn of a 2D, polygonal, nearly Voronoi mesh that is the truncated dual of self. Here we say nearly because the boundary triangles (see note below) are not Voronoi. dual_from_primal_mapping : np.array( (len(dual_conn),), 'i') Mapping from a dual cell to the primal node it is based on. Without the boundary, this would be simply arange(0,len(dual_conn)), but beacuse we added triangles on the boundary, this mapping is useful. Note ---- - Nodes of the dual are on circumcenters of the primal triangles. - Centers of the dual are numbered by nodes of the primal mesh, modulo the boundary. - At the boundary, the truncated dual polygon may be non-convex. To avoid this issue, we triangulate boundary polygons. We do not simply construct a mesh because likely the user needs to elevate, generate material IDs, labeled sets, etc, prior to mesh construction. """ logging.info("Constructing Mesh2D as dual of a triangulation.") logging.info("-- confirming triangulation (note, not checking delaunay, buyer beware)") for c in self.conn: assert (len(c) == 3) # check all triangles def circumcenter(p1, p2, p3): d = 2 * (p1[0] * (p2[1] - p3[1]) + p2[0] * (p3[1] - p1[1]) + p3[0] * (p1[1] - p2[1])) xv = ((p1[0]**2 + p1[1]**2) * (p2[1] - p3[1]) + (p2[0]**2 + p2[1]**2) * (p3[1] - p1[1]) + (p3[0]**2 + p3[1]**2) * (p1[1] - p2[1])) / d yv = ((p1[0]**2 + p1[1]**2) * (p3[0] - p2[0]) + (p2[0]**2 + p2[1]**2) * (p1[0] - p3[0]) + (p3[0]**2 + p3[1]**2) * (p2[0] - p1[0])) / d return (xv, yv) # dual nodes are given by: # - primal cell circumcenters # - primal nodes on the boundary # - primal edge midpoints on the boundary (note this length is the same as primal nodes on the boundary) # dual cells are given by: # - primal cell nodes (modulo the boundary) coords = self.coords[:, 0:2] logging.info("-- computing primary boundary edges") boundary_edges = self.boundary_edges n_dual_nodes = len(self.conn) + 2 * len(boundary_edges) logging.info(" n_primal_cell = {}, n_boundary_edges = {}, n_dual_nodes = " "{}".format(len(self.conn), len(boundary_edges), n_dual_nodes)) # create space to populate dual coordinates dual_nodes = np.zeros((n_dual_nodes, 2), 'd') dual_from_primal_mapping = list(range(len(coords))) # Create a list of lists for dual cells -- at this point the truncated # dual (e.g. allow non-convex boundary polygons), so one per primal # node. Also create a flag array for whether a given dual cell is on the # boundary and so might be non-convex. # # Note that both of these will be indexed by the index of the # corresponding primal node. dual_cells = [list() for i in range(len(coords))] is_boundary = np.zeros(len(dual_cells), 'i') # Loop over all primal cells (triangles), adding the circumcenter # as a dual node and sticking that node in three dual cells rooted # at the three primal nodes. logging.info("-- computing dual nodes") i_dual_node = 0 for j, c in enumerate(self.conn): dual_nodes[i_dual_node][:] = circumcenter(coords[c[0]], coords[c[1]], coords[c[2]]) dual_cells[c[0]].append(i_dual_node) dual_cells[c[1]].append(i_dual_node) dual_cells[c[2]].append(i_dual_node) i_dual_node += 1 logging.info(" added {} tri centroid nodes".format(i_dual_node)) # Loop over the boundary, adding both the primal nodes and the edge # midpoints as dual nodes. # # Add the primal node and two midpoints on either side to the list # of dual nodes in the cell "rooted at" the primal node. for i, e in enumerate(boundary_edges): # add the primal node always my_primal_node_dual_node = i_dual_node dual_nodes[i_dual_node][:] = coords[e[0]] i_dual_node += 1 my_cell = list() # stick in the previous midpoint node, add to my_cell if i == 0: # reserve a spot for the last midpoint first_cell_added = e[0] my_cell.append(-1) else: my_cell.append(prev_midp_n) # stick in the next midpoint node, add to my_cell next_midp_n = i_dual_node next_midp = (coords[e[0]][:] + coords[e[1]][:]) / 2. dual_nodes[i_dual_node][:] = next_midp i_dual_node += 1 my_cell.append(next_midp_n) # add the primal node to my_cell my_cell.append(my_primal_node_dual_node) dual_cells[e[0]].extend(my_cell) is_boundary[e[0]] = 1 prev_midp_n = next_midp_n # patch up the first cell added assert (dual_cells[first_cell_added][-3] == -1) dual_cells[first_cell_added][-3] = prev_midp_n logging.info(" added {} boundary nodes".format(len(boundary_edges))) # # Now every dual cell has a list of nodes (in no particular order). # But some of these nodes may be duplicated -- either the circumcenter # of two adjacent triangles may both be on the boundary, allowing for # coincident points at the midpoint of the coincident faces, or (more # likely) there was a circumcenter on the boundary, meaning it is # coincident with the boundary edge's midpoint. # # Order those nodes, and collect a list of coincident nodes for removal. logging.info("-- Finding duplicates and ordering conn_cell_to_node") nodes_to_kill = dict() # coincident nodes (key = node to remove, val = coincident node) for i in range(len(dual_cells)): c = dual_cells[i] if is_boundary[i]: # check for duplicate nodes to_pop = [] for k in range(len(c)): for j in range(k + 1, len(c)): if (np.linalg.norm(dual_nodes[c[k]] - dual_nodes[c[j]]) < self.eps): logging.info(" found dup on boundary cell {} = {}".format(i, c)) logging.info(" indices = {}, {}".format(k, j)) logging.info(" nodes = {}, {}".format(c[k], c[j])) logging.info(" coords = {}, {}".format( dual_nodes[c[k]], dual_nodes[c[j]])) if c[k] in nodes_to_kill: assert c[j] == nodes_to_kill[c[k]] assert k < len(c) - 3 to_pop.append(k) elif c[j] in nodes_to_kill: assert c[k] == nodes_to_kill[c[j]] assert j < len(c) - 3 to_pop.append(j) else: assert k < len(c) - 3 nodes_to_kill[c[k]] = c[j] to_pop.append(k) # remove the duplicated nodes from the cell_to_node_conn for j in reversed(sorted(to_pop)): c.pop(j) # may not be convex -- triangulate c_orig = c[:] c0 = c[-1] # the primal node cup = c[-2] # boundary midpoint, one direction cdn = c[-3] # boundary midpoint, the other direction # order the nodes (minus the primal node) clockwise around the primal node cell_coords = np.array([dual_nodes[cj] for cj in c[:-1]]) - dual_nodes[c0] angle = np.array([ np.arctan2(cell_coords[j, 1], cell_coords[j, 0]) for j in range(len(cell_coords)) ]) order = np.argsort(angle) c = [c[j] for j in order] # now find what is "inside" and what is "outside" the domain by # finding up and dn, making one the 0th point and the other the # last point up_i = c.index(cup) dn_i = c.index(cdn) if dn_i == (up_i+1) % len(c): cn = c[dn_i:] + c[0:dn_i] elif up_i == (dn_i+1) % len(c): cn = c[up_i:] + c[0:up_i] else: # I screwed up... debug me! print("Uh oh bad geom: up_i = {}, dn_i = {}, c = {}".format(up_i, dn_i, c)) fig = plt.figure() ax = fig.add_subplot(111) cc_sorted = np.array([cell_coords[k] for k in order]) + dual_nodes[c0] cb_sorted = np.array([dual_nodes[cdn], dual_nodes[c], dual_nodes[cup]]) ax.plot(cc_sorted[:, 0], cc_sorted[:, 1], 'k-x') ax.scatter(dual_nodes[c0, 0], dual_nodes[c0, 1], color='r') ax.scatter(dual_nodes[cup, 0], dual_nodes[cup, 1], color='m') ax.scatter(dual_nodes[cdn, 0], dual_nodes[cdn, 1], color='b') plt.show() fig = plt.figure(figsize=figsize) ax = watershed_workflow.plot.get_ax(crs, fig) mp = watershed_workflow.plot.triangulation(mesh_points3, mesh_tris, crs, ax=ax, color='elevation', edgecolor='white', linewidth=0.4) cbar = fig.colorbar(mp, orientation="horizontal", pad=0.05) #watershed_workflow.plot.hucs(shapes, crs, ax=ax, color='k', linewidth=1) watershed_workflow.plot.shply([shapely.geometry.LineString(cc_sorted), ], crs, ax=ax, color='red', linewidth=1) watershed_workflow.plot.shply([shapely.geometry.LineString(cb_sorted), ], crs, ax=ax, color='blue', linewidth=1) ax.set_aspect('equal', 'datalim') raise RuntimeError('uh oh borked geom') # triangulate the truncated dual polygon, always including the # primal node to guarantee all triangles exist and partition # the polygon. for k in range(len(cn) - 1): if k == 0: dual_cells[i] = [c0, cn[k + 1], cn[k]] else: dual_cells.append([c0, cn[k + 1], cn[k]]) dual_from_primal_mapping.append(i) else: # NOT a boundary polygon. Simply order and check for duplicate nodes. to_pop = [] for k in range(len(c)): for j in range(k + 1, len(c)): if (np.linalg.norm(dual_nodes[c[k]] - dual_nodes[c[j]]) < self.eps): logging.info(" found dup on interior cell {} = {}".format(i, c)) logging.info(" indices = {}, {}".format(k, j)) logging.info(" nodes = {}, {}".format(c[k], c[j])) logging.info(" coords = {}, {}".format( dual_nodes[c[k]], dual_nodes[c[j]])) if c[k] in nodes_to_kill: assert c[j] == nodes_to_kill[c[k]] to_pop.append(k) elif c[j] in nodes_to_kill: assert c[k] == nodes_to_kill[c[j]] to_pop.append(j) else: nodes_to_kill[c[k]] = c[j] to_pop.append(k) # remove the duplicated nodes from the cell_to_node_conn for j in reversed(sorted(to_pop)): c.pop(j) # order around the primal node (now dual cell centroid) cell_coords = np.array([dual_nodes[j] for j in c]) - coords[i] angle = np.array([ np.arctan2(cell_coords[j, 1], cell_coords[j, 0]) for j in range(len(cell_coords)) ]) order = np.argsort(angle) dual_cells[i] = [c[j] for j in order] logging.info("-- removing duplicate nodes") # note this requires both removing the duplicates from the coordinate # list, but also remapping to new numbers that range in [0, # num_not_removed_nodes). To do the latter, we create a map from old # indices to new indices, where removed indices are mapped to their # coincident, new index. These old nodes may have appeared in cells # that did not actually include its duplicate, so must get remapped to # the coincident node. i = 0 compression_map = -np.ones(len(dual_nodes), 'i') for j in range(len(dual_nodes)): if j not in nodes_to_kill: compression_map[j] = i i += 1 for j in nodes_to_kill.keys(): compression_map[j] = compression_map[nodes_to_kill[j]] assert (compression_map.min() >= 0) # -- delete the nodes to_kill = sorted(list(nodes_to_kill.keys())) dual_nodes = np.delete(dual_nodes, to_kill, axis=0) # -- remap the conn for c in dual_cells: for j in range(len(c)): c[j] = compression_map[c[j]] assert (min(c) >= 0) return dual_nodes, dual_cells, np.array(dual_from_primal_mapping)
[docs] @attr.define(slots=False) class Mesh3D: """A 3D mesh class. Parameters ---------- coords : np.ndarray(NCOORDS,3) Array of coordinates of the 3D mesh. face_to_node_conn : list(lists) List of lists of indices into coords that form the faces. cell_to_face_conn : list(lists) List of lists of indices into face_to_node_conn that form the cells. side_sets : list(SideSet), optional List of side sets to add to the mesh. labeled_sets : list(LabeledSet), optional List of labeled sets to add to the mesh. material_ids : np.array((len(cell_to_face_conn),),'i'), optional Array of length num_cells that specifies material IDs crs : CRS Keep the coordinate system for reference. eps : float, optional=0.01 A small measure of length between coords. Note that (coords, conn) may be output provided by a watershed_workflow.triangulation.triangulate() call. """ coords = attr.ib(validator=attr.validators.instance_of(np.ndarray)) _face_to_node_conn = attr.ib() _cell_to_face_conn = attr.ib() labeled_sets = attr.ib(factory=list) side_sets = attr.ib(factory=list) material_ids = attr.ib(default=None) crs = attr.ib(default=None) eps = attr.ib(default=0.001) _validate = attr.ib(default=False) def __attrs_post_init__(self): if self._validate: self.validate() del self._validate @property def face_to_node_conn(self): return self._face_to_node_conn @property def cell_to_face_conn(self): return self._cell_to_face_conn @property def material_ids_list(self): return sorted(list(set(self.material_ids))) @property def dim(self): return self.coords.shape[1] @property def num_cells(self): return len(self.cell_to_face_conn) @property def num_nodes(self): return self.coords.shape[0] @property def num_faces(self): return len(self.face_to_node_conn)
[docs] def validate(self): """Checks the validity of the mesh, or throws an AssertionError.""" # validate coords assert (isinstance(self.coords, np.ndarray)) assert (len(self.coords.shape) == 2) assert (self.coords.shape[1] == 3) # validate conn assert (min(c for conn in self.face_to_node_conn for c in conn) >= 0) assert (max(c for conn in self.face_to_node_conn for c in conn) < len(self.coords)) assert (min(c for conn in self.cell_to_face_conn for c in conn) >= 0) assert (max(c for conn in self.cell_to_face_conn for c in conn) < len(self.face_to_node_conn)) # validate labeled_sets and side sets ls_ss_ids = [ls.setid for ls in self.labeled_sets] + \ [ss.setid for ss in self.side_sets] assert (len(set(ls_ss_ids)) == len(ls_ss_ids)) for ls in self.labeled_sets: if ls.entity == 'CELL': size = self.num_cells elif ls.entity == 'NODE': size = self.num_nodes else: assert (False) # no face or edge sets ls.validate(size, False) for ss in self.side_sets: ss.validate(self.cell_to_face_conn) # validate material ids assert (self.material_ids is not None) assert (len(self.material_ids) == len(self.cell_to_face_conn))
[docs] def next_available_labeled_setid(self): """Returns next available LS id.""" i = 10000 while any(i == ls.setid for ls in self.labeled_sets) or \ any(i == ls.setid for ls in self.side_sets): i += 1 return i
[docs] def write_vtk(self, filename): """Writes to VTK. Note, this just writes the topology/geometry information, for WEDGE type meshes (extruded triangles). No labeled sets are written. Prefer to use write_exodus() for a fully featured mesh. """ assert (all(len(c) == 5 for c in self.cell_to_face_conn)) wedges = [] for c2f in self.cell_to_face_conn: fup = c2f[0] fdn = c2f[1] assert (len(self.face_to_node_conn[fup]) == 3 and len(self.face_to_node_conn[fdn]) == 3) wedges.append(self.face_to_node_conn[fup] + self.face_to_node_conn[fdn]) watershed_workflow.vtk_io.write(filename, self.coords, { 'wedge': np.array(wedges) })
[docs] def write_exodus(self, filename, face_block_mode="one block"): """Write the 3D mesh to ExodusII using arbitrary polyhedra spec""" if exodus is None: raise ImportError( "The python ExodusII wrappers were not found, please see the installation documentation to install Exodus" ) # Note exodus uses the term element instead of cell, so we # swap to that term in this method. # put elems in with blocks, which renumbers the elems, so we # have to track sidesets. Therefore we keep a map of old elem # to new elem ordering # # also, though not required by the spec, paraview and visit # seem to crash if num_face_blocks != num_elem_blocks. So # make face blocks here too, which requires renumbering the faces. # -- first pass, form all element blocks and make the map from old-to-new new_to_old_elems = [] elem_blks = [] for i_m, m_id in enumerate(self.material_ids_list): # split out elems of this material, save new_to_old map elems_tuple = [(i, c) for (i, c) in enumerate(self.cell_to_face_conn) if self.material_ids[i] == m_id] new_to_old_elems.extend([i for (i, c) in elems_tuple]) elems = [c for (i, c) in elems_tuple] elem_blks.append(elems) old_to_new_elems = sorted([(old, i) for (i, old) in enumerate(new_to_old_elems)], key=lambda a: a[0]) # -- deal with faces, form all face blocks and make the map from old-to-new face_blks = [] if face_block_mode == "one block": # no reordering of faces needed face_blks.append(self.face_to_node_conn) elif face_block_mode == "n blocks, not duplicated": used_faces = np.zeros((len(self.face_to_node_conn), ), 'bool') new_to_old_faces = [] for i_m, m_id in enumerate(self.material_ids_list): # split out faces of this material, save new_to_old map def used(f): result = used_faces[f] used_faces[f] = True return result elem_blk = elem_blks[i_m] faces_tuple = [(f, self.face_to_node_conn[f]) for c in elem_blk for (j, f) in enumerate(c) if not used(f)] new_to_old_faces.extend([j for (j, f) in faces_tuple]) faces = [f for (j, f) in faces_tuple] face_blks.append(faces) # get the renumbering in the elems old_to_new_faces = sorted([(old, j) for (j, old) in enumerate(new_to_old_faces)], key=lambda a: a[0]) elem_blks = [[[old_to_new_faces[f][1] for f in c] for c in elem_blk] for elem_blk in elem_blks] elif face_block_mode == "n blocks, duplicated": elem_blks_new = [] offset = 0 for i_m, m_id in enumerate(self.material_ids_list): used_faces = np.zeros((len(self.face_to_node_conn), ), 'bool') def used(f): result = used_faces[f] used_faces[f] = True return result elem_blk = elem_blks[i_m] tuple_old_f = [(f, self.face_to_node_conn[f]) for c in elem_blk for f in c if not used(f)] tuple_new_old_f = [(new, old, f) for (new, (old, f)) in enumerate(tuple_old_f)] old_to_new_blk = np.zeros((len(self.face_to_node_conn), ), 'i') - 1 for new, old, f in tuple_new_old_f: old_to_new_blk[old] = new + offset elem_blk_new = [[old_to_new_blk[f] for f in c] for c in elem_blk] #offset = offset + len(ftuple_new) elem_blks_new.append(elem_blk_new) face_blks.append([f for i, j, f in tuple_new_old_f]) elem_blks = elem_blks_new elif face_block_mode == "one block, repeated": # no reordering of faces needed, just repeat for eblock in elem_blks: face_blks.append(self.face_to_node_conn) else: raise RuntimeError( "Invalid face_block_mode: '%s', valid='one block', 'n blocks, duplicated', 'n blocks, not duplicated'" % face_block_mode) # open the mesh file num_elems = sum(len(elem_blk) for elem_blk in elem_blks) num_faces = sum(len(face_blk) for face_blk in face_blks) filename_base = os.path.split(filename)[-1] ep = exodus.ex_init_params(title=filename_base.encode('ascii'), num_dim=3, num_nodes=self.num_nodes, num_face=num_faces, num_face_blk=len(face_blks), num_elem=num_elems, num_elem_blk=len(elem_blks), num_side_sets=len(self.side_sets), num_elem_sets=sum(1 for ls in self.labeled_sets if ls.entity == 'CELL'), ) e = exodus.exodus(filename, mode='w', array_type='numpy', init_params=ep) # put the coordinates e.put_coord_names(['coordX', 'coordY', 'coordZ']) e.put_coords(self.coords[:, 0], self.coords[:, 1], self.coords[:, 2]) # put the face blocks for i_blk, face_blk in enumerate(face_blks): face_raveled = [n for f in face_blk for n in f] e.put_polyhedra_face_blk(i_blk + 1, len(face_blk), len(face_raveled), 0) e.put_node_count_per_face(i_blk + 1, np.array([len(f) for f in face_blk])) e.put_face_node_conn(i_blk + 1, np.array(face_raveled) + 1) # put the elem blocks assert len(elem_blks) == len(self.material_ids_list) for i_blk, (m_id, elem_blk) in enumerate(zip(self.material_ids_list, elem_blks)): elems_raveled = [f for c in elem_blk for f in c] e.put_polyhedra_elem_blk(m_id, len(elem_blk), len(elems_raveled), 0) e.put_elem_blk_name(m_id, 'MATERIAL_ID_%d' % m_id) e.put_face_count_per_polyhedra(m_id, np.array([len(c) for c in elem_blk])) e.put_elem_face_conn(m_id, np.array(elems_raveled) + 1) # add sidesets for ss in self.side_sets: logging.info(f'adding side set: {ss.setid}') for elem in ss.cell_list: assert old_to_new_elems[elem][0] == elem new_elem_list = sorted([(old_to_new_elems[elem][1], side) for (elem, side) in zip(ss.cell_list, ss.side_list)]) e.put_side_set_params(ss.setid, len(ss.cell_list), 0) e.put_side_set_name(ss.setid, ss.name) e.put_side_set(ss.setid, np.array([e_id for (e_id, side) in new_elem_list]) + 1, np.array([side for (e_id, side) in new_elem_list]) + 1) # add labeled sets for ls in self.labeled_sets: if ls.entity == 'CELL': if hasattr(e, 'put_elem_set_params'): logging.info(f'adding elem set: {ls.setid}') new_elem_list = sorted([old_to_new_elems[elem][1] for elem in ls.ent_ids]) e.put_elem_set_params(ls.setid, len(new_elem_list), None) e.put_elem_set_name(ls.setid, ls.name) e.put_elem_set(ls.setid, np.array(new_elem_list) + 1) else: logging.warning( f'not writing elem_set: {ls.setid} because this exodus installation does not write element sets' ) else: warnings.warning(f'Cannot write labeled set of type {ls.entity}') # finish and close e.close()
[docs] @staticmethod def summarize_extrusion(layer_types, layer_data, ncells_per_layer, mat_ids, surface_cell_id=0): """ Summarizes extruded data by printing info to log file. This is useful in rapidly debugging and understanding the layering before you do the extrusion process. """ count = 0 logging.info("Cell summary:") logging.info("-" * 60) logging.info("l_id\t| c_id\t|mat_id\t| dz\t\t| z_top") logging.info("-" * 60) rep_z = 0. for i, thick in enumerate(layer_data): for j in range(ncells_per_layer[i]): try: mat_id = mat_ids[i][surface_cell_id] except TypeError: mat_id = mat_ids[i] logging.info(" %02i \t| %02i \t| %4i \t| %10.6f \t| %10.6f" % (i, count, mat_id, thick / ncells_per_layer[i], rep_z)) count += 1 rep_z += thick / ncells_per_layer[i]
[docs] @classmethod def extruded_Mesh2D(cls, mesh2D, layer_types, layer_data, ncells_per_layer, mat_ids): """Uniformly extrude a 2D mesh to make a 3D mesh. Layers of potentially multiple sets of cells are extruded downward in the vertical. The cell dz is uniform horizontally and vertically through the layer in all but the 'snapped' case, where it is uniform vertically but not horizontally. Parameters ---------- mesh2D : Mesh2D The 2D mesh to extrude layer_types : str, list[str] One of ['snapped', 'function', 'node', 'cell',] or a list of these strings, describing the extrusion method for each layer. If only a single string is supplied, all layers are of this type. See layer_data below. layer_data : list[data] Data required, one for each layer. The data is specific to the layer type: - 'constant' : float, layer thickness - 'snapped' : float, the bottom z-coordinate of the layer. Cells are extruded uniformly to whatever thickness required to match this bottom coordinate. - 'function' : function, layer_thickness = func(x,y) - 'node' : np.array((mesh2D.num_nodes,), float) layer thickness for each node - 'cell' : np.array((mesh2D.num_cells,), float) interpolates nodal layer thickness from neighboring cell thicknesses ncells_per_layer : int, list[int] Either a single integer (same number of cells in all layers) or a list of number of cells in each layer mat_ids : int, list[int], np.ndarray((num_layers, mesh2D.num_cells), dtype=int) Material ID for each cell in each layer. If an int, all cells in all layers share thesame material ID. If a list or 1D array, one material ID per layer. If a 2D array, provides all material IDs explicitly. Returns ------- Mesh3D The extruded, 3D mesh. """ # make the data all lists # --------------------------------- def is_list(data): if type(data) is str: return False try: iter(data) except TypeError: return False else: return True if is_list(layer_types): if not is_list(layer_data): layer_data = [layer_data, ] * len(layer_types) else: assert len(layer_data) == len(layer_types) if not is_list(ncells_per_layer): ncells_per_layer = [ncells_per_layer, ] * len(layer_types) else: assert len(ncells_per_layer) == len(layer_types) elif is_list(layer_data): layer_types = [layer_types, ] * len(layer_data) if not is_list(ncells_per_layer): ncells_per_layer = [ncells_per_layer, ] * len(layer_data) else: assert len(ncells_per_layer) == len(layer_data) elif is_list(ncells_per_layer): layer_type = [layer_type, ] * len(ncells_per_layer) layer_data = [layer_data, ] * len(ncells_per_layer) else: layer_type = [layer_type, ] layer_data = [layer_data, ] ncells_per_layer = [ncells_per_layer, ] # helper data and functions for mapping indices from 2D to 3D # ------------------------------------------------------------------ if min(ncells_per_layer) < 0: raise RuntimeError("Invalid number of cells, negative value provided.") ncells_tall = sum(ncells_per_layer) ncells_total = ncells_tall * mesh2D.num_cells nfaces_total = (ncells_tall+1) * mesh2D.num_cells + ncells_tall * mesh2D.num_edges nnodes_total = (ncells_tall+1) * mesh2D.num_nodes eh = _ExtrusionHelper(ncells_tall, mesh2D.num_cells) np_mat_ids = np.array(mat_ids, dtype=int) if np_mat_ids.size == np.size(np_mat_ids, 0): if np_mat_ids.size == 1: np_mat_ids = np.full((len(ncells_per_layer), mesh2D.num_cells), mat_ids[0], dtype=int) else: np_mat_ids = np.empty((len(ncells_per_layer), mesh2D.num_cells), dtype=int) for ilay in range(len(ncells_per_layer)): np_mat_ids[ilay, :] = np.full(mesh2D.num_cells, mat_ids[ilay], dtype=int) # create coordinates # --------------------------------- coords = np.zeros((mesh2D.coords.shape[0], ncells_tall + 1, 3), 'd') coords[:, :, 0:2] = np.expand_dims(mesh2D.coords[:, 0:2], 1) if mesh2D.dim == 3: coords[:, 0, 2] = mesh2D.coords[:, 2] # else the surface is at 0 depth cell_layer_start = 0 for layer_type, layer_datum, ncells in zip(layer_types, layer_data, ncells_per_layer): if layer_type.lower() == 'constant': dz = float(layer_datum) / ncells for i in range(1, ncells + 1): coords[:, cell_layer_start + i, 2] = coords[:, cell_layer_start, 2] - i*dz else: # allocate an array of coordinates for the bottom of the layer layer_bottom = np.zeros((mesh2D.coords.shape[0], ), 'd') if layer_type.lower() == 'snapped': # layer bottom is uniform layer_bottom[:] = layer_datum elif layer_type.lower() == 'function': # layer thickness is given by a function evaluation of x,y for node_col in range(mesh2D.coords.shape[0]): layer_bottom[node_col] = coords[node_col, cell_layer_start, 2] - layer_datum( coords[node_col, 0, 0], coords[node_col, 0, 1]) elif layer_type.lower() == 'node': # layer bottom specifically provided through thickness layer_bottom[:] = coords[:, cell_layer_start, 2] - layer_datum elif layer_type.lower() == 'cell': # interpolate cell thicknesses to node thicknesses import scipy.interpolate centroids = mesh2D.centroids interp = scipy.interpolate.interp2d(centroids[:, 0], centroids[:, 1], layer_datum, kind='linear') layer_bottom[:] = coords[:, cell_layer_start, 2] - interp( mesh2D.coords[:, 0], mesh2D.coords[:, 1]) else: raise RuntimeError("Unrecognized layer_type '%s'" % layer_type) # linspace from bottom of previous layer to bottom of this layer for node_col in range(mesh2D.coords.shape[0]): coords[node_col, cell_layer_start:cell_layer_start + ncells + 1, 2] = np.linspace(coords[node_col, cell_layer_start, 2], layer_bottom[node_col], ncells + 1) cell_layer_start = cell_layer_start + ncells # create faces, face sets, cells bottom = [] surface = [] faces = [] cells = [list() for c in range(ncells_total)] # -- loop over the columns, adding the horizontal faces for col in range(mesh2D.num_cells): nodes_2 = mesh2D.conn[col] surface.append(eh.col_to_id(col, 0)) for z_face in range(ncells_tall + 1): i_f = len(faces) f = [eh.node_to_id(n, z_face) for n in nodes_2] if z_face != ncells_tall: cells[eh.col_to_id(col, z_face)].append(i_f) if z_face != 0: cells[eh.col_to_id(col, z_face - 1)].append(i_f) faces.append(f) bottom.append(eh.col_to_id(col, ncells_tall - 1)) # -- loop over the columns, adding the vertical faces added = dict() vertical_side_cells = [] vertical_side_indices = [] for col in range(mesh2D.num_cells): nodes_2 = mesh2D.conn[col] for i in range(len(nodes_2)): edge = mesh2D.edge_hash(nodes_2[i], nodes_2[(i+1) % len(nodes_2)]) try: i_e = added[edge] except KeyError: # faces not yet added to facelist i_e = len(added.keys()) added[edge] = i_e for z_face in range(ncells_tall): i_f = len(faces) assert i_f == eh.edge_to_id(i_e, z_face) f = [ eh.node_to_id(edge[0], z_face), eh.node_to_id(edge[1], z_face), eh.node_to_id(edge[1], z_face + 1), eh.node_to_id(edge[0], z_face + 1) ] faces.append(f) face_cell = eh.col_to_id(col, z_face) cells[face_cell].append(i_f) # check if this is an external if mesh2D.edge_counts[edge] == 1: vertical_side_cells.append(face_cell) vertical_side_indices.append(len(cells[face_cell]) - 1) else: # faces already added from previous column for z_face in range(ncells_tall): i_f = eh.edge_to_id(i_e, z_face) cells[eh.col_to_id(col, z_face)].append(i_f) # Do some idiot checking # -- check we got the expected number of faces assert len(faces) == nfaces_total # -- check every cell is at least a tet for c in cells: assert len(c) > 4 # -- check surface sideset has the right number of entries assert len(surface) == mesh2D.num_cells # -- check bottom sideset has the right number of entries assert len(bottom) == mesh2D.num_cells # -- len of vertical sides sideset is number of external edges * number of cells, no pinchouts here num_sides = ncells_tall * sum(1 for e, c in mesh2D.edge_counts.items() if c == 1) assert num_sides == len(vertical_side_cells) assert num_sides == len(vertical_side_indices) # make the material ids material_ids = np.zeros((len(cells), ), 'i') for col in range(mesh2D.num_cells): z_cell = 0 for ilay in range(len(ncells_per_layer)): ncells = ncells_per_layer[ilay] for i in range(z_cell, z_cell + ncells): material_ids[eh.col_to_id(col, i)] = np_mat_ids[ilay, col] z_cell = z_cell + ncells # make the side sets side_sets = [] side_sets.append(SideSet("bottom", 1, bottom, [1, ] * len(bottom))) side_sets.append(SideSet("surface", 2, surface, [0, ] * len(surface))) side_sets.append(SideSet("external_sides", 3, vertical_side_cells, vertical_side_indices)) labeled_sets = [] for ls in mesh2D.labeled_sets: if ls.entity == 'CELL': if hasattr(ls, 'to_extrude') and ls.to_extrude: # top surface cells, to become columns of cells elem_list = [eh.col_to_id(c, j) for j in range(ncells_tall) for c in ls.ent_ids] labeled_sets.append(LabeledSet(ls.name, ls.setid, 'CELL', elem_list)) else: # top surface cells, to become side sets elem_list = [eh.col_to_id(c, 0) for c in ls.ent_ids] side_list = [0 for c in ls.ent_ids] side_sets.append(SideSet(ls.name, ls.setid, elem_list, side_list)) elif ls.entity == 'FACE': # top surface faces become faces in the subsurface # mesh, as they will extract correctly for surface BCs/observations outlet_ids_names = [] # given a 2D edge, find the 2D cell it touches col_ids = [ mesh2D.edges_to_cells[mesh2D.edge_hash(e[0], e[1])][0] for e in ls.ent_ids ] if hasattr(ls, 'to_extrude') and ls.to_extrude: elem_list = [eh.col_to_id(c, i) for c in col_ids for i in range(ncells_tall)] face_list = [ eh.edge_to_id(added[mesh2D.edge_hash(e[0], e[1])], i) for e in ls.ent_ids for i in range(ncells_tall) ] side_list = [cells[c].index(f) for (f, c) in zip(face_list, elem_list)] else: elem_list = [eh.col_to_id(c, 0) for c in col_ids] face_list = [ eh.edge_to_id(added[mesh2D.edge_hash(e[0], e[1])], 0) for e in ls.ent_ids ] side_list = [cells[c].index(f) for (f, c) in zip(face_list, elem_list)] side_sets.append(SideSet(ls.name, ls.setid, elem_list, side_list)) # reshape coords coords = coords.reshape(nnodes_total, 3) # instantiate the mesh m3 = cls(coords, faces, cells, material_ids=material_ids, side_sets=side_sets, labeled_sets=labeled_sets, crs=mesh2D.crs) return m3
[docs] def telescope_factor(ncells, dz, layer_dz): """Calculates a telescoping factor to fill a given layer. Calculates a constant geometric factor, such that a layer of thickness layer_dz is perfectly filled by ncells in the vertical, where the top cell is dz in thickness and each successive cell grows by a factor of that factor. Parameters ---------- ncells : int Number of cells (in the vertical) needed. dz : float Top cell's thickness in the vertical. layer_dz : float Thickness of the total layer. Returns ------- float The telescoping factor. """ if ncells * dz > layer_dz: raise ValueError(("Cannot telescope {} cells of thickness at least {} " + "and reach a layer of thickness {}").format(ncells, dz, layer_dz)) def seq(r): calc_layer_dz = 0 dz_new = dz for i in range(ncells): calc_layer_dz += dz_new dz_new *= r #print('tried: {} got: {}'.format(r, calc_layer_dz)) return layer_dz - calc_layer_dz res = scipy.optimize.root_scalar(seq, method='bisect', bracket=[1.0001, 2], maxiter=1000) logging.info("Converged?: ratio = {}, layer z (target = {}) = {}".format( res.root, layer_dz, seq(res.root))) return res.root
[docs] def optimize_dzs(dz_begin, dz_end, thickness, num_cells, p_thickness=1000, p_dz=10000, p_increasing=1000, p_smooth=10, tol=1): """Tries to optimize dzs""" pad_thickness = thickness + dz_begin + dz_end def penalty_thickness(dzs): return abs(pad_thickness - dzs.sum()) def penalty_dz(dzs): return dz_end / dz_begin * abs(dzs[0] - dz_begin) + abs(dzs[-1] - dz_end) def penalty_increasing(dzs): return np.maximum(0., dzs[0:-1] - dzs[1:]).sum() def penalty_smooth(dzs): growth_factor = dzs[1:] / dzs[0:-1] return np.abs(2 * growth_factor[1:-1] - growth_factor[0:-2] - growth_factor[2:]).sum() def penalty(dzs): return p_thickness * penalty_thickness(dzs) \ + p_dz * penalty_dz(dzs) \ + p_increasing * penalty_increasing(dzs) \ + p_smooth * penalty_smooth(dzs) x0 = (dz_begin+dz_end) / 2. * np.ones((num_cells, ), 'd') bounds = [(dz_begin, dz_end), ] * num_cells res = scipy.optimize.minimize(penalty, x0, bounds=bounds) dzs = res.x.copy()[1:-1] # MUST have increasing dzs.sort() # MUST have sum dzs = dzs / dzs.sum() * thickness return dzs, res
def transform_rotation(radians): return np.array([[np.cos(radians), np.sin(radians), 0], [-np.sin(radians), np.cos(radians), 0], [0, 0, 1]])
[docs] def create_submesh(m2, shp): """Given a shape that contains some cells of m2, create the submesh.""" # create the new coordinates and a map new_coords_i = [] new_coords_map = dict() for i, c in enumerate(m2.coords): if shp.intersects(shapely.geometry.Point(c[0], c[1])): new_coords_map[i] = len(new_coords_i) new_coords_i.append(i) new_coords = np.array([m2.coords[i] for i in new_coords_i]) # create the new conn and map new_conns = [] new_conn_map = dict() for j, conn in enumerate(m2.conn): if all(i in new_coords_i for i in conn): new_conn = [new_coords_map[i] for i in conn] new_conn_map[j] = len(new_conns) new_conns.append(new_conn) # new labeled sets new_labeled_sets = [] for ls in m2.labeled_sets: if (ls.entity == 'CELL'): new_ent_ids = [new_conn_map[e] for e in ls.ent_ids if e in new_conn_map.keys()] if len(new_ent_ids) > 0: new_ls = LabeledSet(ls.name, ls.setid, ls.entity, new_ent_ids, ls.to_extrude) new_labeled_sets.append(new_ls) elif (ls.entity == 'FACE'): new_edges = [(new_coords_map[e[0]], new_coords_map[e[1]]) for e in ls.ent_ids if (e[0] in new_coords_map and e[1] in new_coords_map)] if len(new_edges) > 0: new_ls = LabeledSet(ls.name, ls.setid, ls.entity, new_edges, ls.to_extrude) new_labeled_sets.append(new_ls) # create the new mesh new_mesh = Mesh2D(new_coords, new_conns, new_labeled_sets, m2.crs, m2.eps, False, True) return new_coords_map, new_conn_map, new_mesh
[docs] def merge_meshes(meshes): """ Combines multiple 2D meshes into a single mesh. It is assumed that the meshes to be combined have common vertices on the shared edge (no steiner points). labeledsets should be added after merging the mesh. Option of merging pre-existing labeledsets in the to-be-merged meshes will be added soon Parameters ---------- meshes : list(mesh.Mesh2D) The list of meshes to be merged Returns ------- mesh.Mesh2D combined mesh """ # ## identify base mesh # meshes = list(sorted(meshes, key=lambda m : -m.num_cells)) ##FIX ME## m2_combined = meshes[0] for mesh in meshes[1:]: m2_combined = merge_mesh(m2_combined, mesh) return m2_combined
[docs] def merge_mesh(mesh1, mesh2): """merge two meshes (mesh.Mesh2D objects)""" # --THIS OPTION TO BE ADDED LATER-- #transfer_labeled_sets=True assert len(mesh1.labeled_sets) + len(mesh2.labeled_sets) == 0, \ 'to-be-merged meshes should not have labeled sets, they must be added after merging' combined_coords = mesh1.coords.tolist() # mapping for adjusting coord indices mapping = { i: i for i in range(mesh2.num_nodes) } # adjust connection indices for the second mesh using the mapping def map_conn(conn, mapping): return [mapping[idx] for idx in conn] for idx, coord in enumerate(mesh2.coords): # check if the point is close enough to any point in combined_coords found = False for i, existing_coord in enumerate(combined_coords): if close_enough(coord[:2], existing_coord[:2], tolerance=1e-3): mapping[idx] = i found = True break # if the point is not close enough to any existing point, add to combined_coords if not found: combined_coords.append(coord.tolist()) mapping[idx] = len(combined_coords) - 1 adjusted_conn2 = [map_conn(conn, mapping) for conn in mesh2.conn] # merge conn lists combined_conn = mesh1.conn + adjusted_conn2 # merged mesh creation m2_combined = watershed_workflow.mesh.Mesh2D(coords=np.array(combined_coords), conn=combined_conn) # # trasferring labeled sets ##--FIX ME--## # if transfer_labeled_sets: # m2_combined.labeled_sets = m2_combined.labeled_sets + mesh1.labeled_sets # for ls in mesh2.labeled_sets: # if ls.entity == 'CELL': # new_ent_ids = [mesh1.num_cells + c for c in ls.ent_ids] # ls.ent_ids = new_ent_ids # elif ls.entity == 'FACE': # new_ent_ids = [(mapping(e[0]), mapping(e[1])) for e in ls.ent_ids] # ls.ent_ids = new_ent_ids # m2_combined.labeled_sets = m2_combined.labeled_sets + mesh2.labeled_sets return m2_combined
def close_enough(p1, p2, tolerance): return np.linalg.norm(np.array(p1) - np.array(p2)) < tolerance