# Source code for watershed_workflow.sources.manager_nrcs

"""National Resources Conservation Service Soil Survey database.

Author: Ethan Coon (coonet@ornl.gov)
Author: Pin Shuai (pin.shuai@pnnl.gov)

"""
from typing import Optional, List, Tuple

import collections
import logging
import os

import geopandas as gpd
import numpy as np
import pandas as pd
import requests
import shapely.geometry
import shapely.ops
import shapely.wkt

import watershed_workflow.crs
from watershed_workflow.crs import CRS
import watershed_workflow.warp
import watershed_workflow.utils
import watershed_workflow.soil_properties

from . import filenames
from . import utils as source_utils
from . import standard_names as names
from . import manager_shapes

# SQL template for the SDA tabular REST service: selects survey, legend,
# map unit, component, and horizon properties.  The {} placeholder is
# filled with a comma-separated list of quoted MUKEYs.
_query_template_props = """
SELECT
 saversion, saverest, l.areasymbol, l.areaname, l.lkey, musym, muname, museq, mu.mukey, comppct_r, compname, localphase, slope_r, c.cokey, hzdept_r, hzdepb_r, ksat_r, sandtotal_r, claytotal_r, silttotal_r, dbthirdbar_r , partdensity, wsatiated_r, ch.chkey

FROM sacatalog sac
  INNER JOIN legend l ON l.areasymbol = sac.areasymbol
    INNER JOIN mapunit mu ON mu.lkey = l.lkey
    AND mu.mukey IN
({})
  LEFT OUTER JOIN component c ON c.mukey = mu.mukey
LEFT OUTER JOIN chorizon ch ON ch.cokey = c.cokey
"""

# SQL template for the SDA spatial query: intersects an AOI polygon (WKT,
# EPSG:4326, substituted into the {} placeholder) with the SSURGO mupolygon
# layer and returns single-part intersected soil polygons with their MUKEYs.
_query_template_shapes = """
~DeclareGeometry(@aoiGeom)~
~DeclareGeometry(@aoiGeomFixed)~
CREATE TABLE #AoiTable
    ( aoiid INT IDENTITY (1,1),
    landunit VARCHAR(20),
    aoigeom GEOMETRY )
;

SELECT @aoiGeom = GEOMETRY::STGeomFromText('{}', 4326);
SELECT @aoiGeomFixed = @aoiGeom.MakeValid().STUnion(@aoiGeom.STStartPoint());
INSERT INTO #AoiTable ( landunit, aoigeom )
VALUES ('My Geom', @aoiGeomFixed);
-- End of AOI geometry section
-- #AoiSoils table contains intersected soil polygon table with geometry
CREATE TABLE #AoiSoils
    ( polyid INT IDENTITY (1,1),
    aoiid INT,
    landunit VARCHAR(20),
    mukey INT,
    soilgeom GEOMETRY )
;
-- End of CREATE TABLE section
-- Populate #AoiSoils table with intersected soil polygon geometry
INSERT INTO #AoiSoils (aoiid, landunit, mukey, soilgeom)
    SELECT A.aoiid, A.landunit, M.mukey, M.mupolygongeo.STIntersection(A.aoigeom ) AS soilgeom
    FROM mupolygon M, #AoiTable A
    WHERE mupolygongeo.STIntersects(A.aoigeom) = 1
;
-- #AoiSoils2 is single part polygon
SELECT landunit, I.mukey, musym, soilgeom.STGeometryN(Numbers.n).MakeValid() AS wktgeom
FROM #AoiSoils AS I
JOIN Numbers ON Numbers.n <= I.soilgeom.STNumGeometries()
INNER JOIN mapunit M ON I.mukey = M.mukey
;
-- END OF AOI GEOMETRY QUERY
"""


def synthesizeData(df):
    """Renames and calculates derived properties in a MUKEY-based data frame, in place"""
    # map raw SSURGO column names onto human-readable names with units
    column_names = {
        'hzdept_r': 'top depth [cm]',
        'hzdepb_r': 'bot depth [cm]',
        'ksat_r': 'log Ksat [um s^-1]',
        'sandtotal_r': 'total sand pct [%]',
        'silttotal_r': 'total silt pct [%]',
        'claytotal_r': 'total clay pct [%]',
        'dbthirdbar_r': 'bulk density [g/cm^3]',
        'partdensity': 'particle density [g/cm^3]',
        'wsatiated_r': 'saturated water content [%]',
        'comppct_r': 'component pct [%]'
    }
    df.rename(columns=column_names, inplace=True)

    # derived quantities: horizon thickness and porosity from bulk/particle density
    df['thickness [cm]'] = df['bot depth [cm]'] - df['top depth [cm]']
    df['porosity [-]'] = 1 - df['bulk density [g/cm^3]'] / df['particle density [g/cm^3]']

    # log Ksat to average in log space, setting a min perm of 1e-16 m^2 (1.e-3 um/s)
    clipped_ksat = np.maximum(df['log Ksat [um s^-1]'].values, 1.e-3)
    df['log Ksat [um s^-1]'] = np.log10(clipped_ksat)

    # assume null porosity = saturated water content
    missing = pd.isnull(df['porosity [-]'])
    df.loc[missing, 'porosity [-]'] = df.loc[missing, 'saturated water content [%]'] / 100
    logging.info(f'found {len(df["mukey"].unique())} unique MUKEYs.')


def aggregateComponentValues(df : pd.DataFrame,
                             agg_var : Optional[List[str]] = None):
    """
    Aggregate horizon values by layer thickness to get component properties.

    Parameters
    ----------
    df : pandas dataframe
      Horizon table, including 'mukey', 'cokey', 'chkey', 'component pct
      [%]', 'thickness [cm]', depth columns, and the variables to average.
    agg_var : list(str), optional
      Variables to thickness-average across horizons.  Defaults to the
      standard SSURGO soil properties.

    Returns
    -------
    df_comp : pandas dataframe
      One row per component (cokey) with thickness-averaged properties.
    """
    if agg_var is None:
        agg_var = [
            'log Ksat [um s^-1]', 'porosity [-]', 'bulk density [g/cm^3]', 'total sand pct [%]',
            'total silt pct [%]', 'total clay pct [%]',
        ]

    comp_list = ['mukey', 'cokey', 'component pct [%]', 'thickness [cm]'] + agg_var
    horizon_selected_cols = [
        'mukey', 'cokey', 'chkey', 'component pct [%]', 'thickness [cm]', 'top depth [cm]',
        'bot depth [cm]', 'particle density [g/cm^3]',
    ] + agg_var

    rows = []
    for icokey in df['cokey'].unique():
        idf_horizon = df.loc[df['cokey'] == icokey, horizon_selected_cols]

        # mukey and component pct are uniform across a component's horizons
        imukey = idf_horizon['mukey'].values[0]
        assert (np.all(idf_horizon['mukey'].values == imukey))
        icomp_pct = idf_horizon['component pct [%]'].values[0]
        assert (np.all(idf_horizon['component pct [%]'].values == icomp_pct))

        depth_agg_value = []
        # horizon-uniform quantities
        depth_agg_value.append(imukey)
        depth_agg_value.append(icokey)
        depth_agg_value.append(icomp_pct)
        depth_agg_value.append(None)  # thickness placeholder, filled below

        # depth-average quantities, weighting each horizon by its thickness
        for ivar in agg_var:
            idf = idf_horizon[['thickness [cm]', ivar]].dropna()
            if idf.empty:
                ivalue = np.nan
            else:
                ivalue = sum(idf['thickness [cm]'] / idf['thickness [cm]'].sum() * idf[ivar])
            depth_agg_value.append(ivalue)

        # total thickness is the deepest valid horizon bottom
        idepth = idf_horizon['bot depth [cm]'].dropna().max()
        depth_agg_value[3] = idepth

        # create the local single-row df
        assert (len(depth_agg_value) == len(comp_list))
        idf_comp = pd.DataFrame(np.array(depth_agg_value).reshape(1, len(depth_agg_value)),
                                    columns=comp_list)

        # normalize sand/silt/clay pct to make the sum(%sand, %silt, %clay)=1
        sum_soil = idf_comp.loc[:, ['total sand pct [%]', 'total silt pct [%]', 'total clay pct [%]']].sum().sum()
        if sum_soil != 100:
            for isoil in ['total sand pct [%]', 'total silt pct [%]', 'total clay pct [%]']:
                idf_comp[isoil] = idf_comp[isoil] / sum_soil * 100

        # append to df
        rows.append(idf_comp)

    return pd.concat(rows)


def aggregateMukeyValues(df : pd.DataFrame,
                         agg_var : Optional[List[str]] = None):
    """Aggregate component values by component percentage to get MUKEY property.

    Parameters
    ----------
    df : pandas dataframe
      The list of horizons in all components in all mukeys.
    agg_var : list(str), optional
      List of keys to aggregate.  Defaults to normal things from
      SSURGO.

    Returns
    -------
    df_mukey : pandas dataframe
      One row per map unit (mukey) with area-averaged properties.

    """
    # variables to average across component and horizon. These are hard coded for now.
    if agg_var is None:
        agg_var = [
            'log Ksat [um s^-1]', 'porosity [-]', 'bulk density [g/cm^3]', 'total sand pct [%]',
            'total silt pct [%]', 'total clay pct [%]',
        ]

    # aggregate all horizons to components
    df_comp = aggregateComponentValues(df, agg_var)

    # area-average to mukey
    area_agg_var = ['thickness [cm]', ] + agg_var
    mukey_agg_var = ['mukey', ] + area_agg_var

    df_mukey_rows = []
    for imukey in df_comp['mukey'].unique():
        idf_comp = df_comp.loc[df_comp['mukey'] == imukey]
        area_agg_value = []

        # component-uniform quantities
        area_agg_value.append(imukey)

        # area-average quantities, weighting each component by its area percentage
        for ivar in area_agg_var:
            idf = idf_comp[['component pct [%]', ivar]].dropna()
            if idf.empty:
                ivalue = np.nan
            else:
                ivalue = sum(idf['component pct [%]'] / idf['component pct [%]'].sum() * idf[ivar])
            area_agg_value.append(ivalue)

        # create the local data frame and append
        assert (len(area_agg_value) == len(mukey_agg_var))
        idf_mukey = pd.DataFrame(np.array(area_agg_value).reshape(1, len(area_agg_value)),
                                     columns=mukey_agg_var)
        df_mukey_rows.append(idf_mukey)

    df_mukey = pd.concat(df_mukey_rows)
    # restore integer dtype lost when the row was packed into a float array
    df_mukey['mukey'] = np.array(df_mukey['mukey'], dtype=int)
    return df_mukey


class ManagerNRCS(manager_shapes.ManagerShapes):
    """The National Resources Conservation Service's SSURGO Database
    [NRCS]_ contains a huge amount of information about soil texture,
    parameters, and structure, and are provided as shape files
    containing soil type delineations with map-unit-keys (MUKEYs).
    These are re-broadcast onto a raster (much like gSSURGO, which is
    unfortunately not readable by open tools) and used to index soil
    parameterizations for simulation.

    Data is accessed via two web APIs -- the first for spatial
    (shapefiles) survey information, the second for properties.

    * https://sdmdataaccess.nrcs.usda.gov/Spatial/SDMWGS84Geographic.wfs
    * https://sdmdataaccess.nrcs.usda.gov/Tabular/post.rest

    TODO: Functionality for mapping from MUKEY to soil parameters.

    .. [NRCS] Soil Survey Staff, Natural Resources Conservation
       Service, United States Department of Agriculture. Web Soil
       Survey. Available online at
       https://websoilsurvey.nrcs.usda.gov/. Accessed [month/day/year].
    """
    def __init__(self, force_download : bool = False):
        """Create the NRCS SSURGO manager.

        Parameters
        ----------
        force_download : bool, optional
          If True, re-download data even when a cached file exists.
        """
        self.force_download = force_download

        # NRCS data is in lat/lon coordinates
        native_crs_in = watershed_workflow.crs.from_epsg('4326')

        # Resolution estimate for degree-based data
        native_resolution = 0.001  # ~100m at mid-latitudes

        # Native ID field is MUKEY
        native_id_field = 'mukey'

        # Initialize base class
        super().__init__(
            name='National Resources Conservation Service Soil Survey (NRCS Soils)',
            source='USDA NRCS SSURGO Database',
            native_crs_in=native_crs_in,
            native_resolution=native_resolution,
            native_id_field=native_id_field)

        # NRCS-specific setup: format strings for bounds-based filenames/queries
        self.fstring = '{:.4f}_{:.4f}_{:.4f}_{:.4f}'
        self.qstring = self.fstring.replace('_', ',')
        self.name_manager = filenames.Names(self.name,
                                            os.path.join('soil_structure', 'SSURGO'), '',
                                            'SSURGO_%s.shp' % self.fstring)

        #self.url_spatial = 'https://SDMDataAccess.sc.egov.usda.gov/Spatial/SDMWGS84Geographic.wfs'
        #self.url_spatial = 'https://SDMDataAccess.nrcs.usda.gov/Spatial/SDMWGS84Geographic.wfs'
        self.url_spatial = 'https://sdmdataaccess.nrcs.usda.gov/Tabular/post.rest'
        self.url_data = 'https://sdmdataaccess.nrcs.usda.gov/Tabular/post.rest'

    def _getShapes(self):
        """Fetch all shapes in a dataset.

        Returns
        -------
        gpd.GeoDataFrame
            Raw GeoDataFrame with native column names and CRS properly set.
        """
        # the NRCS dataset is global-scale; only geometry-limited queries are supported
        raise NotImplementedError('ManagerNRCS does not support getting all shapes.')

    def _getShapesByGeometry(self, geometry_gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
        """Fetch NRCS soil shapes and properties for the given geometry.

        Parameters
        ----------
        geometry_gdf : gpd.GeoDataFrame
            GeoDataFrame with geometries in native_crs_in to search for shapes.

        Returns
        -------
        gpd.GeoDataFrame
            GeoDataFrame with native column names including soil properties.
        """
        # Try with union first, fall back to individual geometries if that fails
        union_geometry = geometry_gdf.union_all()

        try:
            # Try with the union geometry first
            shapes = self._download(union_geometry)
        except requests.HTTPError as e:
            if '500 Server Error' in str(e):
                # If union fails, try with each geometry individually
                shape_dfs = []
                for _, row in geometry_gdf.iterrows():
                    individual_shapes = self._download(row.geometry)
                    shape_dfs.append(individual_shapes[['mukey', 'geometry']])
                shapes = pd.concat(shape_dfs)
            else:
                raise e

        # Group by ID and union geometries
        shapes_no_crs = shapes.groupby('mukey')['geometry'].apply(
            lambda x: shapely.ops.unary_union(x.tolist()) if len(x) > 1 else x.iloc[0]
        ).reset_index()
        shapes = shapes_no_crs.set_crs(shapes.crs)
        assert shapes.crs is not None

        # Get properties - use bounds for filename generation
        bounds = self._cleanBounds(union_geometry)
        filename = self.name_manager.file_name(*bounds)
        data_filename = filename[:-4] + "_properties.csv"
        props = self._getProperties(shapes['mukey'], data_filename)

        # Merge properties and shapes dfs
        df = shapes.merge(props, on='mukey')
        return df

    def _getShapesByID(self, ids: List[str]) -> gpd.GeoDataFrame:
        """Fetch NRCS shapes by MUKEY IDs.

        Parameters
        ----------
        ids : List[str]
            List of MUKEY IDs to retrieve.

        Returns
        -------
        gpd.GeoDataFrame
            GeoDataFrame with native column names including soil properties.
        """
        # NRCS API is geometry-based only, not ID-based
        raise NotImplementedError(
            "ManagerNRCS doesn't support getShapesByID - NRCS API is geometry-based only. "
            "Use getShapesByGeometry instead.")

    def _addStandardNames(self, df: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
        """Convert native column names to standard names.

        Parameters
        ----------
        df : gpd.GeoDataFrame
            GeoDataFrame with native column names.

        Returns
        -------
        gpd.GeoDataFrame
            GeoDataFrame with standard column names added.
        """
        # Add ID column from MUKEY
        df[names.ID] = df['mukey'].astype('string')

        # Add standard name
        df[names.NAME] = [f'NRCS-{mukey}' for mukey in df['mukey']]
        return df

    def _download(self,
                  geometry : shapely.geometry.base.BaseGeometry) -> gpd.GeoDataFrame:
        """Downloads and reads soil shapefiles.

        This accepts only a bounding box.

        Parameters
        ----------
        geometry : shapely.geometry.base.BaseGeometry
            Geometry whose bounding box is used to filter shapes.

        Returns
        -------
        gpd.GeoDataFrame
        """
        bounds = self._cleanBounds(geometry)
        os.makedirs(self.name_manager.data_dir(), exist_ok=True)
        filename = self.name_manager.file_name(*bounds)
        logging.info("Attempting to download source for target '%s'" % filename)

        if not os.path.exists(filename) or self.force_download:
            logging.info('  Downloading spatial data via request:')
            logging.info(f'    to file: {filename}')
            logging.info(f'    from: {self.url_spatial}')

            box = shapely.geometry.box(*bounds)
            query = _query_template_shapes.format(box.wkt)
            data = { 'FORMAT': 'JSON', 'QUERY': query }
            r = requests.post(self.url_data, data=data, verify=source_utils.getVerifyOption())
            logging.info(f'  full URL: {r.url}')
            r.raise_for_status()

            logging.info('  Converting to shapely')
            # each table entry is (landunit, mukey, musym, wkt geometry)
            table = r.json()['Table']
            shps = [shapely.wkt.loads(ent[3]) for ent in table]
            df = gpd.GeoDataFrame({'mukey': [int(t[1]) for t in table]},
                                  geometry=shps,
                                  crs=self.native_crs_in)
            df.to_file(filename)

        shapes = gpd.read_file(filename)
        logging.info('  Found {} shapes.'.format(len(shapes)))
        return shapes

    def _downloadProperties(self, mukeys, filename=None):
        """Queries REST API for parameters by MUKEY."""
        if filename is None or (not os.path.exists(filename)) or self.force_download:
            logging.info('  Downloading raw properties data via request:')
            logging.info(f'    to file: {filename}')
            logging.info(f'    from: {self.url_data}')

            # column names and dtypes matching _query_template_props' SELECT order
            mukey_data_headers = [
                'saversion', 'saverest', 'areasymbol', 'areaname', 'lkey', 'musym', 'muname',
                'museq', 'mukey', 'comppct_r', 'compname', 'localphase', 'slope_r', 'cokey',
                'hzdept_r', 'hzdepb_r', 'ksat_r', 'sandtotal_r', 'claytotal_r', 'silttotal_r',
                'dbthirdbar_r', 'partdensity', 'wsatiated_r', 'chkey'
            ]
            mukey_dtypes = [
                int, str, str, str, int, str, str, int, int, int, str, str, float, int, float,
                float, float, float, float, float, float, float, float, int
            ]

            mukey_list_string = ','.join([f"'{k}'" for k in mukeys])
            query = _query_template_props.format(mukey_list_string)
            data = { 'FORMAT': 'JSON', 'QUERY': query }
            r = requests.post(self.url_data, data=data, verify=source_utils.getVerifyOption())
            logging.info(f'  full URL: {r.url}')
            r.raise_for_status()

            table = np.array(r.json()['Table'])
            df = pd.DataFrame()

            def to_type(val, dtype):
                # map SQL NULLs to a typed sentinel so each column gets a uniform dtype
                if val is None:
                    if dtype == int:
                        return -1
                    elif dtype == float:
                        return np.nan
                    elif dtype == str:
                        return ''
                return dtype(val)

            for i, (title, dtype) in enumerate(zip(mukey_data_headers, mukey_dtypes)):
                df[title] = np.array([to_type(entry, dtype) for entry in table[:, i]],
                                     dtype=dtype)

            if filename is not None:
                df.to_csv(filename)
        else:
            df = pd.read_csv(filename)
        return df

    def _getProperties(self, mukeys, filename=None):
        """Download and aggregate properties for a given set of mukeys, storing raw data in filename."""
        df = self._downloadProperties(mukeys, filename)
        synthesizeData(df)
        df_agg = aggregateMukeyValues(df)

        # get dataframe with van genuchten models
        df_vgm = watershed_workflow.soil_properties.computeVanGenuchtenModelFromSSURGO(df_agg)

        # fix units
        df_ats = watershed_workflow.soil_properties.convertRosettaToATS(df_vgm)

        # -- convert thickness to [m]
        df_ats['thickness [cm]'] = df_ats['thickness [cm]'] / 100.
        df_ats.rename(columns={'thickness [cm]' : 'thickness [m]'}, inplace=True)

        # all structure data frames are expected to have a 'source' and an 'id' in that source field
        df_ats['source'] = 'NRCS'

        # resort the dataframe to have the same order as was passed in
        # in the list mukeys.  This alleviates potential mistakes if
        # they get zipped or otherwise iterated over together.
        df_ats.set_index('mukey', inplace=True)
        df_ats = df_ats.reindex(mukeys)
        df_ats.reset_index(inplace=True)
        return df_ats

    def _cleanBounds(self,
                     geometry : shapely.geometry.base.BaseGeometry) -> Tuple[float, float, float, float]:
        """Round the geometry's bounds to 4 decimal places for stable cache filenames."""
        return tuple(np.round(b, 4) for b in geometry.bounds)