"""watershed_workflow.sources.manager_nhd

Managers for interacting with the USGS National Hydrography Datasets (NHD,
NHDPlus HR, and WBD).
"""

import os, sys
import logging
import fiona
import shapely
import attr
import requests
import shutil

import watershed_workflow.sources.utils as source_utils
import watershed_workflow.config
import watershed_workflow.crs
import watershed_workflow.sources.names
import watershed_workflow.utils
import watershed_workflow.warp


@attr.s
class _FileManagerNHD:
    """Manager for interacting with USGS National Hydrography Datasets.

    Note that this includes NHD, NHDPlus, and WBD -- this class should not be
    used directly, but instead use one of the derived classes:

    * `manager_nhd.FileManagerNHD` (Hi Res)
    * `manager_nhd.FileManagerNHDPlus` (Hi Res)
    * `manager_nhd.FileManagerWBD`

    Watershed Workflow leverages the Watershed Boundary Dataset (WBD) and the
    National Hydrography Dataset (NHD), USGS and EPA datasets available at
    multiple resolutions to represent United States watersheds, including
    Alaska [NHD]_.  Also used is the NHD Plus dataset, an augmented dataset
    built on watershed boundaries and elevation products.  By default, the
    1:24,000 High Resolution datasets are used.

    Data is discovered through The National Map's [TNM]_ REST API, which
    allows querying for data files organized by HUC and resolution via HTTP
    requests, providing direct-download URLs.  Files are downloaded on first
    request, unzipped, and stored in the data library for future use.
    Currently, files are indexed by 2-digit (WBD), 4-digit (NHD Plus HR), and
    8-digit (NHD) HUCs.

    .. [NHD] https://www.usgs.gov/core-science-systems/ngp/national-hydrography
    .. [TNM] https://viewer.nationalmap.gov/help/documents/TNMAccessAPIDocumentation/TNMAccessAPIDocumentation.pdf
    """
    name = attr.ib(type=str)
    file_level = attr.ib(type=int)
    lowest_level = attr.ib(type=int)
    name_manager = attr.ib()

    # aliases for NHDPlus Value Added Attributes (Table 16 of the NHDPlus User
    # Guide), mapping user-facing names to raw field names
    _nhdplus_vaa = {
        'StreamOrder': 'StreamOrde',
        'StreamLevel': 'StreamLeve',
        'HydrologicSequence': 'HydroSeq',
        'DownstreamMainPathHydroSeq': 'DnHydroSeq',
        'UpstreamMainPathHydroSeq': 'UpHydroSeq',
        'DivergenceCode': 'Divergence',
        'MinimumElevationSmoothed': 'MinElevSmo',
        'MaximumElevationSmoothed': 'MaxElevSmo',
        'MinimumElevationRaw': 'MinElevRaw',
        'MaximumElevationRaw': 'MaxElevRaw',
        'CatchmentAreaSqKm': 'AreaSqKm',
        'TotalDrainageAreaSqKm': 'TotDASqKm',
        'DivergenceRoutedDrainAreaSqKm': 'DivDASqKm',
    }

    # aliases for NHDPlus EROMMA attributes (Table 17 of the NHDPlus User Guide)
    _nhdplus_eromma = {
        'MeanAnnualFlow': 'QAMA',
        'MeanAnnualVelocity': 'VAMA',
        'MeanAnnualFlowGaugeAdj': 'QEMA',
    }
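    # How the alias tables above are used, in brief: a user-facing property
    # name maps to the raw NHDPlus field name, which is lowercased when the
    # file uses the newer lower-case schema.  A minimal sketch using only
    # names defined above:
    #
    # >>> _FileManagerNHD._nhdplus_vaa['StreamOrder']
    # 'StreamOrde'
    # >>> _FileManagerNHD._nhdplus_vaa['StreamOrder'].lower()  # new-format files
    # 'streamorde'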
    def get_huc(self, huc, force_download=False, exclude_hu_types=None):
        """Get the specified HUC in its native CRS.

        Parameters
        ----------
        huc : int or str
          The USGS Hydrologic Unit Code
        force_download : bool, optional
          If true, delete any file and redownload.
        exclude_hu_types : list[str], optional
          List of HU types to exclude.  Likely this is None or ['W',] to
          exclude water HUCs for e.g. a bay, great lake, or ocean.

        Returns
        -------
        profile : dict
          The fiona shapefile profile (see Fiona documentation).
        hu : dict
          Fiona shape object representing the hydrologic unit.

        Note this finds and downloads files as needed.
        """
        huc = source_utils.huc_str(huc)
        profile, hus = self.get_hucs(huc, len(huc), force_download, exclude_hu_types)
        assert (len(hus) == 1)
        return profile, hus[0]
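    # Usage sketch for get_huc() (hedged: the first call downloads data, so it
    # requires network access; the HUC code below is illustrative only):
    #
    # >>> wbd = FileManagerWBD()          # defined at the bottom of this module
    # >>> profile, hu = wbd.get_huc('020401')
    # >>> hu['properties']['huc6']
    # '020401'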
    def get_hucs(self, huc, level, force_download=False, exclude_hu_types=None):
        """Get all sub-catchments of a given HUC level within a given HUC.

        Parameters
        ----------
        huc : int or str
          The USGS Hydrologic Unit Code
        level : int
          Level of requested sub-catchments.  Must be larger or equal to the
          level of the input huc.
        force_download : bool
          Download or re-download the file if true.
        exclude_hu_types : list[str], optional
          List of HU types to exclude.  Likely this is None or ['W',] to
          exclude water HUCs for e.g. a bay, great lake, or ocean.

        Returns
        -------
        profile : dict
          The fiona shapefile profile (see Fiona documentation).
        hus : list(dict)
          List of fiona shape objects representing the hydrologic units.

        Note this finds and downloads files as needed.
        """
        huc = source_utils.huc_str(huc)
        huc_level = len(huc)

        # error checking on the levels, require file_level <= huc_level <= level <= lowest_level
        if self.lowest_level < level:
            raise ValueError("{}: files include HUs at max level {}.".format(
                self.name, self.lowest_level))
        if level < huc_level:
            raise ValueError("{}: cannot ask for HUs at level {} contained in {}.".format(
                self.name, level, huc_level))
        if huc_level < self.file_level:
            raise ValueError(
                "{}: files are organized at HUC level {}, so cannot ask for a larger HUC than that level."
                .format(self.name, self.file_level))

        # download the file
        filename = self._download(huc[0:self.file_level], force=force_download)
        logging.info('Using HUC file "{}"'.format(filename))

        # read the file
        layer = 'WBDHU{}'.format(level)
        logging.debug("{}: opening '{}' layer '{}' for HUCs in '{}'".format(
            self.name, filename, layer, huc))

        with fiona.open(filename, mode='r', layer=layer) as fid:
            hus = [hu for hu in fid if source_utils.get_code(hu, level).startswith(huc)]
            profile = fid.profile
        profile['always_xy'] = True

        for hu in hus:
            if 'huc' + str(level) in hu['properties']:
                hu['properties']['ID'] = str(int(hu['properties']['huc' + str(level)]))

        if exclude_hu_types is not None:
            hus = [hu for hu in hus if hu['properties']['hutype'] not in exclude_hu_types]
        return profile, hus
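    # Usage sketch for get_hucs() (same hedges as above -- downloads on first
    # use, illustrative HUC code): list all level-8 HUCs within a level-4 HUC.
    #
    # >>> wbd = FileManagerWBD()
    # >>> profile, hus = wbd.get_hucs('0204', 8)
    # >>> len(hus)  # one entry per HUC-8 contained in 0204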
    def get_hydro(self,
                  huc,
                  bounds=None,
                  bounds_crs=None,
                  in_network=True,
                  properties=None,
                  include_catchments=False,
                  force_download=False):
        """Get all reaches within a given HUC and/or coordinate bounds.

        Parameters
        ----------
        huc : int or str
          The USGS Hydrologic Unit Code
        bounds : [xmin, ymin, xmax, ymax], optional
          Coordinate bounds to filter reaches returned.  If this is provided,
          bounds_crs must also be provided.
        bounds_crs : CRS, optional
          CRS of the above bounds.
        in_network : bool, optional
          If True (default), remove reaches that are not "in" the NHD network.
        properties : list(str) or bool, optional
          A list of property aliases to be added to reaches.  See alias names
          in Table 16 (NHDPlusFlowlineVAA) or Table 17 (NHDPlusEROMMA) of the
          NHDPlus User Guide.  This is only supported for NHDPlus.  Commonly
          used properties include:

          - 'TotalDrainageAreaSqKm' : total drainage area
          - 'CatchmentAreaSqKm' : differential catchment contributing area
          - 'HydrologicSequence' : VAA sequence information
          - 'DownstreamMainPathHydroSeq' : VAA sequence information
          - 'UpstreamMainPathHydroSeq' : VAA sequence information
          - 'catchment' : catchment polygon geometry

          If True is provided, a standard default set of VAA and EROMMA
          attributes is added as properties.
        include_catchments : bool, optional
          If True, adds catchment polygons for each reach in the river tree
          from the 'NHDPlusCatchment' layer.
        force_download : bool
          Download or re-download the file if true.

        Returns
        -------
        profile : dict
          The fiona shapefile profile (see Fiona documentation).
        reaches : list(dict)
          List of fiona shape objects representing the stream reaches.

        Note this finds and downloads files as needed.
        """
        if properties is True:
            properties = list(self._nhdplus_vaa.keys()) + list(self._nhdplus_eromma.keys())

        if 'WBD' in self.name:
            raise RuntimeError(f'{self.name}: does not provide hydrographic data.')

        huc = source_utils.huc_str(huc)
        hint_level = len(huc)

        # try to get bounds if not provided
        if bounds is None:
            # can we infer a bounds by getting the HUC?
            profile, hu = self.get_huc(huc)
            bounds = watershed_workflow.utils.create_bounds(hu)
            bounds_crs = watershed_workflow.crs.from_fiona(profile['crs'])

        # error checking on the levels, require file_level <= huc_level <= lowest_level
        if hint_level < self.file_level:
            raise ValueError(
                f"{self.name}: files are organized at HUC level {self.file_level}, "
                "so cannot ask for a larger HUC level.")

        # download the file
        filename = self._download(huc[0:self.file_level], force=force_download)
        logging.info('  Using Hydrography file "{}"'.format(filename))

        # find and open the hydrography layer
        filename = self.name_manager.file_name(huc[0:self.file_level])
        layer = 'NHDFlowline'
        logging.info(
            f"  {self.name}: opening '{filename}' layer '{layer}' for streams in '{bounds}'")
        with fiona.open(filename, mode='r', layer=layer) as fid:
            profile = fid.profile
            bounds = watershed_workflow.warp.bounds(
                bounds, bounds_crs, watershed_workflow.crs.from_fiona(profile['crs']))
            reaches = [r for (i, r) in fid.items(bbox=bounds)]
            logging.info(f"  Found total of {len(reaches)} in bounds.")

        # check whether the dataset is in the old NHD format (Title Case
        # property names) or the new format (lower case); guard against an
        # empty result
        to_lower = len(reaches) > 0 and 'nhdplusid' in reaches[0]['properties']

        # filter reaches not in the network
        prop_key = 'InNetwork' if not to_lower else 'innetwork'
        if 'NHDPlus' in self.name and in_network:
            logging.info("  Filtering reaches not in-network")
            reaches = [
                r for r in reaches
                if prop_key in r['properties'] and r['properties'][prop_key] == 1
            ]

        # associate IDs
        id_key = 'NHDPlusID' if not to_lower else 'nhdplusid'
        if 'Plus' in self.name and properties is not None:
            for r in reaches:
                r['properties']['ID'] = str(int(r['properties'][id_key]))

        # associate catchment areas and VAA/EROMMA properties with the reaches if NHDPlus
        if 'Plus' in self.name and properties is not None:
            reach_dict = dict((r['properties']['ID'], r) for r in reaches)

            # validation of properties
            valid_props = list(self._nhdplus_vaa.keys()) + list(
                self._nhdplus_eromma.keys()) + ['catchment', ]
            for prop in properties:
                if prop not in valid_props:
                    raise ValueError(
                        f'Unrecognized NHDPlus property {prop}.  If you are sure this is '
                        'valid, add the alias and variable name to the nhdplus tables in '
                        'FileManagerNHDPlus.')

            # load the catchment layer if requested
            if include_catchments:
                layer = 'NHDPlusCatchment'
                logging.info(
                    f"  {self.name}: opening '{filename}' layer '{layer}' for catchments in '{bounds}'")
                for r in reaches:
                    r['properties']['catchment'] = None
                with fiona.open(filename, mode='r', layer=layer) as fid:
                    for catchment in fid.values():
                        reach = reach_dict.get(str(int(catchment['properties'][id_key])))
                        if reach is not None:
                            reach['properties']['catchment'] = catchment

            # turn off logging -- fiona errors crash otherwise!
            logging.disable(logging.CRITICAL)

            if len(set(self._nhdplus_vaa.keys()).intersection(set(properties))) > 0:
                layer = 'NHDPlusFlowlineVAA'
                logging.info(
                    f"  {self.name}: opening '{filename}' layer '{layer}' for river network properties in '{bounds}'")
                with fiona.open(filename, mode='r', layer=layer) as fid:
                    for flowline in fid.values():
                        reach = reach_dict.get(str(int(flowline['properties'][id_key])))
                        if reach is not None:
                            for prop in properties:
                                if prop in self._nhdplus_vaa:
                                    prop_code = self._nhdplus_vaa[prop]
                                    if to_lower:
                                        prop_code = prop_code.lower()
                                    reach['properties'][prop] = flowline['properties'][prop_code]

            if len(set(self._nhdplus_eromma.keys()).intersection(set(properties))) > 0:
                layer = 'NHDPlusEROMMA'
                logging.info(
                    f"  {self.name}: opening '{filename}' layer '{layer}' for river network properties in '{bounds}'")
                with fiona.open(filename, mode='r', layer=layer) as fid:
                    for flowline in fid.values():
                        reach = reach_dict.get(str(int(flowline['properties'][id_key])))
                        if reach is not None:
                            for prop in properties:
                                if prop in self._nhdplus_eromma:
                                    prop_code = self._nhdplus_eromma[prop]
                                    if to_lower:
                                        prop_code = prop_code.lower()
                                    reach['properties'][prop] = flowline['properties'][prop_code]

            logging.disable(logging.NOTSET)

        return profile, reaches
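    # Usage sketch for get_hydro() (hedged: NHDPlus only, downloads on first
    # use; the HUC code is illustrative and the property aliases come from the
    # tables at the top of this class):
    #
    # >>> nhdp = FileManagerNHDPlus()     # defined at the bottom of this module
    # >>> profile, reaches = nhdp.get_hydro('060102',
    # ...     properties=['StreamOrder', 'TotalDrainageAreaSqKm'])
    # >>> reaches[0]['properties']['StreamOrder']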
    def get_waterbodies(self, huc, bounds=None, bounds_crs=None, force_download=False):
        """Get all water bodies, e.g. lakes, reservoirs, etc., within a given HUC and/or coordinate bounds.

        Parameters
        ----------
        huc : int or str
          The USGS Hydrologic Unit Code
        bounds : [xmin, ymin, xmax, ymax], optional
          Coordinate bounds to filter water bodies returned.  If this is
          provided, bounds_crs must also be provided.
        bounds_crs : CRS, optional
          CRS of the above bounds.
        force_download : bool
          Download or re-download the file if true.

        Returns
        -------
        profile : dict
          The fiona shapefile profile (see Fiona documentation).
        bodies : list(dict)
          List of fiona shape objects representing the water bodies.

        Note this finds and downloads files as needed.
        """
        if 'NHDPlus' not in self.name:
            raise RuntimeError(f'{self.name}: does not provide water body data.')

        huc = source_utils.huc_str(huc)
        hint_level = len(huc)

        # try to get bounds if not provided
        if bounds is None:
            # can we infer a bounds by getting the HUC?
            profile, hu = self.get_huc(huc)
            bounds = watershed_workflow.utils.create_bounds(hu)
            bounds_crs = watershed_workflow.crs.from_fiona(profile['crs'])

        # error checking on the levels, require file_level <= huc_level <= lowest_level
        if hint_level < self.file_level:
            raise ValueError(
                f"{self.name}: files are organized at HUC level {self.file_level}, "
                "so cannot ask for a larger HUC level.")

        # download the file
        filename = self._download(huc[0:self.file_level], force=force_download)
        logging.info('  Using Hydrography file "{}"'.format(filename))

        # find and open the water body layer
        filename = self.name_manager.file_name(huc[0:self.file_level])
        layer = 'NHDWaterbody'
        logging.info(
            f"  {self.name}: opening '{filename}' layer '{layer}' for water bodies in '{bounds}'")
        with fiona.open(filename, mode='r', layer=layer) as fid:
            profile = fid.profile
            bounds = watershed_workflow.warp.bounds(
                bounds, bounds_crs, watershed_workflow.crs.from_fiona(profile['crs']))
            bodies = [r for (i, r) in fid.items(bbox=bounds)]
            logging.info(f"  Found total of {len(bodies)} in bounds.")

        return profile, bodies

    def _url(self, hucstr):
        """Use the USGS REST API to find the URL to download a file for a given HUC.

        Parameters
        ----------
        hucstr : str
          The USGS Hydrologic Unit Code

        Returns
        -------
        url : str
          The URL to download a file containing shapes for the HUC.
        """
        rest_url = 'https://tnmaccess.nationalmap.gov/api/v1/products'
        hucstr = hucstr[0:self.file_level]

        def attempt(params):
            r = requests.get(rest_url, params=params, verify=source_utils.get_verify_option())
            logging.info(f'  REST URL: {r.url}')
            try:
                r.raise_for_status()
            except Exception as e:
                logging.error(e)
                return 1, e
            try:
                json = r.json()
            except Exception as e:
                logging.error(e)
                return 1, e

            matches = [(m, self._valid_url(i, m, hucstr)) for (i, m) in enumerate(json['items'])]
            logging.debug(f'  found {len(matches)} matches')
            matches_f = list(filter(lambda tup: tup[1], matches))
            logging.debug(f'  found {len(matches_f)} valid matches')
            if len(matches_f) == 0:
                logging.error('{}: no matches for HUC {} ({})'.format(self.name, hucstr,
                                                                      len(matches)))
                return 1, '{}: not able to find HUC {}'.format(self.name, hucstr)
            if len(matches_f) > 1:
                logging.error('{}: too many matches for HUC {} ({})'.format(
                    self.name, hucstr, len(matches)))
                for (m, url) in matches_f:
                    logging.error(' {}\n   {}'.format(m['title'], url))
                return 1, '{}: too many matches for HUC {}'.format(self.name, hucstr)
            return 0, matches_f[0][1]

        # cheaper if it works, but may not work in Alaska?
        a1 = attempt({
            'datasets': self.name,
            'polyType': 'huc{}'.format(self.file_level),
            'polyCode': hucstr
        })
        if not a1[0]:
            logging.debug('  REST query with polyCode... SUCCESS')
            logging.debug(f'  REST query: {a1[1]}')
            return a1[1]
        else:
            logging.debug('  REST query with polyCode... FAIL')

        # may find it via huc4?
        if self.file_level >= 4:
            a2 = attempt({
                'datasets': self.name,
                'polyType': 'huc4',
                'polyCode': hucstr[0:4]
            })
            if not a2[0]:
                logging.debug('  REST query with huc4 polyCode... SUCCESS')
                logging.debug(f'  REST query: {a2[1]}')
                return a2[1]
            else:
                logging.debug('  REST query with huc4 polyCode... FAIL')

        # # works more universally but is a BIG lookup, then filter locally
        # a3 = attempt({'datasets': self.name})
        # if not a3[0]:
        #     logging.debug('  REST query without polyCode... SUCCESS')
        #     logging.debug(f'  REST query: {a3[1]}')
        #     return a3[1]
        # logging.debug('  REST query without polyCode... FAIL')

        raise ValueError('{}: cannot find HUC {}'.format(self.name, hucstr))

    def _valid_url(self, i, match, huc, gdb_only=True):
        """Helper function that returns the URL if valid, or False if not."""
        logging.info(f'Checking match for {huc}?  {match["downloadURL"]}')

        ok = "format" in match
        logging.info(f'format in match? {ok}')
        if not ok:
            return False

        ok = "urls" in match
        logging.info(f'urls in match? {ok}')
        if not ok:
            return False

        # search for a GDB url
        try:
            url_type = next(ut for ut in match['urls'] if 'GDB' in ut or 'GeoDatabase' in ut)
        except StopIteration:
            logging.info(f'Cannot find GDB url: {list(match["urls"].keys())}')
            return False
        url = match['urls'][url_type]

        # we have a url, but is it actually this huc?
        url_split = url.split('/')
        logging.info(f'YAY: {url}')
        logging.info(f'Checking match {i}: {url_split[-2]}, {url_split[-1]}')

        # check the title contains (NHD) if NHD, or (NHDPlus HR) if NHDPlus, etc.
        ok = "title" in match
        logging.debug(f'title in match? {ok}')
        if not ok:
            return False

        for abbrev in ['NHD', 'NHDPlus', 'NHDPlus HR', 'WBD']:
            my_abbrev = f'({abbrev})'.lower()
            if my_abbrev in self.name.lower():
                break
        ok = my_abbrev in match["title"].lower()
        logging.debug(f'name in title? {ok}')
        if not ok:
            return False

        if huc not in url_split[-1]:
            # not the right HUC
            return False
        if gdb_only and 'GDB' != url_split[-2]:
            # not a GDB
            return False
        return url

    def _download(self, hucstr, force=False):
        """Find and download data for a given HUC.

        Parameters
        ----------
        hucstr : str
          The USGS Hydrologic Unit Code
        force : bool, optional
          If true, re-download even if a file already exists.

        Returns
        -------
        filename : str
          The path to the resulting downloaded dataset.
        """
        # check the directory structure
        os.makedirs(self.name_manager.data_dir(), exist_ok=True)
        os.makedirs(self.name_manager.folder_name(hucstr), exist_ok=True)

        work_folder = self.name_manager.raw_folder_name(hucstr)
        os.makedirs(work_folder, exist_ok=True)

        filename = self.name_manager.file_name(hucstr)
        if not os.path.exists(filename) or force:
            url = self._url(hucstr)

            downloadfile = os.path.join(work_folder, url.split("/")[-1])
            if not os.path.exists(downloadfile) or force:
                logging.info("Attempting to download source for target '%s'" % filename)
                source_utils.download(url, downloadfile, force)

            source_utils.unzip(downloadfile, work_folder)

            # hope we can find it?
            gdb_files = [f for f in os.listdir(work_folder) if f.endswith('.gdb')]
            assert (len(gdb_files) == 1)

            if os.path.exists(filename):
                shutil.rmtree(filename)
            source_utils.move(os.path.join(work_folder, gdb_files[0]), filename)

        if not os.path.exists(filename):
            raise RuntimeError("Cannot find or download file for source target '%s'" % filename)
        return filename
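# For reference, the TNM Access API query issued by _url() has roughly the
# following shape (a sketch only; the dataset name and HUC code are
# illustrative, and the response is a JSON document whose 'items' list is
# filtered by _valid_url()):
#
# >>> import requests
# >>> r = requests.get('https://tnmaccess.nationalmap.gov/api/v1/products',
# ...                  params={'datasets': 'National Watershed Boundary Dataset (WBD)',
# ...                          'polyType': 'huc2', 'polyCode': '06'})
# >>> [item.get('title') for item in r.json()['items']]  # candidate products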
class FileManagerNHDPlus(_FileManagerNHD):
    def __init__(self):
        name = 'National Hydrography Dataset Plus High Resolution (NHDPlus HR)'
        super().__init__(
            name, 4, 12,
            watershed_workflow.sources.names.Names(name, 'hydrography', 'NHDPlus_H_{}_GDB',
                                                   'NHDPlus_H_{}.gdb'))


class FileManagerNHD(_FileManagerNHD):
    def __init__(self):
        name = 'National Hydrography Dataset (NHD)'
        super().__init__(
            name, 8, 12,
            watershed_workflow.sources.names.Names(name, 'hydrography', 'NHD_H_{}_GDB',
                                                   'NHD_H_{}.gdb'))


class FileManagerWBD(_FileManagerNHD):
    def __init__(self):
        name = 'National Watershed Boundary Dataset (WBD)'
        super().__init__(
            name, 2, 12,
            watershed_workflow.sources.names.Names(name, 'hydrography', 'WBD_{}_GDB',
                                                   'WBD_{}.gdb'))
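
if __name__ == '__main__':
    # A minimal smoke-test sketch, not part of the library API: it requires
    # network access (data are downloaded on first use) and a configured data
    # directory.  The HUC code below is illustrative only.
    logging.basicConfig(level=logging.INFO)
    wbd = FileManagerWBD()
    profile, hu = wbd.get_huc('0601')
    print(hu['properties'])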