"""National Resources Conservation Service Soil Survey database.
Author: Ethan Coon (coonet@ornl.gov)
Author: Pin Shuai (pin.shuai@pnnl.gov)
"""
import os
import logging
import fiona
import requests
import numpy as np
import pandas
import collections
import shapely.wkt
import shapely.ops
import shapely.geometry
import watershed_workflow.crs
import watershed_workflow.sources.names
import watershed_workflow.warp
import watershed_workflow.utils
import watershed_workflow.soil_properties
import watershed_workflow.io
import watershed_workflow.sources.utils as source_utils
_query_template_props = """
SELECT
saversion, saverest, l.areasymbol, l.areaname, l.lkey, musym, muname, museq,
mu.mukey, comppct_r, compname, localphase, slope_r, c.cokey, hzdept_r,
hzdepb_r, ksat_r, sandtotal_r, claytotal_r, silttotal_r, dbthirdbar_r,
partdensity, wsatiated_r, ch.chkey
FROM sacatalog sac
INNER JOIN legend l ON l.areasymbol = sac.areasymbol
INNER JOIN mapunit mu ON mu.lkey = l.lkey
AND mu.mukey IN
({})
LEFT OUTER JOIN component c ON c.mukey = mu.mukey
LEFT OUTER JOIN chorizon ch ON ch.cokey = c.cokey
"""
_query_template_shapes = """
~DeclareGeometry(@aoiGeom)~
~DeclareGeometry(@aoiGeomFixed)~
CREATE TABLE #AoiTable
( aoiid INT IDENTITY (1,1),
landunit VARCHAR(20),
aoigeom GEOMETRY )
;
SELECT @aoiGeom = GEOMETRY::STGeomFromText('{}', 4326);
SELECT @aoiGeomFixed = @aoiGeom.MakeValid().STUnion(@aoiGeom.STStartPoint());
INSERT INTO #AoiTable ( landunit, aoigeom )
VALUES ('My Geom', @aoiGeomFixed);
-- End of AOI geometry section
-- #AoiSoils table contains intersected soil polygon table with geometry
CREATE TABLE #AoiSoils
( polyid INT IDENTITY (1,1),
aoiid INT,
landunit VARCHAR(20),
mukey INT,
soilgeom GEOMETRY )
;
-- End of CREATE TABLE section
-- Populate #AoiSoils table with intersected soil polygon geometry
INSERT INTO #AoiSoils (aoiid, landunit, mukey, soilgeom)
SELECT A.aoiid, A.landunit, M.mukey, M.mupolygongeo.STIntersection(A.aoigeom ) AS soilgeom
FROM mupolygon M, #AoiTable A
WHERE mupolygongeo.STIntersects(A.aoigeom) = 1
;
-- Return single-part polygons, one row per part of each intersected multipolygon
SELECT landunit, I.mukey, musym, soilgeom.STGeometryN(Numbers.n).MakeValid() AS wktgeom
FROM #AoiSoils AS I
JOIN Numbers ON Numbers.n <= I.soilgeom.STNumGeometries()
INNER JOIN mapunit M ON I.mukey = M.mukey
;
-- END OF AOI GEOMETRY QUERY
"""


def synthesize_data(df):
    """Renames columns and computes derived properties in a MUKEY-based data frame, in place."""
# rename columns to improve readability
rename_list = {
'hzdept_r': 'top depth [cm]',
'hzdepb_r': 'bot depth [cm]',
'ksat_r': 'log Ksat [um s^-1]',
'sandtotal_r': 'total sand pct [%]',
'silttotal_r': 'total silt pct [%]',
'claytotal_r': 'total clay pct [%]',
'dbthirdbar_r': 'bulk density [g/cm^3]',
'partdensity': 'particle density [g/cm^3]',
'wsatiated_r': 'saturated water content [%]',
'comppct_r': 'component pct [%]'
}
df.rename(columns=rename_list, inplace=True)
# preprocess data
df['thickness [cm]'] = df['bot depth [cm]'] - df['top depth [cm]']
df['porosity [-]'] = 1 - df['bulk density [g/cm^3]'] / df['particle density [g/cm^3]']
    # take log10 of Ksat so averaging happens in log space, flooring at
    # 1.e-3 um/s (roughly equivalent to a minimum permeability of 1e-16 m^2)
    df['log Ksat [um s^-1]'] = np.log10(np.maximum(df['log Ksat [um s^-1]'].values, 1.e-3))
# assume null porosity = saturated water content
df.loc[pandas.isnull(df['porosity [-]']), 'porosity [-]'] = \
df.loc[pandas.isnull(df['porosity [-]']), 'saturated water content [%]']/100
logging.info(f'found {len(df["mukey"].unique())} unique MUKEYs.')
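

# A minimal sketch (not part of the upstream API) showing what synthesize_data
# expects and produces; all values below are invented for illustration.
def _example_synthesize_data():
    """Builds a tiny raw SSURGO-style frame and synthesizes derived properties."""
    df = pandas.DataFrame({
        'mukey': [123456, 123456],          # hypothetical map unit key
        'hzdept_r': [0., 30.],              # horizon top depth [cm]
        'hzdepb_r': [30., 100.],            # horizon bottom depth [cm]
        'ksat_r': [9.0, 0.5],               # saturated conductivity [um/s]
        'sandtotal_r': [40., 35.],
        'silttotal_r': [40., 40.],
        'claytotal_r': [20., 25.],
        'dbthirdbar_r': [1.4, 1.5],         # bulk density [g/cm^3]
        'partdensity': [2.65, 2.65],        # particle density [g/cm^3]
        'wsatiated_r': [45., 42.],          # saturated water content [%]
        'comppct_r': [80, 80],              # component percent [%]
    })
    synthesize_data(df)  # renames columns and adds thickness, porosity, log Ksat
    return df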


def aggregate_component_values(df, agg_var=None):
    """Aggregates horizon values, weighted by layer thickness, to get component properties.

    Parameters
    ----------
    df : pandas dataframe
      The horizon table, including component keys and percentages.
    agg_var : list(str), optional
      Variables to average.  Defaults to the standard SSURGO properties.

    Returns
    -------
    df_comp : pandas dataframe
      Aggregated component properties.

    """
if agg_var is None:
agg_var = [
'log Ksat [um s^-1]', 'porosity [-]', 'bulk density [g/cm^3]', 'total sand pct [%]',
'total silt pct [%]', 'total clay pct [%]',
]
comp_list = ['mukey', 'cokey', 'component pct [%]', 'thickness [cm]'] + agg_var
horizon_selected_cols = [
'mukey', 'cokey', 'chkey', 'component pct [%]', 'thickness [cm]', 'top depth [cm]',
'bot depth [cm]', 'particle density [g/cm^3]',
] + agg_var
cokeys = df['cokey'].unique()
rows = []
for icokey in cokeys:
idf_horizon = df.loc[df['cokey'] == icokey, horizon_selected_cols]
imukey = idf_horizon['mukey'].values[0]
assert (np.all(idf_horizon['mukey'].values == imukey))
icomp_pct = idf_horizon['component pct [%]'].values[0]
assert (np.all(idf_horizon['component pct [%]'].values == icomp_pct))
depth_agg_value = []
# horizon-uniform quantities
depth_agg_value.append(imukey)
depth_agg_value.append(icokey)
depth_agg_value.append(icomp_pct)
depth_agg_value.append(None) # thickness placeholder
# depth-average quantities
for ivar in agg_var:
idf = idf_horizon[['thickness [cm]', ivar]].dropna()
if idf.empty:
ivalue = np.nan
else:
ivalue = sum(idf['thickness [cm]'] / idf['thickness [cm]'].sum() * idf[ivar])
depth_agg_value.append(ivalue)
idepth = idf_horizon['bot depth [cm]'].dropna().max()
depth_agg_value[3] = idepth
# create the local df
assert (len(depth_agg_value) == len(comp_list))
idf_comp = pandas.DataFrame(np.array(depth_agg_value).reshape(1, len(depth_agg_value)),
columns=comp_list)
        # normalize sand/silt/clay percentages so that they sum to 100
sum_soil = idf_comp.loc[:, 'total sand pct [%]':'total clay pct [%]'].sum().sum()
if sum_soil != 100:
for isoil in ['total sand pct [%]', 'total silt pct [%]', 'total clay pct [%]']:
idf_comp[isoil] = idf_comp[isoil] / sum_soil * 100
        # accumulate the single-row frame; concatenated on return
rows.append(idf_comp)
return pandas.concat(rows)
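
# For instance, two horizons of thickness 30 cm and 70 cm with porosities 0.45
# and 0.40 aggregate to a component porosity of
#   (30/100) * 0.45 + (70/100) * 0.40 = 0.415
# (values invented for illustration).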


def aggregate_mukey_values(df, agg_var=None):
    """Aggregates component values, weighted by component percentage, to get MUKEY properties.

    Parameters
    ----------
    df : pandas dataframe
      The list of horizons in all components in all mukeys.
    agg_var : list(str), optional
      List of keys to aggregate.  Defaults to the standard SSURGO properties.

    Returns
    -------
    df_mukey : pandas dataframe
      Aggregated mukey properties.

    """
# variables to average across component and horizon. These are hard coded for now.
if agg_var is None:
agg_var = [
'log Ksat [um s^-1]', 'porosity [-]', 'bulk density [g/cm^3]', 'total sand pct [%]',
'total silt pct [%]', 'total clay pct [%]',
]
# aggregate all horizons to components
df_comp = aggregate_component_values(df, agg_var)
# area-average to mukey
area_agg_var = ['thickness [cm]', ] + agg_var
mukey_agg_var = ['mukey', ] + area_agg_var
df_mukey_rows = []
    for imukey in df_comp['mukey'].unique():
idf_comp = df_comp.loc[df_comp['mukey'] == imukey]
area_agg_value = []
# component-uniform quantities
area_agg_value.append(imukey)
# area-average quantities
for ivar in area_agg_var:
idf = idf_comp[['component pct [%]', ivar]].dropna()
if idf.empty:
ivalue = np.nan
else:
ivalue = sum(idf['component pct [%]'] / idf['component pct [%]'].sum() * idf[ivar])
area_agg_value.append(ivalue)
# create the local data frame and append
assert (len(area_agg_value) == len(mukey_agg_var))
idf_mukey = pandas.DataFrame(np.array(area_agg_value).reshape(1, len(area_agg_value)),
columns=mukey_agg_var)
df_mukey_rows.append(idf_mukey)
df_mukey = pandas.concat(df_mukey_rows)
df_mukey['mukey'] = np.array(df_mukey['mukey'], dtype=int)
return df_mukey
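

# A hedged, self-contained sketch chaining synthesize_data and
# aggregate_mukey_values on the invented frame above; the component and
# horizon keys are hypothetical.
def _example_aggregate():
    df = _example_synthesize_data()
    df['cokey'] = [111, 111]   # hypothetical component key (one component)
    df['chkey'] = [11, 12]     # hypothetical horizon keys
    return aggregate_mukey_values(df)  # one row for MUKEY 123456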


class FileManagerNRCS:
    """The Natural Resources Conservation Service's SSURGO Database [NRCS]_
    contains a huge amount of information about soil texture, parameters, and
    structure, provided as shapefiles containing soil type delineations with
    map-unit keys (MUKEYs).  These are re-broadcast onto a raster (much like
    gSSURGO, which is unfortunately not readable by open tools) and used to
    index soil parameterizations for simulation.

    Data is accessed via two web APIs -- the first for spatial (shapefile)
    survey information, the second for properties:

    * https://sdmdataaccess.nrcs.usda.gov/Spatial/SDMWGS84Geographic.wfs
    * https://sdmdataaccess.nrcs.usda.gov/Tabular/post.rest

    TODO: Functionality for mapping from MUKEY to soil parameters.

    .. [NRCS] Soil Survey Staff, Natural Resources Conservation Service,
       United States Department of Agriculture. Web Soil Survey. Available
       online at https://websoilsurvey.nrcs.usda.gov/. Accessed
       [month/day/year].

    """

    def __init__(self):
        self.name = 'Natural Resources Conservation Service Soil Survey (NRCS Soils)'
        self.crs = watershed_workflow.crs.from_epsg('4326')
        self.fstring = '{:.4f}_{:.4f}_{:.4f}_{:.4f}'
        self.qstring = self.fstring.replace('_', ',')
        self.name_manager = watershed_workflow.sources.names.Names(
            self.name, os.path.join('soil_structure', 'SSURGO'), '', 'SSURGO_%s.shp' % self.fstring)
        # both spatial and tabular queries are served by the same REST endpoint
        self.url_spatial = 'https://sdmdataaccess.nrcs.usda.gov/Tabular/post.rest'
        self.url_data = 'https://sdmdataaccess.nrcs.usda.gov/Tabular/post.rest'

    def get_shapes(self, bounds, bounds_crs, force_download=False):
        """Downloads and reads soil shapefiles.

        This accepts only a bounding box.

        Parameters
        ----------
        bounds : [xmin, ymin, xmax, ymax]
          Bounding box to filter shapes.
        bounds_crs : CRS
          Coordinate system of the bounding box.
        force_download : bool
          Download or re-download the file if true.

        Returns
        -------
        profile : dict
          Fiona profile of the shapefile.
        shapes : list
          List of fiona shapes that match the bounds.

        """
        if isinstance(bounds, int):
raise TypeError('NRCS file manager only handles bounds, not indices.')
bounds = self.bounds(bounds, bounds_crs)
filename = self._download(bounds, force=force_download)
# def _flip(shp):
# """Generate a new fiona shape in long-lat from one in lat-long"""
# for ring in watershed_workflow.utils.generate_rings(shp):
# for i,c in enumerate(ring):
# ring[i] = c[1],c[0]
# return shp
with fiona.open(filename, 'r') as fid:
profile = fid.profile
#shapes = [_flip(s) for s in fid]
shapes = list(fid)
for s in shapes:
s['properties']['id'] = s['id']
logging.info(' Found {} shapes.'.format(len(shapes)))
logging.info(' and crs: {}'.format(watershed_workflow.crs.from_fiona(profile['crs'])))
return profile, shapes
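
    # A hedged usage sketch (bounding box in lon/lat, EPSG:4326; the
    # coordinates are illustrative only):
    #   fm = FileManagerNRCS()
    #   profile, shapes = fm.get_shapes([-84.5, 35.0, -84.4, 35.1], fm.crs)
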
    def download_properties(self, mukeys, filename=None, force=False):
        """Queries the REST API for soil properties by MUKEY, caching the raw
        table to filename if one is provided."""
if filename is None or (not os.path.exists(filename)) or force:
logging.info(f' Downloading raw properties data via request:')
logging.info(f' to file: {filename}')
logging.info(f' from: {self.url_data}')
mukey_data_headers = [
'saversion', 'saverest', 'areasymbol', 'areaname', 'lkey', 'musym', 'muname',
'museq', 'mukey', 'comppct_r', 'compname', 'localphase', 'slope_r', 'cokey',
'hzdept_r', 'hzdepb_r', 'ksat_r', 'sandtotal_r', 'claytotal_r', 'silttotal_r',
'dbthirdbar_r', 'partdensity', 'wsatiated_r', 'chkey'
]
mukey_dtypes = [
int, str, str, str, int, str, str, int, int, int, str, str, float, int, float,
float, float, float, float, float, float, float, float, int
]
mukey_list_string = ','.join([f"'{k}'" for k in mukeys])
query = _query_template_props.format(mukey_list_string)
data = { 'FORMAT': 'JSON', 'QUERY': query }
r = requests.post(self.url_data, data=data, verify=source_utils.get_verify_option())
logging.info(f' full URL: {r.url}')
r.raise_for_status()
table = np.array(r.json()['Table'])
df = pandas.DataFrame()
def to_type(val, dtype):
if val is None:
if dtype == int:
return -1
elif dtype == float:
return np.nan
elif dtype == str:
return ''
return dtype(val)
for i, (title, dtype) in enumerate(zip(mukey_data_headers, mukey_dtypes)):
df[title] = np.array([to_type(entry, dtype) for entry in table[:, i]], dtype=dtype)
if filename is not None:
df.to_csv(filename)
else:
df = pandas.read_csv(filename)
return df
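
    # A hedged sketch (hypothetical MUKEYs; requires network access):
    #   df_raw = fm.download_properties([123456, 234567], 'ssurgo_raw.csv')
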
def get_properties(self, mukeys, filename=None, force_download=False):
"""Download and aggregate properties for a given set of mukeys, storing raw data in filename."""
df = self.download_properties(mukeys, filename, force_download)
synthesize_data(df)
df_agg = aggregate_mukey_values(df)
# get dataframe with van genuchten models
df_vgm = watershed_workflow.soil_properties.vgm_from_SSURGO(df_agg)
# fix units
df_ats = watershed_workflow.soil_properties.to_ATS(df_vgm)
        # all structure data frames are expected to have a 'source' column and
        # an id ('mukey' here) within that source
df_ats['source'] = 'NRCS'
# resort the dataframe to have the same order as was passed in
# in the list mukeys. This alleviates potential mistakes if
# they get zipped or otherwise iterated over together.
df_ats.set_index('mukey', inplace=True)
df_ats = df_ats.reindex(mukeys)
df_ats.reset_index(inplace=True)
return df_ats
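
    # The aggregated, ATS-ready analog of the sketch above (again with
    # hypothetical MUKEYs):
    #   df_props = fm.get_properties([123456, 234567], 'ssurgo_raw.csv')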

    def get_shapes_and_properties(self, shapes, crs, force_download=False, split_download=False):
        """Downloads and reads soil shapefiles, then aggregates SSURGO data onto MUKEYs.

        Accepts either a bounding box, a shape, or a list of shapes.

        Parameters
        ----------
        shapes : shapely shape, list(shapely shape), or [xmin, ymin, xmax, ymax]
          Shapes on which to run the query.
        crs : CRS
          Coordinate system of the shapes or bounding box.
        force_download : bool
          Download or re-download the file if true.
        split_download : bool
          If true, query each shape's bounds separately and merge the results;
          useful when the union of the shapes is too large for the server.

        Returns
        -------
        profile : dict
          Fiona profile of the shapefile.
        shapes : list
          List of shapely shapes, merged by MUKEY, each carrying its mukey in
          a properties attribute.
        properties : pandas dataframe
          Dataframe of soil properties, one row per MUKEY, ordered to match
          shapes.

        """
if type(shapes) is list:
if len(shapes) == 0:
return None, list(), None
        elif len(shapes) == 4 and isinstance(shapes[0], (int, float)):
list_of_bounds = [shapes, ]
else:
list_of_bounds = [shapely.ops.unary_union(shapes).bounds, ]
else:
list_of_bounds = [shapes.bounds, ]
total_bounds = list_of_bounds[0]
filename = self.name_manager.file_name(*self.bounds(total_bounds, crs))
if not split_download:
try:
profile, mukeys, shapes = self._get_shapes(list_of_bounds, crs, force_download)
except requests.HTTPError as e:
if '500 Server Error' in str(e):
# failed because too big!
                    if isinstance(shapes, list) and not isinstance(shapes[0], (int, float)):
# get the shapes independently
list_of_bounds = [shp.bounds for shp in shapes]
logging.info(
'Shape is too big -- attempting to download as separate shapes and merge.'
)
profile, mukeys, shapes = self._get_shapes(list_of_bounds, crs,
force_download)
                    else:
                        raise ValueError(
                            'NRCS get_shapes() called with a shape too large for the NRCS '
                            'servers.  Try splitting it into smaller polygons and trying again.')
else:
raise e
else:
list_of_bounds = [shp.bounds for shp in shapes]
profile, mukeys, shapes = self._get_shapes(list_of_bounds, crs, force_download)
data_filename = filename[:-4] + "_properties.csv"
df = self.get_properties(mukeys, data_filename, force_download)
return profile, shapes, df
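
    # A hedged end-to-end sketch: a single watershed polygon (here a
    # placeholder shapely geometry) yields merged MUKEY shapes and matching
    # properties:
    #   profile, mukey_shapes, props = fm.get_shapes_and_properties(watershed_poly, fm.crs)
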
def _get_shapes(self, list_of_bounds, crs, force_download=False):
"""Inner function called by above, accepts list of bounds only."""
shapes = []
for bounds in list_of_bounds:
profile, shapes_l = self.get_shapes(bounds, crs, force_download)
shapes.extend(shapes_l)
logging.info(f' Downloaded {len(shapes)} total shapes')
# find unique
unique = collections.defaultdict(list)
for s in shapes:
unique[s['properties']['mukey']].append(s)
logging.info(f' Downloaded {len(unique)} unique mukeys')
# merge the shapes into multipolygons, converting to shapely in the process
for mukey in unique:
unique[mukey] = [watershed_workflow.utils.create_shply(s) for s in unique[mukey]]
# sort -- this just keeps everything cleaner
unique = sorted(unique.items())
shapes = [shapely.ops.unary_union(l[1]) for l in unique]
mukeys = [l[0] for l in unique]
# put the mukey on the MultiPolygon object
for mukey, shp in zip(mukeys, shapes):
shp.properties = { 'mukey': mukey }
return profile, mukeys, shapes

    def bounds(self, b, bounds_crs):
        """Converts bounds to the NRCS coordinate system, rounded and padded for downloading."""
        b = watershed_workflow.warp.bounds(b, bounds_crs, self.crs)
        # round to the 4 decimals used in the filename format, then pad outward
        # so the rounded box still covers the requested region
        b = [
            np.round(b[0], 4) - .0001,
            np.round(b[1], 4) - .0001,
            np.round(b[2], 4) + .0001,
            np.round(b[3], 4) + .0001
        ]
        return b
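
    # For instance, a min-x of -84.51237 rounds to -84.5124 and pads to
    # -84.5125, guaranteeing coverage of the original value (illustrative).
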
def _download(self, bounds, force=False):
"""Downloads the data and writes it to disk."""
os.makedirs(self.name_manager.data_dir(), exist_ok=True)
filename = self.name_manager.file_name(*bounds)
logging.info("Attempting to download source for target '%s'" % filename)
if not os.path.exists(filename) or force:
logging.info(' Downloading spatial data via request:')
logging.info(f' to file: {filename}')
logging.info(f' from: {self.url_spatial}')
box = shapely.geometry.box(*bounds)
query = _query_template_shapes.format(box.wkt)
data = { 'FORMAT': 'JSON', 'QUERY': query }
            r = requests.post(self.url_spatial, data=data, verify=source_utils.get_verify_option())
logging.info(f' full URL: {r.url}')
r.raise_for_status()
logging.info(f' Converting to shapely')
table = r.json()['Table']
shps = [shapely.wkt.loads(ent[3]) for ent in table]
for shp, t in zip(shps, table):
shp.properties = dict(mukey=int(t[1]))
logging.info(f' Writing to shapefile')
watershed_workflow.io.write_to_shapefile(filename, shps, self.crs)
return filename
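

if __name__ == '__main__':
    # A minimal, hedged smoke test of this module; it requires network access
    # to the NRCS servers, and the bounding box (lon/lat) is illustrative.
    logging.basicConfig(level=logging.INFO)
    fm = FileManagerNRCS()
    profile, shps = fm.get_shapes([-84.45, 35.00, -84.40, 35.05], fm.crs)
    mukeys = sorted(set(s['properties']['mukey'] for s in shps))
    props = fm.get_properties(mukeys)
    print(props)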