Source code for watershed_workflow.sources.manager_ned

"""Manager for interacting with NED datasets."""
import os, sys
import logging
import numpy as np
import shapely
import rasterio.merge
import requests
import requests.exceptions

import watershed_workflow.utils
import watershed_workflow.sources.utils as source_utils
import watershed_workflow.config
import watershed_workflow.warp
import watershed_workflow.sources.names


[docs] class FileManagerNED: """Watershed Workflow leverages the USGS's National Elevation Dataset (NED), a precursor to and currently part of the USGS's 3D Elevation Program (3DEP) [NED]_. It is available seamlessly at a variety of resolutions ranging from 2 arc-seconds to 1/3 arc-seconds (~60m and 10m, respectively) in the conterminous United States and comparable resolution through most of Alaska. Like the NHD data, these datasets are available through The National Map's [TNM]_ REST API, and are provided in 1-degree tiles. Watershed Workflow manages querying for URLs, downloading these tiles on demand, and forming the mosaic of images through underlying capability in rasterio to provide a single raster across the watershed requested. Higher resolution products, including LiDAR products across the conterminous US and IfSAR products across Alaska are coming available, but these are not currently supported by Watershed Workflow. .. [NED] Parameters ---------- resolution : str, optional Resolution of the desired product. One of: * `"1/3 arc-second`" (default) * `"1 arc-second`" file_format : str, optional Desired output format. Default and universally available is `"IMG`". """ def __init__(self, resolution='1/3 arc-second', file_format='GeoTIFF'): """Create the manager.""" self.name = 'National Elevation Dataset (NED)' self.file_format = file_format if resolution == '1/3 arc-second': self.short_res = '13as' elif resolution == '1 arc-second': self.short_res = '1as' else: raise ValueError( "{}: invalid resolution '{}', must be one of '1/9 arc-second', '1/3 arc-second', or '1 arc-second'" .format(self.name)) self.resolution = resolution self.file_format = file_format if self.file_format == 'GeoTIFF': file_extension = 'tif' else: file_extension = self.file_format.lower() self.names = watershed_workflow.sources.names.Names( self.name, 'dem', None, 'USGS_NED_%s_n{:02}_w{:03}.%s' % (self.short_res, file_extension), self.short_res + "_raw") self.crs = watershed_workflow.crs.from_epsg(4269)
[docs] def get_raster(self, shape, crs, force_download=False): """Download and read a DEM for this shape, clipping to the shape. Parameters ---------- shape : fiona or shapely shape Shape to provide bounds of the raster. crs : CRS CRS of the shape. force_download : bool Download or re-download the file if true. Returns ------- profile : rasterio profile Profile of the raster. raster : np.ndarray Array containing the elevation data. Note that the raster provided is in its native CRS (which is in the rasterio profile), not the shape's CRS. """ if type(shape) is dict: shape = watershed_workflow.utils.create_shply(shape) # warp to my crs shply = watershed_workflow.warp.shply(shape, crs, self.crs) # get the bounds and download bounds = shply.bounds feather_bounds = list(bounds[:]) feather_bounds[0] = feather_bounds[0] - .01 feather_bounds[1] = feather_bounds[1] - .01 feather_bounds[2] = feather_bounds[2] + .01 feather_bounds[3] = feather_bounds[3] + .01 files = self.download(feather_bounds, force=force_download) # merge into a single raster datasets = [rasterio.open(f) for f in files] profile = datasets[0].profile dest, output_transform = rasterio.merge.merge(datasets, bounds=feather_bounds, nodata=np.nan) dest = np.where(dest < -1.e-10, np.nan, dest) # set the profile profile['transform'] = output_transform profile['height'] = dest.shape[1] profile['width'] = dest.shape[2] profile['count'] = dest.shape[0] profile['nodata'] = np.nan return profile, dest[0]
def request(self, bounds): """Forms the REST API get to find URLs. Parameters ---------- bounds : [xmin, ymin, xmax, ymax] Desired bounds, in the raster's native CRS. Returns ------- js : json dict JSON response of the formed request. """ rest_url = 'https://tnmaccess.nationalmap.gov/api/v1/products' rest_dataset = self.name + ' ' + self.resolution rest_bounds = ','.join(str(b) for b in bounds) #[bounds[1], bounds[0], bounds[3], bounds[2]]) try: r = requests.get(rest_url, params={ 'datasets': rest_dataset, 'bbox': rest_bounds, 'prodFormats': self.file_format }, verify=source_utils.get_verify_option()) logging.info(r.url) r.raise_for_status() except requests.exceptions.ConnectionError as err: logging.error('{}: Failed to access REST API for NED DEM products.'.format(self.name)) raise err return r.json() def download(self, bounds, force=False): """Download the files covering the bounds. Parameters ---------- bounds : [xmin, ymin, xmax, ymax] Desired bounds, in the raster's native CRS. force : bool, optional If true, re-download even if a file already exists. Returns ------- filenames : list(str) List of raster files tiling the bounds. """ logging.info("Collecting DEMs to tile bounds: {}".format(bounds)) # check directory structure os.makedirs(self.names.data_dir(), exist_ok=True) os.makedirs(self.names.raw_folder_name(), exist_ok=True) # NOTE: we could get these from the REST API, but I would # prefer to not REQUIRE an internet connection if the data # already exists. # tile the bounds in lat/long 1-degree increments west = int(np.floor(bounds[0])) south = int(np.floor(bounds[1])) east = int(np.ceil(bounds[2])) north = int(np.ceil(bounds[3])) # generate the list of files needed filenames = [ self.names.file_name(j + 1, -i) for j in range(south, north) for i in range(west, east) ] logging.info(' Need:') for fname in filenames: logging.info(' {}'.format(fname)) filenames_success = [] if (any(not os.path.exists(f) for f in filenames) or force): request = self.request(bounds) for r in request['items']: url = r['downloadURL'] north = int(np.round(r['boundingBox']['maxY'])) west = int(np.round(r['boundingBox']['minX'])) filename = self.names.file_name(north, -west) if filename not in filenames: # randomly some extra matches are found? continue filenames.remove(filename) if not os.path.exists(filename) or force: downloadfilename = url.split("/")[-1] downloadfile = os.path.join(self.names.raw_folder_name(north, west), downloadfilename) logging.info("Attempting to download source for target '%s'" % filename) work_dir = self.names.raw_folder_name(north, west) if not os.path.exists(downloadfile) or force: source_utils.download(url, downloadfile, force) if downloadfile.endswith('.ZIP') or downloadfile.endswith('.zip'): source_utils.unzip(downloadfile, work_dir) unzip_filename = downloadfilename[0:-4] # hope we can find it? img_files = [] if os.path.isdir(os.path.join(work_dir, unzip_filename)): img_files = [ os.path.join(unzip_filename, f) for f in os.listdir(os.path.join(work_dir, unzip_filename)) if f.endswith('.' + self.file_format.lower()) ] if len(img_files) == 0: img_files = [ os.path.join(unzip_filename, f) for f in os.listdir(os.path.join(work_dir, unzip_filename)) if f.endswith('.' + self.file_format.upper()) ] if len(img_files) == 0: img_files = [ f for f in os.listdir(work_dir) if f.endswith('.' + self.file_format.lower()) ] if len(img_files) == 0: img_files = [ f for f in os.listdir(work_dir) if f.endswith('.' + self.file_format.upper()) ] if len(img_files) == 0: raise RuntimeError( "{}: Downloaded and unzipped '{}', but cannot find the img file.". format(self.name, downloadfile)) else: logging.debug(" Found '{}'".format(os.path.join( work_dir, img_files[0]))) source_utils.move(os.path.join(work_dir, img_files[0]), filename) else: # move the file directly, no unzipping source_utils.move(downloadfile, filename) if not os.path.exists(filename): raise RuntimeError( '{}: Cannot find or download file for source target "{}"'.format( self.name, filename)) else: filenames_success.append(filename) if len(filenames) != 0: logging.warn( 'Potentially missing tiles in the DEM covering bounds: {}'.format(bounds)) logging.warn( 'This may be a REST API error, or it may be that some tiles are oceanic and not needed.' ) logging.warn( 'Continuing, but consider this issue if some elevation data is missing.') logging.warn('Missing Tiles:') for fname in filenames: logging.warn(' {}'.format(fname)) else: logging.info('source files already exist!') filenames_success = filenames return filenames_success