import os
import typing
import time
import json
import pyflwdir
import rasterio
import rasterio.features
import shapely
import geopandas
import numpy
from .core import Core
from .raster import Raster
[docs]
class Watershed:
'''
Provides functionality for watershed delineation from Digital Elevation Model (DEM).
'''
[docs]
def dem_extended_area_to_basin(
self,
input_file: str,
basin_file: str,
output_file: str
) -> geopandas.GeoDataFrame:
'''
Computes the basin from an extended DEM area by considering the highest flow accumulation point
as the main outlet. Generally, open-source DEMs have a rectangular shape with a geographic
Coordinate Reference System (CRS). The DEM must be converted to a projected CRS using the
:meth:`GeoAnalyze.Raster.crs_reprojection` method.
.. note::
Since the basin is generated without any input outlet point, the main outlet point should be
located close to the corresponding edge of the extended DEM area. The greater the distance
between the outlet point and the edge of the extended DEM area, the higher the probability
of obtaining a larger basin area than expected.
Parameters
----------
input_file : str
Path to the input DEM raster file.
basin_file : str
Path to save the output basin shapefile.
output_file : str
Path to save the output clipped DEM raster file by the basin area.
Returns
-------
GeoDataFrame
A GeoDataFrame representing the output basin polygon area.
'''
# time at starting of the program
run_time = time.time()
# check validity of output shapefile path
check_file = Core().is_valid_ogr_driver(basin_file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# check validity of output raster file path
check_file = Core().is_valid_raster_driver(output_file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# flow direction array
start_time = time.time()
with rasterio.open(input_file) as input_dem:
dem_profile = input_dem.profile
dem_array = input_dem.read(1).astype('float32')
dem_res = input_dem.res
pitfill_array, flwdir_array = pyflwdir.dem.fill_depressions(
elevtn=dem_array,
outlets='edge',
nodata=dem_profile['nodata']
)
required_time = round(time.time() - start_time, 2)
print(
f'Pit filling and flow direction calculation time (seconds): {required_time}',
flush=True
)
# flow accumulation array
start_time = time.time()
mask_array = (dem_array != dem_profile['nodata']).astype('int32')
flwdir_object = pyflwdir.from_array(
data=flwdir_array,
transform=dem_profile['transform']
)
flwacc_array = flwdir_object.accuflux(
data=mask_array
)
flwacc_array[mask_array == 0] = dem_profile['nodata']
required_time = round(time.time() - start_time, 2)
print(
f'Flow accumulation calculation time (seconds): {required_time}',
flush=True
)
# highest flow accumulation point GeoDataFrame
start_time = time.time()
point_shape = rasterio.features.shapes(
source=flwacc_array,
mask=flwacc_array == flwacc_array.max(),
transform=dem_profile['transform']
)
point_feature = [
{'geometry': geom, 'properties': {'flwacc': val}} for geom, val in point_shape
]
point_gdf = geopandas.GeoDataFrame.from_features(
features=point_feature,
crs=dem_profile['crs']
)
point_gdf['id'] = 1
point_gdf['geometry'] = point_gdf.geometry.centroid
# basin polygon GeoDataFrame
basin_array = flwdir_object.basins(
xy=(point_gdf.geometry.x, point_gdf.geometry.y),
ids=point_gdf['id'].astype('uint32')
)
basin_shape = rasterio.features.shapes(
source=basin_array.astype('int32'),
mask=basin_array != 0,
transform=dem_profile['transform'],
connectivity=8
)
basin_feature = [
{'geometry': geom, 'properties': {'id': val}} for geom, val in basin_shape
]
basin_gdf = geopandas.GeoDataFrame.from_features(
features=basin_feature,
crs=dem_profile['crs']
)
flwacc_value = point_gdf['flwacc'].iloc[0]
basin_gdf['flwacc'] = flwacc_value
basin_gdf.to_file(basin_file)
required_time = round(time.time() - start_time, 2)
print(
f'Basin calculation time (seconds): {required_time}',
flush=True
)
print(
f'Basin area: {flwacc_value * dem_res[0] * dem_res[1]}',
flush=True
)
# dem clipping
Raster().clipping_by_shapes(
input_file=input_file,
shape_file=basin_file,
output_file=output_file
)
# total time to run the program
total_time = round(time.time() - run_time, 2)
print(
f'Total time (seconds): {total_time}',
flush=True
)
return basin_gdf
[docs]
def dem_delineation(
self,
dem_file: str,
outlet_type: str,
tacc_type: str,
tacc_value: float,
folder_path: str,
flw_col: str = 'flw_id'
) -> str:
'''
Generates delineation raster outputs, including flow direction (`flwdir.tif`), slope (`slope.tif`), aspect (`aspect.tif`),
and flow accumulation (`flwacc.tif`). Using the provided flow accumulation threshold, the function also generates shapefiles
for streams (`stream_lines.shp`), subbasins (`subbasins.shp`), subbasin drainage points (`subbasin_drainage_points.shp`), and main outlets
(`outlet_points.shp`). All shapefiles include a common identifier column, `flw_col`, to facilitate cross-referencing.
The `subbasins.shp` file contains an additional column, `area_m2`, which stores the area of each subbasin.
The `subbasin_drainage_points.shp` file contains an additional column, `flwacc`, which stores the flow accumulation value at the drainage points.
A summary file is created detailing the processing time and other relevant parameters. All outputs are saved to the specified folder.
Parameters
----------
dem_file : str
Path to the input DEM raster file.
outlet_type : str
Type of outlet from one of [single, multiple]. The 'single' option forces all flow directions
toward a single outlet at the lowest pit, while 'multiple' allows for multiple outlets.
tacc_type : str
Type of threshold for flow accumulation, chosen from ['percentage', 'absolute'].
The 'percentage' option uses the percent value of the maximum flow accumulation, while
'absolute' specifies a threshold based on a specific number of cells.
tacc_value : float
If 'percentage' is selected, this value must be between 0 and 100, representing the
percentage of maximum flow accumulation.
folder_path : str
Path to the output folder for saving files.
flw_col : str, optional
Name of the identifier column used in shapefiles to facilitate cross-referencing. Default is 'flw_id'.
Returns
-------
str
A confirmation message that all geoprocessing has been completed.
'''
# time at starting of the program
run_time = time.time()
# summary dictionary
summary: dict[str, typing.Any] = {}
# check validity of output folder path
if os.path.isdir(folder_path) is False:
raise Exception('Input folder path does not exsit.')
# check validty of outlet type
if outlet_type not in ['single', 'multiple']:
raise Exception('Outlet type must be one of [single, multiple].')
# check validity of threshold flow accumalation type
if tacc_type not in ['percentage', 'absolute']:
raise Exception('Threshold accumulation type must be one of [percentage, absolute].')
# DEM and mask array
start_time = time.time()
with rasterio.open(dem_file) as input_dem:
dem_shape = input_dem.shape
cell_area = input_dem.res[0] * input_dem.res[1]
dem_profile = input_dem.profile
dem_profile.update(
{
'dtype': 'float32'
}
)
dem_array = input_dem.read(1).astype('float32')
required_time = round(time.time() - start_time, 2)
print(
f'DEM reading time (seconds): {required_time}',
flush=True
)
summary['DEM reading time (seconds)'] = required_time
mask_array = (dem_array != dem_profile['nodata']).astype('int32')
valid_cells = int((mask_array != 0).sum())
summary['Number of valid DEM cells'] = valid_cells
summary['Cell resolution'] = input_dem.res
watershed_area = valid_cells * cell_area
summary['Watershed area (m^2)'] = watershed_area
# flow direction array and saving raster
start_time = time.time()
outlets = 'edge' if outlet_type == 'multiple' else 'min'
pitfill_array, flwdir_array = pyflwdir.dem.fill_depressions(
elevtn=dem_array,
outlets=outlets,
nodata=dem_profile['nodata']
)
flwdir_profile = dem_profile.copy()
flwdir_profile.update(
dtype=flwdir_array.dtype,
nodata=247
)
flwdir_file = os.path.join(folder_path, 'flwdir.tif')
with rasterio.open(flwdir_file, 'w', **flwdir_profile) as output_flwdir:
output_flwdir.write(flwdir_array, 1)
required_time = round(time.time() - start_time, 2)
print(
f'Pit filling and flow direction calculation time (seconds): {required_time}',
flush=True
)
summary['Pit filling and flow direction calculation time (seconds)'] = required_time
# slope array and saving raster
start_time = time.time()
slope_array = pyflwdir.dem.slope(
elevtn=pitfill_array.astype('float32'),
nodata=dem_profile['nodata'],
transform=dem_profile['transform']
)
slope_file = os.path.join(folder_path, 'slope.tif')
with rasterio.open(slope_file, 'w', **dem_profile) as output_slope:
output_slope.write(slope_array, 1)
required_time = round(time.time() - start_time, 2)
print(
f'Slope calculation time (seconds): {required_time}',
flush=True
)
summary['Slope calculation time (seconds)'] = required_time
# aspect array and saving raster
start_time = time.time()
grad_y, grad_x = numpy.gradient(
dem_array,
input_dem.res[1],
input_dem.res[0],
)
aspect_array = numpy.arctan2(-grad_y, grad_x) * (180 / numpy.pi)
aspect_array[aspect_array < 0] += 360
aspect_array[dem_array == dem_profile['nodata']] = dem_profile['nodata']
aspect_file = os.path.join(folder_path, 'aspect.tif')
with rasterio.open(aspect_file, 'w', **dem_profile) as output_aspect:
output_aspect.write(aspect_array, 1)
required_time = round(time.time() - start_time, 2)
print(
f'Aspect calculation time (seconds): {required_time}',
flush=True
)
summary['Aspect calculation time (seconds)'] = required_time
# flow accumulation array and saving raster
start_time = time.time()
flwdir_object = pyflwdir.from_array(
data=flwdir_array,
transform=dem_profile['transform']
)
flwacc_array = flwdir_object.accuflux(
data=mask_array
)
flwacc_array[mask_array == 0] = dem_profile['nodata']
flwacc_file = os.path.join(folder_path, 'flwacc.tif')
with rasterio.open(flwacc_file, 'w', **dem_profile) as output_flwacc:
output_flwacc.write(flwacc_array, 1)
required_time = round(time.time() - start_time, 2)
print(
f'Flow accumulation calculation time (seconds): {required_time}',
flush=True
)
summary['Flow accumulation calculation time (seconds)'] = required_time
# maximum flow accumulation
max_flwacc = int(flwacc_array[mask_array != 0].max())
summary['Maximum flow accumulation'] = max_flwacc
summary['Flow accumulation threshold type and value'] = (tacc_type, tacc_value)
threshold = int(max_flwacc * tacc_value / 100) if tacc_type == 'percentage' else tacc_value
summary['Stream generation from threshold cells'] = threshold
threshold_area = threshold * cell_area
summary['Stream generation from threshold area (m^2)'] = threshold_area
# flow features
start_time = time.time()
flwacc_features = flwdir_object.streams(
mask=flwacc_array >= threshold
)
feature_gdf = geopandas.GeoDataFrame.from_features(
features=flwacc_features,
crs=dem_profile['crs']
)
# flow line GeoDataFrame
flw_gdf = feature_gdf[feature_gdf['pit'] == 0].reset_index(drop=True)
flw_gdf[flw_col] = list(range(1, flw_gdf.shape[0] + 1))
flw_gdf = flw_gdf[[flw_col, 'geometry']]
flw_gdf.to_file(
filename=os.path.join(folder_path, 'stream_lines.shp')
)
required_time = round(time.time() - start_time, 2)
print(
f'Stream calculation time (seconds): {required_time}',
flush=True
)
summary['Stream calculation time (seconds)'] = required_time
summary['Number of stream segments'] = flw_gdf.shape[0]
# outlet point GeoDataFrame
outlet_gdf = feature_gdf[feature_gdf['pit'] == 1].reset_index(drop=True)
outlet_gdf['outlet_id'] = range(1, outlet_gdf.shape[0] + 1)
outlet_gdf.geometry = outlet_gdf.geometry.apply(lambda x: shapely.Point(x.coords[-1]))
outlet_gdf.to_file(
filename=os.path.join(folder_path, 'outlet_points.shp')
)
summary['Number of outlets'] = outlet_gdf.shape[0]
# subbaisn pour point GeoDataFrame
start_time = time.time()
pour_gdf = flw_gdf.copy()
pour_gdf['pour_coords'] = pour_gdf.geometry.apply(
lambda x: x.coords[-2]
)
pour_gdf['geometry'] = pour_gdf.apply(
lambda row: shapely.Point(row['pour_coords']),
axis=1
)
pour_gdf = pour_gdf.drop(
columns=['pour_coords']
)
pour_array = rasterio.features.rasterize(
shapes=zip(pour_gdf.geometry, pour_gdf[flw_col]),
out_shape=dem_shape,
transform=dem_profile['transform'],
all_touched=True,
fill=dem_profile['nodata'],
dtype=dem_profile['dtype']
)
pour_flwacc = {}
for pid in pour_gdf[flw_col]:
pid_flwacc = flwacc_array[pour_array == pid]
pour_flwacc[pid] = pid_flwacc[0]
pour_gdf['flwacc'] = pour_gdf[flw_col].apply(lambda x: pour_flwacc.get(x))
pour_gdf.to_file(
filename=os.path.join(folder_path, 'subbasin_drainage_points.shp')
)
# subbasins polygon GeoDataFrame
subbasin_array = flwdir_object.basins(
xy=(pour_gdf.geometry.x, pour_gdf.geometry.y),
ids=pour_gdf[flw_col].astype('uint32')
)
subbasin_shapes = rasterio.features.shapes(
source=subbasin_array.astype('int32'),
mask=subbasin_array != 0,
transform=dem_profile['transform'],
connectivity=8
)
subbasin_features = [
{'geometry': geom, 'properties': {flw_col: val}} for geom, val in subbasin_shapes
]
subbasin_gdf = geopandas.GeoDataFrame.from_features(
features=subbasin_features,
crs=dem_profile['crs']
)
subbasin_gdf.geometry = subbasin_gdf.geometry.make_valid()
subbasin_gdf = subbasin_gdf.sort_values(
by=[flw_col],
ascending=[True],
ignore_index=True
)
subbasin_gdf[flw_col] = subbasin_gdf[flw_col].astype(int)
subbasin_gdf['area_m2'] = subbasin_gdf.geometry.area
subbasin_gdf.to_file(
filename=os.path.join(folder_path, 'subbasins.shp'),
engine='pyogrio'
)
required_time = round(time.time() - start_time, 1)
print(
f'Subbasin calculation time (seconds): {required_time}',
flush=True
)
summary['Subbasin calculation time (seconds)'] = required_time
# total time to run the program
total_time = round(time.time() - run_time, 2)
print(
f'Total time (seconds): {total_time}',
flush=True
)
summary['Total time (seconds)'] = total_time
# saving summary
summary_file = os.path.join(folder_path, 'summary_swatplus_preliminary_files.json')
with open(summary_file, 'w') as output_summary:
json.dump(summary, output_summary, indent='\t')
return 'All geoprocessing has been completed.'
[docs]
def get_flwdir(
self,
dem_file: str,
outlet_type: str,
pitfill_file: str,
flwdir_file: str
) -> str:
'''
Compute flow direction after filling the pits of the DEM.
Parameters
----------
dem_file : str
Path to the input DEM raster file.
outlet_type : str
Type of outlet from one of [single, multiple]. The 'single' option forces all flow directions
toward a single outlet at the lowest pit, while 'multiple' allows for multiple outlets.
pitfill_file : str
Path to save the output pit-filled DEM raster file.
flwdir_file : str
Path to save the output flow direction raster file.
Returns
-------
str
A message indicating the time required for all geoprocessing computations.
'''
# start time
start_time = time.time()
# check validity of output file path
for file in [pitfill_file, flwdir_file]:
check_file = Core().is_valid_raster_driver(file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# check validty of outlet type
if outlet_type not in ['single', 'multiple']:
raise Exception('Outlet type must be one of [single, multiple].')
# pit filling and flow direction from the DEM
with rasterio.open(dem_file) as input_dem:
raster_profile = input_dem.profile
outlets = 'edge' if outlet_type == 'multiple' else 'min'
pitfill_array, flwdir_array = pyflwdir.dem.fill_depressions(
elevtn=input_dem.read(1).astype('float32'),
outlets=outlets,
nodata=raster_profile['nodata']
)
# saving pit filling raster
raster_profile.update(
{'dtype': 'float32'}
)
with rasterio.open(pitfill_file, 'w', **raster_profile) as output_pitfill:
output_pitfill.write(pitfill_array, 1)
# saving flow direction raster
raster_profile.update(
dtype=flwdir_array.dtype,
nodata=247
)
with rasterio.open(flwdir_file, 'w', **raster_profile) as output_flwdir:
output_flwdir.write(flwdir_array, 1)
# required time
required_time = time.time() - start_time
output = f'Time required for computing pit filling and flow direction: {required_time:.2f} seconds.'
return output
[docs]
def get_flwacc(
self,
flwdir_file: str,
flwacc_file: str
) -> str:
'''
Computes flow accumulation from flow direction rasters.
Parameters
----------
flwdir_file : str
Path of the input flow direction raster file.
flwacc_file : str
Path to save the output flow accumulation raster file.
Returns
-------
str
A message indicating the time required for all geoprocessing computations.
'''
# start time
start_time = time.time()
# check validity of output file path
check_file = Core().is_valid_raster_driver(flwacc_file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# flow direction object
with rasterio.open(flwdir_file) as input_flwdir:
raster_profile = input_flwdir.profile
flwdir_array = input_flwdir.read(1)
mask_array = (flwdir_array != input_flwdir.nodata).astype('int32')
flwdir_object = pyflwdir.from_array(
data=flwdir_array,
transform=raster_profile['transform']
)
# flow accumulation array
flwacc_array = flwdir_object.accuflux(
data=mask_array
)
max_flwacc = flwacc_array.max()
print(f'Maximum flow accumulation: {max_flwacc}.')
# saving flow accumulation raster
flwacc_array[mask_array == 0] = -9999
raster_profile.update(
{
'dtype': 'float32',
'nodata': -9999
}
)
with rasterio.open(flwacc_file, 'w', **raster_profile) as output_flwacc:
output_flwacc.write(flwacc_array, 1)
# required time
required_time = time.time() - start_time
output = f'Time required for computing flow accumulation: {required_time:.2f} seconds.'
return output
[docs]
def get_stream(
self,
flwdir_file: str,
flwacc_file: str,
tacc_type: str,
tacc_value: str,
stream_file: str,
outlet_file: str
) -> str:
'''
Generates stream network and main outlet GeoDataFrames from flow direction and accumulation.
Parameters
----------
flwdir_file : str
Path to the input flow direction raster file.
flwacc_file : str
Path to the input flow accumulation raster file.
tacc_type : str
Type of threshold for flow accumulation, chosen from ['percentage', 'absolute'].
The 'percentage' option uses the percent value of the maximum flow accumulation, while
'absolute' specifies a threshold based on a specific number of cells.
tacc_value : float
If 'percentage' is selected, this value must be between 0 and 100, representing the
percentage of maximum flow accumulation.
stream_file : str
Path to save the output stream shapefile.
outlet_file : str
Path to save the output outlet shapefile.
Returns
-------
str
A message indicating the time required for all geoprocessing computations.
'''
# start time
start_time = time.time()
# check validity of output file path
for file in [stream_file, outlet_file]:
check_file = Core().is_valid_ogr_driver(file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# check validity of flow accumulation thereshold type
if tacc_type not in ['percentage', 'absolute']:
raise Exception('Threshold accumulation type must be one of [percentage, absolute].')
# flow direction object
with rasterio.open(flwdir_file) as input_flwdir:
flwdir_object = pyflwdir.from_array(
data=input_flwdir.read(1),
transform=input_flwdir.transform
)
# flow accumulation array
with rasterio.open(flwacc_file) as input_flwacc:
raster_profile = input_flwacc.profile
flwacc_array = input_flwacc.read(1)
max_flwacc = flwacc_array[flwacc_array != input_flwacc.nodata].max()
# flow accumulation to stream path
acc_threshold = tacc_value if tacc_type == 'absolute' else round(max_flwacc * tacc_value / 100)
print(f'Threshold flow accumulation: {acc_threshold}.')
features = flwdir_object.streams(
mask=flwacc_array >= acc_threshold
)
gdf = geopandas.GeoDataFrame.from_features(
features=features,
crs=raster_profile['crs']
)
# saving stream GeoDataFrame
stream_gdf = gdf[gdf['pit'] == 0].reset_index(drop=True)
stream_gdf['SID'] = list(range(1, stream_gdf.shape[0] + 1))
stream_gdf.to_file(stream_file)
# saving outlet GeoDataFrame
outlet_gdf = gdf[gdf['pit'] == 1].reset_index(drop=True)
outlet_gdf['geometry'] = outlet_gdf['geometry'].apply(
lambda x: shapely.Point(*x.coords[-1])
)
outlet_gdf['OID'] = list(range(1, outlet_gdf.shape[0] + 1))
outlet_gdf.to_file(outlet_file)
# required time
required_time = time.time() - start_time
output = f'Time required for computing stream network and main outlets: {required_time:.2f} seconds.'
return output
[docs]
def get_subbasins(
self,
flwdir_file: str,
stream_file: str,
outlet_file: str,
subbasin_file: str,
pour_file: str
) -> str:
'''
Generates subbasins and their pour points from flow direction, stream and outlets.
Parameters
----------
flwdir_file : str
Path to the input flow direction raster file.
stream_file : str
Path of the input stream shapefile.
outlet_file : str
Path of the input outlet shapefile.
subbasin_file : str
Path to save the output subbasins shapefile.
pour_file : str
Path to save the output pour points shapefile.
Returns
-------
str
A message indicating the time required for all geoprocessing computations.
'''
# start time
start_time = time.time()
# check validity of output file path
for file in [subbasin_file, pour_file]:
check_file = Core().is_valid_ogr_driver(file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# flow direction object
with rasterio.open(flwdir_file) as input_flwdir:
raster_profile = input_flwdir.profile
flowdir_object = pyflwdir.from_array(
data=input_flwdir.read(1),
transform=input_flwdir.transform
)
# stream GeoDataFrame
stream_gdf = geopandas.read_file(stream_file)
# subbasin pour points
pits = geopandas.read_file(outlet_file)['idx_ds'].values
pour_coords = stream_gdf.apply(
lambda row: row.geometry.coords[-1] if row['idx_ds'] in pits else row.geometry.coords[-2],
axis=1
)
pour_gdf = stream_gdf.copy()
pour_points = list(map(lambda x: shapely.Point(*x), pour_coords))
pour_gdf['geometry'] = pour_points
# saving subbasin pour points GeoDataFrame
pour_gdf.to_file(pour_file)
# subbasins
subbasin_array = flowdir_object.basins(
xy=(pour_gdf.geometry.x, pour_gdf.geometry.y),
ids=pour_gdf['SID'].astype('uint32')
)
subbasin_shapes = rasterio.features.shapes(
source=subbasin_array.astype('int32'),
mask=subbasin_array != 0,
transform=raster_profile['transform'],
connectivity=8
)
subbasin_features = [
{'geometry': geometry, 'properties': {'SID': value}} for geometry, value in subbasin_shapes
]
subbasin_gdf = geopandas.GeoDataFrame.from_features(
features=subbasin_features,
crs=raster_profile['crs']
)
# saving subbasins GeoDataFrame
subbasin_gdf.to_file(subbasin_file)
# required time
required_time = time.time() - start_time
output = f'Time required for computing subbasins and their pour points: {required_time:.2f} seconds.'
return output
[docs]
def get_aspect(
self,
dem_file: str,
aspect_file: str
) -> str:
'''
Computes the terrain aspect from a DEM without applying pit filling.
Parameters
----------
dem_file : str
Path to the input DEM raster file (e.g., GeoTIFF).
aspect_file : str
Path to save the output aspect raster file.
Returns
-------
str
A message indicating the time required for all geoprocessing computations.
'''
# start time
start_time = time.time()
# check validity of output file path
check_file = Core().is_valid_raster_driver(aspect_file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# aspect raster
with rasterio.open(dem_file) as input_dem:
dem_profile = input_dem.profile
dem_array = input_dem.read(1)
grad_y, grad_x = numpy.gradient(
dem_array,
input_dem.res[1],
input_dem.res[0],
)
# angles
aspect_array = numpy.arctan2(-grad_y, grad_x) * (180 / numpy.pi)
# negative angles to compass direction
aspect_array[aspect_array < 0] += 360
aspect_array[dem_array == dem_profile['nodata']] = dem_profile['nodata']
# update raster profile
dem_profile.update(
{
'dtype': 'float32'
}
)
# saving the raster
with rasterio.open(aspect_file, 'w', **dem_profile) as output_aspect:
output_aspect.write(aspect_array, 1)
# required time
required_time = time.time() - start_time
output = f'Time required for computing aspect: {required_time:.2f} seconds.'
return output
[docs]
def get_slope(
self,
dem_file: str,
slope_file: str
) -> str:
'''
Computes slope from the DEM without pit filling.
Parameters
----------
dem_file : str
Path to the input DEM raster file.
slope_file : str
Path to save the output slope raster file.
Returns
-------
str
A message indicating the time required for all geoprocessing computations.
'''
# start time
start_time = time.time()
# check validity of output file path
check_file = Core().is_valid_raster_driver(slope_file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# slope raster
with rasterio.open(dem_file) as input_dem:
raster_profile = input_dem.profile
slope_array = pyflwdir.dem.slope(
elevtn=input_dem.read(1).astype('float32'),
nodata=raster_profile['nodata'],
transform=raster_profile['transform']
)
# saving slope raster
raster_profile.update(
{'dtype': 'float32'}
)
with rasterio.open(slope_file, 'w', **raster_profile) as output_slope:
output_slope.write(slope_array, 1)
# required time
required_time = time.time() - start_time
output = f'Time required for computing slope: {required_time:.2f} seconds.'
return output
[docs]
def slope_classification(
self,
slope_file: str,
reclass_lb: list[int],
reclass_values: list[int],
reclass_file: str
) -> str:
'''
Multiplies the slope array by 100 and reclassifies the values based on the given intervals.
Parameters
----------
slope_file : str
Path of the input slope raster file.
reclass_lb : list
List of left bounds of intervals. For example, [0, 2, 5] would be treated as
three intervals: [0, 2), [2, 5), and [5, maximum slope).
reclass_values : list
List of reclassified slope values.
reclass_file : str
Raster file path to save the output reclassified slope.
.. note::
Recommended classifications for erosion risk:
====================== ===========================
Slope Percentage (%) Slope Type
====================== ===========================
< 2 % Flats
[2 - 8) % Gentle
[8 - 20) % Moderate
[20 - 40) % Steep
>= 40 % Very Steep
====================== ===========================
.. tip::
Recommended for standard classifications:
====================== ===========================
Slope Percentage (%) Slope Type
====================== ===========================
< 5 % Flat
[5 - 15) % Gentle
[15 - 30) % Moderate
[30 - 50) % Steep
[50 - 75) % Very Steep
>= 75 % Extremely Steep
====================== ===========================
Returns
-------
str
A message indicating the time required for all geoprocessing computations.
'''
# Start time
start_time = time.time()
# check validity of output file path
check_file = Core().is_valid_raster_driver(reclass_file)
if check_file is False:
raise Exception('Could not retrieve driver from the file path.')
# check lengths of lowerbounds and reclass values
if len(reclass_values) != len(reclass_lb):
raise Exception('Both input lists must have the same length.')
# slope array
with rasterio.open(slope_file) as input_slope:
raster_profile = input_slope.profile
nodata = raster_profile['nodata']
slope_array = 100 * input_slope.read(1).astype(float)
slope_array[slope_array == nodata * 100] = numpy.nan
# slope reclassification
reclass_array = numpy.full_like(slope_array, numpy.nan)
for index, rc_val in enumerate(reclass_lb):
if rc_val == reclass_lb[-1]:
true_value = (slope_array >= rc_val) & ~numpy.isnan(slope_array)
else:
true_value = (slope_array >= rc_val) & (slope_array < reclass_lb[index + 1]) & ~numpy.isnan(slope_array)
reclass_array[true_value] = reclass_values[index]
reclass_array[numpy.isnan(reclass_array)] = nodata
# saving reclassified slope raster
raster_profile.update(
{
'dtype': 'int32'
}
)
with rasterio.open(reclass_file, 'w', **raster_profile) as output_reclass:
output_reclass.write(reclass_array.astype('int32'), 1)
# required time
required_time = time.time() - start_time
return f'Time required for computing slope: {required_time:.2f} seconds.'