import geopandas
import shapely
import random
import pandas
import typing
import tempfile
import os
import json
from .core import Core
[docs]
class Stream:
'''
Provides functionality for stream path operations.
'''
[docs]
def flw_path_us2ds_check(
    self,
    stream_file: str
) -> bool:

    '''
    Checks whether the flow path is digitized from upstream to downstream.

    The check compares the number of single-part segments in the flow
    path with the number of unique first coordinates of those segments.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    Returns
    -------
    bool
        True if the number of flow path segments equals the number
        of unique starting points; otherwise, False.

    Raises
    ------
    Exception
        If the geometry type reported for the input shapefile
        does not contain 'LineString'.
    '''

    # only line geometries are accepted
    geometry_type = Core().shapefile_geometry_type(stream_file)
    if 'LineString' not in geometry_type:
        raise Exception('Input shapefile must have geometries of type LineString.')

    # split multi-part geometries into single-part segments
    segment_gdf = geopandas.read_file(stream_file)
    segment_gdf = segment_gdf.explode(
        index_parts=False,
        ignore_index=True
    )

    # unique first coordinate of every segment
    start_coords = set(
        segment_gdf.geometry.apply(lambda line: line.coords[0])
    )

    return len(segment_gdf) == len(start_coords)
[docs]
def flw_path_reverse(
    self,
    input_file: str,
    output_file: str
) -> str:

    '''
    Reverses the coordinate order of each segment in the input flow path
    and saves the reversed segments to the output file.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    output_file : str
        Path to save the output stream shapefile.

    Returns
    -------
    str
        A message reporting the number of flow segments and the number
        of unique starting points after reversal. Both numbers are counted
        after splitting MultiLineString(s), if present.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, or if the
        geometry type reported for the input shapefile does not contain
        'LineString'.
    '''

    # check validity of output file path
    check_file = Core().is_valid_ogr_driver(output_file)
    if check_file is False:
        raise Exception('Could not retrieve driver from the file path.')

    # check LineString geometry type
    if 'LineString' not in Core().shapefile_geometry_type(input_file):
        raise Exception('Input shapefile must have geometries of type LineString.')

    # input stream GeoDataFrame; the original row index is kept in a temporary
    # column so that exploded parts can be regrouped per feature afterwards
    gdf = geopandas.read_file(input_file)
    tmp_col = Core()._tmp_df_column_name(list(gdf.columns))
    gdf = gdf.reset_index(names=[tmp_col])
    gdf = gdf.explode(index_parts=False, ignore_index=True)

    # reversed stream coordinates order
    gdf.geometry = gdf.geometry.apply(
        lambda x: shapely.LineString(x.coords[::-1])
    )

    # summary statistics of the reversed single-part segments
    upstream_points = len(
        set(
            gdf.geometry.apply(lambda x: x.coords[0])
        )
    )
    output = f'Flow segments: {len(gdf)}, upstream points: {upstream_points} after splitting MultiLineString(s), if present.'

    # regroup the parts of each original feature
    gdf = gdf.dissolve(by=[tmp_col]).reset_index(drop=True)

    # saving output GeoDataFrame
    gdf.to_file(output_file)

    return output
def _connectivity_adjacent_downstream_segment(
    self,
    input_file: str,
    stream_col: str,
    link_col: str,
    unlinked_id: int
) -> geopandas.GeoDataFrame:

    '''
    Reads the stream shapefile and adds a column with the identifier of
    the adjacent downstream segment of each segment.

    A segment is linked to another segment when its last coordinate is
    equal to the first coordinate of exactly one segment. When no segment,
    or more than one segment, starts at that coordinate, `unlinked_id`
    is assigned instead.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name containing the stream segment identifiers.

    link_col : str
        Name of the column to store the downstream segment identifiers.

    unlinked_id : int
        Value to assign when a unique downstream segment is not found.

    Returns
    -------
    GeoDataFrame
        The input GeoDataFrame with the additional `link_col` column.
    '''

    # stream geodataframe
    stream_gdf = geopandas.read_file(input_file)

    # endpoints of flow segments keyed by segment identifier
    upstream_points = {}
    downstream_points = {}
    for seg_id, line in zip(stream_gdf[stream_col], stream_gdf.geometry):
        upstream_points[seg_id] = line.coords[0]
        downstream_points[seg_id] = line.coords[-1]

    # index of segment identifiers by their starting coordinate, so each
    # end coordinate is matched by a single lookup instead of a full scan
    ids_by_start = {}
    for seg_id, point in upstream_points.items():
        ids_by_start.setdefault(point, []).append(seg_id)

    # downstream segment identifiers
    downstream_link = {}
    for seg_id, point in downstream_points.items():
        candidates = ids_by_start.get(point, [])
        if len(candidates) == 1:
            downstream_link[seg_id] = candidates[0]
        else:
            downstream_link[seg_id] = unlinked_id

    # the dictionary preserves the row order of the GeoDataFrame
    stream_gdf[link_col] = list(downstream_link.values())

    return stream_gdf
[docs]
def connectivity_adjacent_downstream_segment(
    self,
    input_file: str,
    stream_col: str,
    output_file: str,
    link_col: str = 'ds_id',
    unlinked_id: int = -1
) -> geopandas.GeoDataFrame:

    '''
    Finds the adjacent connected downstream segment identifier
    of every segment in a stream network shapefile and saves the result.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    output_file : str
        Path to save the output stream shapefile
        with adjacent downstream connectivity information.

    link_col : str, optional
        Name of the column to store the connected
        downstream segment identifiers. Default is 'ds_id'.

    unlinked_id : int, optional
        Value to assign when a downstream segment identifier
        is not found. Default is -1.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame of the input shapefile with one additional
        column holding the downstream segment identifier of each feature.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, or if the
        geometry type reported for the input shapefile does not contain
        'LineString'.
    '''

    # the output path must map to a known driver
    driver_found = Core().is_valid_ogr_driver(output_file)
    if driver_found is False:
        raise Exception('Could not retrieve driver from the file path.')

    # only line geometries are accepted
    geometry_type = Core().shapefile_geometry_type(input_file)
    if 'LineString' not in geometry_type:
        raise Exception('Input shapefile must have geometries of type LineString.')

    # compute the downstream links and save the output GeoDataFrame
    linked_gdf = self._connectivity_adjacent_downstream_segment(
        input_file=input_file,
        stream_col=stream_col,
        link_col=link_col,
        unlinked_id=unlinked_id
    )
    linked_gdf.to_file(output_file)

    return linked_gdf
def _connectivity_adjacent_upstream_segment(
    self,
    stream_file: str,
    stream_col: str,
    link_col: str,
    unlinked_id: int
) -> pandas.DataFrame:

    '''
    Builds a table of adjacent upstream segment identifiers
    for every segment in the stream shapefile.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name containing the stream segment identifiers.

    link_col : str
        Name of the column to store the upstream segment identifiers.

    unlinked_id : int
        Value to assign to segments without any upstream segment.

    Returns
    -------
    DataFrame
        A DataFrame with the columns `stream_col` and `link_col`, sorted
        by both columns. A segment appears once per adjacent upstream segment.
    '''

    # connectivity to downstream segment identifiers
    stream_gdf = self._connectivity_adjacent_downstream_segment(
        input_file=stream_file,
        stream_col=stream_col,
        link_col='ds_id',
        unlinked_id=-1
    )

    # non headwater segments by exchanging stream and adjacent downstream columns:
    # each (segment, downstream) pair read backwards is a (segment, upstream) pair
    nhw_df = stream_gdf[['ds_id', stream_col]]
    nhw_df = nhw_df[~nhw_df['ds_id'].isin([-1])].reset_index(drop=True)
    nhw_df.columns = [stream_col, link_col]

    # headwater segments are those that never receive flow from another segment;
    # the receiving identifiers are collected once for constant-time lookups
    receiving_ids = set(nhw_df[stream_col])
    hw_list = [
        i for i in stream_gdf[stream_col] if i not in receiving_ids
    ]
    hw_df = pandas.DataFrame({stream_col: hw_list})
    hw_df[link_col] = unlinked_id

    # adjacent upstream segments
    ul_df = pandas.concat(
        objs=[nhw_df, hw_df],
        ignore_index=True
    )
    ul_df = ul_df.sort_values(
        by=[stream_col, link_col],
        ignore_index=True
    )

    return ul_df
[docs]
def connectivity_adjacent_upstream_segment(
    self,
    stream_file: str,
    stream_col: str,
    csv_file: str,
    link_col: str = 'us_id',
    unlinked_id: int = -1
) -> pandas.DataFrame:

    '''
    Finds the adjacent connected upstream segment identifiers
    of every segment in a stream network shapefile and saves them.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    csv_file : str
        Path to save the output file with adjacent upstream
        connectivity information. The file is written with
        a tab separator and without the index.

    link_col : str, optional
        Name of the column to store the connected
        upstream segment identifiers. Default is 'us_id'.

    unlinked_id : int, optional
        Value to assign when an upstream segment identifier
        is not found. Default is -1.

    Returns
    -------
    DataFrame
        A DataFrame with two columns `stream_col` and `link_col`.
        The `stream_col` contains stream segment identifiers
        (which may appear multiple times), and the `link_col` contains
        their corresponding connected adjacent upstream segment identifiers.

    Raises
    ------
    Exception
        If the geometry type reported for the input shapefile
        does not contain 'LineString'.
    '''

    # only line geometries are accepted
    geometry_type = Core().shapefile_geometry_type(stream_file)
    if 'LineString' not in geometry_type:
        raise Exception('Input shapefile must have geometries of type LineString.')

    # adjacent upstream connectivity table
    upstream_df = self._connectivity_adjacent_upstream_segment(
        stream_file=stream_file,
        stream_col=stream_col,
        link_col=link_col,
        unlinked_id=unlinked_id
    )

    # saving the table
    upstream_df.to_csv(
        path_or_buf=csv_file,
        sep='\t',
        index=False
    )

    return upstream_df
def _connectivity_upstream_to_downstream(
    self,
    stream_file: str,
    stream_col: str
) -> dict[float, list[float]]:

    '''
    Collects, for every segment, the identifiers of all consecutively
    connected downstream segments.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name containing the stream segment identifiers.

    Returns
    -------
    dict
        A dictionary that maps each segment identifier to the list of
        downstream identifiers in flow order. A segment without
        a downstream link maps to a list containing only itself.
    '''

    # connectivity to downstream segment identifiers
    linked_gdf = self._connectivity_adjacent_downstream_segment(
        input_file=stream_file,
        stream_col=stream_col,
        link_col='ds_id',
        unlinked_id=-1
    )
    link_df = linked_gdf[[stream_col, 'ds_id']]

    # a segment without downstream link points to itself
    next_id = {}
    for seg_id, ds_id in zip(link_df[stream_col], link_df['ds_id']):
        next_id[seg_id] = seg_id if ds_id == -1 else ds_id

    # follow the links of every segment until an identifier repeats
    us2ds_link: dict[float, list[float]] = {}
    for seg_id in next_id:
        us2ds_link[seg_id] = []
    for start_id in next_id:
        path = us2ds_link[start_id]
        current_id = start_id
        while next_id[current_id] not in path:
            path.append(next_id[current_id])
            current_id = next_id[current_id]

    return us2ds_link
[docs]
def connectivity_upstream_to_downstream(
    self,
    stream_file: str,
    stream_col: str,
    json_file: str
) -> dict[float, list[float]]:

    '''
    Finds, for each segment in a stream network shapefile, all consecutively
    connected downstream segment identifiers and saves them to a JSON file.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    json_file : str
        Path to save the output JSON file
        representing upstream to downstream connections.

    Returns
    -------
    dict
        A dictionary where each key is a stream segment identifier,
        and the corresponding value is a list of all consecutively
        connected downstream identifiers. If no connected downstream
        segment is found, the value list contains the segment identifier itself.

    Raises
    ------
    Exception
        If the geometry type reported for the input shapefile
        does not contain 'LineString'.
    '''

    # only line geometries are accepted
    geometry_type = Core().shapefile_geometry_type(stream_file)
    if 'LineString' not in geometry_type:
        raise Exception('Input shapefile must have geometries of type LineString.')

    # upstream to downstream connectivity
    downstream_paths = self._connectivity_upstream_to_downstream(
        stream_file=stream_file,
        stream_col=stream_col
    )

    # saving the connectivity
    with open(json_file, 'w') as json_output:
        json.dump(downstream_paths, json_output)

    return downstream_paths
def _connectivity_downstream_to_upstream(
    self,
    stream_file: str,
    stream_col: str
) -> dict[float, list[list[float]]]:

    '''
    Collects, for every segment, the identifiers of all upstream
    segments grouped by their distance in links from that segment.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name containing the stream segment identifiers.

    Returns
    -------
    dict
        A dictionary that maps each segment identifier to a list of lists.
        The first inner list holds the adjacent upstream segments, the second
        one holds the segments draining into those, and so on. Segments
        without any upstream segment map to an empty list.
    '''

    # connectivity to downstream segment identifiers
    stream_gdf = self._connectivity_adjacent_downstream_segment(
        input_file=stream_file,
        stream_col=stream_col,
        link_col='ds_id',
        unlinked_id=-1
    )
    stream_link = dict(
        zip(stream_gdf[stream_col], stream_gdf['ds_id'])
    )

    # identifiers that receive flow from at least one segment
    receiving_ids = set(stream_link.values())

    # downstream to upstream total connectivity
    ds2us_link: dict[float, list[list[float]]] = {
        i: [] for i in stream_link
    }
    for i in stream_link:
        if i not in receiving_ids:
            continue
        # already visited identifiers are skipped so that a segment linked to
        # itself, or a cyclic network, cannot keep the search running forever
        visited = {i}
        level_ids = {i}
        while True:
            i_upstream = [
                x for x, ds in stream_link.items() if ds in level_ids and x not in visited
            ]
            if len(i_upstream) == 0:
                break
            ds2us_link[i].append(i_upstream)
            visited.update(i_upstream)
            level_ids = set(i_upstream)

    return ds2us_link
[docs]
def connectivity_downstream_to_upstream(
    self,
    stream_file: str,
    stream_col: str,
    json_file: str
) -> dict[float, list[list[float]]]:

    '''
    Finds the connected upstream structure of each segment in a stream
    network shapefile and saves it to a JSON file.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    json_file : str
        Path to save the output JSON file
        representing downstream to upstream connections.

    Returns
    -------
    dict
        A dictionary where each key is a stream segment identifier,
        and the corresponding value is a list of lists. Each inner list
        holds the segments that drain into the segments of the previous
        inner list, starting from the adjacent upstream segments of the key.

    Raises
    ------
    Exception
        If the geometry type reported for the input shapefile
        does not contain 'LineString'.
    '''

    # only line geometries are accepted
    geometry_type = Core().shapefile_geometry_type(stream_file)
    if 'LineString' not in geometry_type:
        raise Exception('Input shapefile must have geometries of type LineString.')

    # downstream to upstream connectivity
    upstream_levels = self._connectivity_downstream_to_upstream(
        stream_file=stream_file,
        stream_col=stream_col
    )

    # saving the connectivity
    with open(json_file, 'w') as json_output:
        json.dump(upstream_levels, json_output)

    return upstream_levels
def _connectivity_to_all_upstream_segments(
    self,
    stream_file: str,
    stream_col: str,
    link_col: str,
    unlinked_id: int
) -> pandas.DataFrame:

    '''
    Flattens the downstream to upstream connectivity into a table
    with one row per pair of segment and connected upstream segment.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name containing the stream segment identifiers.

    link_col : str
        Name of the column to store the upstream segment identifiers.

    unlinked_id : int
        Value to assign to segments without any upstream segment.

    Returns
    -------
    DataFrame
        A DataFrame with the columns `stream_col` and `link_col`,
        sorted by both columns.
    '''

    # downstream to upstream total connectivity
    upstream_levels = self._connectivity_downstream_to_upstream(
        stream_file=stream_file,
        stream_col=stream_col
    )

    # one record per connected upstream segment
    records = []
    for seg_id, levels in upstream_levels.items():
        upstream_ids = [us_id for level in levels for us_id in level]
        if len(upstream_ids) == 0:
            # segments without upstream connection get a single placeholder row
            upstream_ids = [unlinked_id]
        for us_id in upstream_ids:
            records.append({stream_col: seg_id, link_col: us_id})

    # converting records to a sorted DataFrame
    link_df = pandas.DataFrame(records)
    link_df = link_df.sort_values(
        by=[stream_col, link_col],
        ignore_index=True
    )

    return link_df
[docs]
def connectivity_to_all_upstream_segments(
    self,
    stream_file: str,
    stream_col: str,
    csv_file: str,
    link_col: str = 'us_id',
    unlinked_id: int = -1
) -> pandas.DataFrame:

    '''
    Converts the dictionary output of the
    :meth:`GeoAnalyze.Stream.connectivity_downstream_to_upstream` method
    into a DataFrame with two columns: `stream_col` and `link_col`,
    representing stream segment identifiers (which may appear multiple times)
    and all of their connected upstream segments, respectively.

    Parameters
    ----------
    stream_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    csv_file : str
        Path to save the output file representing the connections to all
        upstream segments. The file is written with a tab separator
        and without the index.

    link_col : str, optional
        Name of the column to store the connected
        upstream segment identifiers. Default is 'us_id'.

    unlinked_id : int, optional
        Value to assign when an upstream segment identifier is not found. Default is -1.

    Returns
    -------
    DataFrame
        A DataFrame with two columns `stream_col` and `link_col`.
        The `stream_col` contains stream segment identifiers (which may appear multiple times),
        and the `link_col` contains their corresponding connected upstream segment identifiers.

    Raises
    ------
    Exception
        If the geometry type reported for the input shapefile
        does not contain 'LineString'.
    '''

    # only line geometries are accepted
    geometry_type = Core().shapefile_geometry_type(stream_file)
    if 'LineString' not in geometry_type:
        raise Exception('Input shapefile must have geometries of type LineString.')

    # connectivity to all upstream segments
    upstream_df = self._connectivity_to_all_upstream_segments(
        stream_file=stream_file,
        stream_col=stream_col,
        link_col=link_col,
        unlinked_id=unlinked_id
    )

    # saving the table
    upstream_df.to_csv(
        path_or_buf=csv_file,
        sep='\t',
        index=False
    )

    return upstream_df
[docs]
def connectivity_remove_to_headwater(
    self,
    input_file: str,
    stream_col: str,
    remove_segments: list[float],
    output_file: str
) -> geopandas.GeoDataFrame:

    '''
    Removes targeted stream segments and all their upstream connections
    up to headwaters in a stream network shapefile.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    remove_segments : list
        A list of stream segment identifiers to remove, along with
        all their upstream connections up to the headwaters.
        An empty list leaves the stream network unchanged.

    output_file : str
        Path to save the output stream shapefile after removing the
        specified segments and their upstream connections.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame representing the updated stream network after
        removing the targeted stream segments and their upstream paths.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, or if the
        geometry type reported for the input shapefile does not contain
        'LineString'.
    KeyError
        If an identifier in `remove_segments` is not found in `stream_col`.
    '''

    # check validity of output file path
    check_file = Core().is_valid_ogr_driver(output_file)
    if check_file is False:
        raise Exception('Could not retrieve driver from the file path.')

    # check LineString geometry type
    if 'LineString' not in Core().shapefile_geometry_type(input_file):
        raise Exception('Input shapefile must have geometries of type LineString.')

    # stream geodataframe
    stream_gdf = geopandas.read_file(input_file)

    if len(remove_segments) > 0:
        # downstream to upstream total connectivity; the private helper is
        # used directly because no JSON output is required here
        ds2us_link = self._connectivity_downstream_to_upstream(
            stream_file=input_file,
            stream_col=stream_col
        )
        # collecting targeted remove ids and their upstream connectivity
        remove_ids = set()
        for i in remove_segments:
            remove_ids.add(i)
            for i_level in ds2us_link[i]:
                remove_ids.update(i_level)
        stream_gdf = stream_gdf[~stream_gdf[stream_col].isin(remove_ids)].reset_index(drop=True)

    # saving output GeoDataFrame
    stream_gdf.to_file(output_file)

    return stream_gdf
[docs]
def connectivity_merge_of_split_segments(
    self,
    input_file: str,
    stream_col: str,
    output_file: str,
    json_file: str
) -> geopandas.GeoDataFrame:

    '''
    Merges split segments in the stream network, if any, either between
    two junction points or from a junction point upstream until a headwater occurs.
    The merged segment is assigned the identifier of the most downstream segment
    among those being merged.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    output_file : str
        Path to save the output stream shapefile containing the merged segments.

    json_file : str
        Path to save the output JSON file representing the merge information
        of stream segments. For example, {5: [4, 39, 38, 2]} indicates that
        stream segment 5 is the result of merging segments 4, 39, 38, and 2,
        which are consecutively connected from downstream to upstream until
        a junction point is reached or no upstream segment exists.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame where each merged segment is represented by the most
        downstream segment identifier, absorbing all connected upstream segments
        either between junction points or from a junction point to a headwater.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, or if the
        geometry type reported for the input shapefile does not contain
        'LineString'.
    '''

    # check validity of output file path
    check_file = Core().is_valid_ogr_driver(output_file)
    if check_file is False:
        raise Exception('Could not retrieve driver from the file path.')

    # check LineString geometry type
    if 'LineString' not in Core().shapefile_geometry_type(input_file):
        raise Exception('Input shapefile must have geometries of type LineString.')

    # temporary directory
    with tempfile.TemporaryDirectory() as tmp_dir:
        # connectivity to downstream segment identifiers
        # (default link column 'ds_id' and unlinked value -1 are used)
        stream_gdf = self.connectivity_adjacent_downstream_segment(
            input_file=input_file,
            stream_col=stream_col,
            output_file=os.path.join(tmp_dir, 'stream_downstream_id.shp')
        )
        slm_gdf = stream_gdf[[stream_col, 'ds_id', 'geometry']]
        stream_link = dict(
            zip(slm_gdf[stream_col], slm_gdf['ds_id'])
        )
        # upstream link until either junction point or headwater occurs;
        # every appended item is a one-element list with the single upstream segment
        upstream_link: dict[float, list[list[float]]] = {
            i: list() for i in slm_gdf[stream_col]
        }
        for i in stream_link.keys():
            # segments that receive no flow have nothing upstream to merge
            if i not in stream_link.values():
                pass
            else:
                i_connect = [i]
                while True:
                    # segments draining into the most recently connected segment
                    i_upstream = list(
                        filter(
                            lambda x: stream_link[x] in i_connect, stream_link
                        )
                    )
                    # headwater reached
                    if len(i_upstream) == 0:
                        break
                    # junction reached
                    elif len(i_upstream) > 1:
                        break
                    else:
                        upstream_link[i].append(i_upstream)
                        i_connect = upstream_link[i][-1]
        # non-empty upstream link until either junction point or headwater occurs
        jh_link = {
            i: j for i, j in upstream_link.items() if len(j) > 0
        }
        # end segment identifiers until either junction point or headwater occurs
        jh_ids = set([jh_link[i][-1][0] for i in jh_link])
        # select most downstream segments for merged links: among all chains
        # that end at the same segment, the longest one starts farthest downstream
        ds_link = {}
        for i in jh_ids:
            i_jh = list(
                filter(
                    lambda x: jh_link[x][-1][0] == i, jh_link.keys()
                )
            )
            i_length = list(
                map(
                    lambda x: len(jh_link[x]), i_jh
                )
            )
            i_select = i_jh[i_length.index(max(i_length))]
            ds_link[i_select] = jh_link[i_select]
        # dictionary of merged link: downstream identifier -> absorbed upstream identifiers
        merged_link: dict[float, list[float]] = dict(
            zip(
                ds_link.keys(), map(lambda x: [i[0] for i in ds_link[x]], ds_link.keys())
            )
        )
        # saving merged information of split segments in json file
        with open(json_file, 'w') as output_merged:
            json.dump(merged_link, output_merged)
        # saving output GeoDataFrame
        slm_gdf = slm_gdf.drop(columns=['ds_id'])
        # absorbed identifier -> identifier of the merged segment
        reverse_merge = {
            val: key for key, values in merged_link.items() for val in values
        }
        # segments that are not absorbed keep their own identifier
        slm_gdf['m_id'] = slm_gdf[stream_col].apply(
            lambda x: reverse_merge.get(x, x)
        )
        slm_gdf = slm_gdf.dissolve(by=['m_id']).reset_index()
        # join the dissolved parts of each merged segment into continuous lines
        slm_gdf['geometry'] = slm_gdf['geometry'].apply(
            lambda x: shapely.line_merge(x)
        )
        # the merge identifier replaces the original identifier column
        slm_gdf = slm_gdf.drop(columns=[stream_col])
        slm_gdf = slm_gdf.rename(
            columns={'m_id': stream_col}
        )
        slm_gdf.to_file(output_file)

    return slm_gdf
[docs]
def point_junctions(
    self,
    input_file: str,
    stream_col: str,
    output_file: str,
    junction_col: str = 'j_id'
) -> geopandas.GeoDataFrame:

    '''
    Identifies junction points in the stream path, i.e. points shared by
    the last coordinate of more than one segment, and maps the identifiers
    of those segments to each junction point. A column with a unique
    identifier for each junction point, starting from 1, is also added.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing a unique identifier for each stream segment.

    output_file : str
        Path to save the output junction point shapefile.

    junction_col : str, optional
        Name of the column to store the junction point identifiers. Default is 'j_id'.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame of junction points with their corresponding stream segment identifiers.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, or if the
        geometry type reported for the input shapefile does not contain
        'LineString'.
    '''

    # the output path must map to a known driver
    driver_found = Core().is_valid_ogr_driver(output_file)
    if driver_found is False:
        raise Exception('Could not retrieve driver from the file path.')

    # only line geometries are accepted
    geometry_type = Core().shapefile_geometry_type(input_file)
    if 'LineString' not in geometry_type:
        raise Exception('Input shapefile must have geometries of type LineString.')

    # stream GeoDataFrame
    stream_gdf = geopandas.read_file(input_file)

    # last point of every segment together with the segment identifier
    end_points = stream_gdf.geometry.apply(lambda line: shapely.Point(line.coords[-1]))
    end_gdf = geopandas.GeoDataFrame(
        {
            stream_col: stream_gdf[stream_col],
            'geometry': end_points
        },
        crs=stream_gdf.crs
    )

    # end points shared by more than one segment are junction points
    end_counts = end_gdf['geometry'].value_counts()
    shared_points = end_counts[end_counts > 1].index
    shared_gdf = end_gdf[end_gdf['geometry'].isin(shared_points.tolist())]

    # segment identifiers grouped by junction point
    segments_by_point = shared_gdf.groupby('geometry')[stream_col].apply(lambda ids: ids.tolist())

    # saving output GeoDataFrame
    junction_count = len(segments_by_point)
    junction_gdf = geopandas.GeoDataFrame(
        data={
            junction_col: range(1, junction_count + 1),
            segments_by_point.name: segments_by_point.values
        },
        geometry=list(segments_by_point.index),
        crs=stream_gdf.crs
    )
    junction_gdf.to_file(output_file)

    return junction_gdf
[docs]
def point_segment_subbasin_drainage(
    self,
    input_file: str,
    output_file: str
) -> geopandas.GeoDataFrame:

    '''
    Generates a GeoDataFrame of subbasin drainage points for flow segments in the stream path.
    For each flow segment, the most downstream point is selected unless it is a junction point,
    in which case the second most downstream point is used.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    output_file : str
        Path to save the output pour point shapefile.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the subbasin drainage points
        along with all attribute columns of the input shapefile.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, or if the
        geometry type reported for the input shapefile does not contain
        'LineString'.
    '''

    # check validity of output file path
    check_file = Core().is_valid_ogr_driver(output_file)
    if check_file is False:
        raise Exception('Could not retrieve driver from the file path.')

    # check LineString geometry type
    if 'LineString' not in Core().shapefile_geometry_type(input_file):
        raise Exception('Input shapefile must have geometries of type LineString.')

    # stream GeoDataFrame
    stream_gdf = geopandas.read_file(input_file)

    # junction points: last points shared by more than one segment
    downstream_points = stream_gdf.geometry.apply(lambda x: shapely.Point(x.coords[-1]))
    point_count = downstream_points.value_counts()
    junction_points = point_count[point_count > 1].index.to_list()

    def _pour_point(line):
        # last point of the segment, or the one before it at a junction
        end_point = shapely.Point(*line.coords[-1])
        if end_point in junction_points:
            return shapely.Point(*line.coords[-2])
        return end_point

    # subbasin drainage points; the geometry is replaced directly so that no
    # temporary column can overwrite or drop an attribute column of the input
    pour_gdf = stream_gdf.copy()
    pour_gdf['geometry'] = pour_gdf.geometry.apply(_pour_point)

    # saving output GeoDataFrame
    pour_gdf.to_file(output_file)

    return pour_gdf
[docs]
def point_main_outlets(
    self,
    input_file: str,
    output_file: str
) -> geopandas.GeoDataFrame:

    '''
    Identifies the main outlet points of a stream path, i.e. the last points
    of segments that are not shared with the last point of any other segment,
    and saves the resulting GeoDataFrame to the specified shapefile path.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    output_file : str
        Path to save the output outlet point shapefile.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the main outlet points along
        with the attributes of their flow segments.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, or if the
        geometry type reported for the input shapefile does not contain
        'LineString'.
    '''

    # the output path must map to a known driver
    driver_found = Core().is_valid_ogr_driver(output_file)
    if driver_found is False:
        raise Exception('Could not retrieve driver from the file path.')

    # only line geometries are accepted
    geometry_type = Core().shapefile_geometry_type(input_file)
    if 'LineString' not in geometry_type:
        raise Exception('Input shapefile must have geometries of type LineString.')

    # stream GeoDataFrame
    stream_gdf = geopandas.read_file(input_file)

    # replace every segment by its last point
    end_gdf = stream_gdf.copy()
    end_gdf['geometry'] = stream_gdf.geometry.apply(
        lambda line: shapely.Point(*line.coords[-1])
    )

    # last points that occur exactly once are the outlet points
    end_counts = end_gdf['geometry'].value_counts()
    single_points = end_counts[end_counts == 1].index
    is_outlet = end_gdf['geometry'].isin(single_points.tolist())
    outlet_gdf = end_gdf[is_outlet].reset_index(drop=True)

    # saving output GeoDataFrame
    outlet_gdf.to_file(output_file)

    return outlet_gdf
[docs]
def point_headwaters(
    self,
    input_file: str,
    stream_col: str,
    output_file: str,
) -> geopandas.GeoDataFrame:

    '''
    Identifies headwater points in the stream network. A headwater point
    is defined as the starting point of a stream segment with no upstream connections.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    output_file : str
        Path to save the output shapefile containing identified headwater points.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the geometries and attributes of headwater points,
        including the 'ds_id' column of adjacent downstream segment identifiers.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, or if the
        geometry type reported for the input shapefile does not contain
        'LineString'.
    '''

    # check validity of output file path
    check_file = Core().is_valid_ogr_driver(output_file)
    if check_file is False:
        raise Exception('Could not retrieve driver from the file path.')

    # check LineString geometry type
    if 'LineString' not in Core().shapefile_geometry_type(input_file):
        raise Exception('Input shapefile must have geometries of type LineString.')

    # connectivity to downstream segment identifiers; the private helper
    # returns the same GeoDataFrame without writing a temporary shapefile
    stream_gdf = self._connectivity_adjacent_downstream_segment(
        input_file=input_file,
        stream_col=stream_col,
        link_col='ds_id',
        unlinked_id=-1
    )

    # headwater segments never appear as the downstream segment of another one;
    # the downstream identifiers are collected once for constant-time lookups
    receiving_ids = set(stream_gdf['ds_id'])
    hw_ids = [
        i for i in stream_gdf[stream_col] if i not in receiving_ids
    ]
    hw_gdf = stream_gdf[stream_gdf[stream_col].isin(hw_ids)].reset_index(drop=True)

    # starting point of every headwater segment
    hw_gdf.geometry = hw_gdf.geometry.apply(lambda x: shapely.Point(x.coords[0]))

    # saving output GeoDataFrame
    hw_gdf.to_file(output_file)

    return hw_gdf
[docs]
def order_strahler(
    self,
    input_file: str,
    stream_col: str,
    output_file: str,
    order_col: str = 'strahler'
) -> geopandas.GeoDataFrame:

    '''
    Computes the Strahler order for each segment
    in a stream network shapefile.

    Segments without upstream segments get order 1. Any other segment gets
    the highest order among its adjacent upstream segments, increased by one
    when that highest order occurs more than once. The result does not depend
    on the order of the features in the input shapefile.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    output_file : str
        Path to save the output stream shapefile
        with Strahler stream order information.

    order_col : str, optional
        Name of the column to store the Strahler order
        of stream segments. Default is 'strahler'.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame representing the input shapefile,
        enhanced with an additional column that indicates
        the Strahler stream order for each stream segment.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, if the
        geometry type reported for the input shapefile does not contain
        'LineString', or if the order cannot be resolved because the
        stream network contains cyclic connectivity.
    '''

    # check validity of output file path
    check_file = Core().is_valid_ogr_driver(output_file)
    if check_file is False:
        raise Exception('Could not retrieve driver from the file path.')

    # check LineString geometry type
    if 'LineString' not in Core().shapefile_geometry_type(input_file):
        raise Exception('Input shapefile must have geometries of type LineString.')

    # stream GeoDataFrame
    stream_gdf = geopandas.read_file(input_file)

    # connectivity to upstream segment identifiers
    ul_df = self._connectivity_adjacent_upstream_segment(
        stream_file=input_file,
        stream_col=stream_col,
        link_col='us_id',
        unlinked_id=-1
    )
    ul_dict = {
        key: df['us_id'].tolist() for key, df in ul_df.groupby(stream_col)
    }
    ul_ids = {
        key: [] if value == [-1] else value for key, value in ul_dict.items()
    }

    # compute Strahler order from upstream to downstream: a segment is resolved
    # only after all of its adjacent upstream segments have been resolved
    strahler_order = {}
    pending = list(ul_ids)
    while len(pending) > 0:
        unresolved = []
        for i in pending:
            upstream_orders = [strahler_order.get(j) for j in ul_ids[i]]
            if None in upstream_orders:
                # at least one upstream segment is still waiting
                unresolved.append(i)
            elif len(upstream_orders) == 0:
                # no upstream link
                strahler_order[i] = 1
            else:
                max_order = max(upstream_orders)
                if upstream_orders.count(max_order) > 1:
                    strahler_order[i] = max_order + 1
                else:
                    strahler_order[i] = max_order
        # a pass without progress would otherwise repeat forever
        if len(unresolved) == len(pending):
            raise Exception('Strahler order could not be resolved due to cyclic connectivity in the stream network.')
        pending = unresolved

    # insert Strahler order into the stream GeoDataFrame
    stream_gdf[order_col] = stream_gdf[stream_col].apply(
        lambda x: strahler_order.get(x)
    )

    # saving output GeoDataFrame
    stream_gdf.to_file(output_file)

    return stream_gdf
[docs]
def order_shreve(
    self,
    input_file: str,
    stream_col: str,
    output_file: str,
    order_col: str = 'shreve'
) -> geopandas.GeoDataFrame:

    '''
    Computes the Shreve order for each segment
    in a stream network shapefile.

    Segments without upstream segments get order 1. Any other segment
    gets the sum of the orders of its adjacent upstream segments.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    stream_col : str
        Column name in the stream shapefile containing
        a unique identifier for each stream segment.

    output_file : str
        Path to save the output stream shapefile
        with Shreve stream order information.

    order_col : str, optional
        Name of the column to store the Shreve order
        of stream segments. Default is 'shreve'.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame representing the input shapefile,
        enhanced with an additional column that indicates
        the Shreve stream order for each stream segment.

    Raises
    ------
    Exception
        If no driver can be retrieved from the output file path, if the
        geometry type reported for the input shapefile does not contain
        'LineString', or if the order cannot be resolved because the
        stream network contains cyclic connectivity.
    '''

    # check validity of output file path
    check_file = Core().is_valid_ogr_driver(output_file)
    if check_file is False:
        raise Exception('Could not retrieve driver from the file path.')

    # check LineString geometry type
    if 'LineString' not in Core().shapefile_geometry_type(input_file):
        raise Exception('Input shapefile must have geometries of type LineString.')

    # stream GeoDataFrame
    stream_gdf = geopandas.read_file(input_file)

    # connectivity to upstream segment identifiers
    ul_df = self._connectivity_adjacent_upstream_segment(
        stream_file=input_file,
        stream_col=stream_col,
        link_col='us_id',
        unlinked_id=-1
    )
    ul_dict = {
        key: df['us_id'].tolist() for key, df in ul_df.groupby(stream_col)
    }
    ul_ids = {
        key: [] if value == [-1] else value for key, value in ul_dict.items()
    }

    # iterate until Shreve order of all segments are resolved; every pass builds
    # a new list of waiting segments instead of removing items while iterating
    shreve_order = {}
    pending = stream_gdf[stream_col].tolist()
    while len(pending) > 0:
        unresolved = []
        for i in pending:
            i_ul = ul_ids[i]
            if len(i_ul) == 0:
                # no upstream link
                shreve_order[i] = 1
            elif all(j in shreve_order for j in i_ul):
                # add Shreve orders of upstream links
                shreve_order[i] = sum(shreve_order[j] for j in i_ul)
            else:
                # at least one upstream segment is still waiting
                unresolved.append(i)
        # a pass without progress would otherwise repeat forever
        if len(unresolved) == len(pending):
            raise Exception('Shreve order could not be resolved due to cyclic connectivity in the stream network.')
        pending = unresolved

    # insert Shreve order into the stream GeoDataFrame
    stream_gdf[order_col] = stream_gdf[stream_col].apply(
        lambda x: shreve_order.get(x)
    )

    # saving output GeoDataFrame
    stream_gdf.to_file(output_file)

    return stream_gdf
[docs]
def box_touch_selected_segment(
    self,
    input_file: str,
    column_name: str,
    column_value: typing.Any,
    box_length: float,
    output_file: str
) -> geopandas.GeoDataFrame:
    '''
    Creates a square box polygon that touches a specified segment
    in the stream path at a randomly chosen point along the segment.

    A vertex of the selected segment is picked at random, a square with
    one corner on that vertex is built and rotated about the vertex by a
    random angle, and the draw is repeated until the square touches the
    segment without crossing it.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    column_name : str
        Name of the column used for selecting the target stream segment.

    column_value : Any
        Value in the specified column that identifies the target stream segment.
        The first matching row is used.

    box_length : float
        Length of each side of the square box polygon.

    output_file : str
        Path to save the output box polygon shapefile.

    Returns
    -------
    GeoDataFrame
        A GeoDataFrame containing the created box polygon, which
        touches the specified stream segment at a random point.

    Raises
    ------
    Exception
        If ``Core().is_valid_ogr_driver`` returns False for ``output_file``.
    '''

    # check validity of output file path
    if Core().is_valid_ogr_driver(output_file) is False:
        raise Exception('Could not retrieve driver from the file path.')

    # input line segment (first row matching the requested column value)
    gdf = geopandas.read_file(input_file)
    line = gdf[gdf[column_name].isin([column_value])].geometry.iloc[0]

    # flatten the vertices so that single- and multi-part lines share one list
    if isinstance(line, shapely.LineString):
        line_coords = list(line.coords)
    else:
        line_coords = [c for part in line.geoms for c in part.coords]

    while True:
        # choose a vertex from the flattened list; indexing line.coords
        # directly would fail for multi-part geometries, which expose no
        # coordinate sequence of their own
        point_index = random.randint(
            a=0,
            b=len(line_coords) - 1
        )
        point = shapely.Point(line_coords[point_index])
        # create box with one corner on the chosen vertex
        box = shapely.box(
            xmin=point.x,
            ymin=point.y,
            xmax=point.x + box_length,
            ymax=point.y + box_length
        )
        # random angle between 0 and 360, rotating about the chosen vertex
        rotate_box = shapely.affinity.rotate(
            geom=box,
            angle=random.randint(0, 360),
            origin=point
        )
        # accept only a box that touches the line without crossing it
        if line.touches(rotate_box) and not line.crosses(rotate_box):
            break

    # saving output GeoDataFrame
    box_gdf = geopandas.GeoDataFrame(
        geometry=[rotate_box],
        crs=gdf.crs
    )
    box_gdf.to_file(output_file)

    return box_gdf
[docs]
def box_touch_selected_segment_at_endpoint(
    self,
    input_file: str,
    column_name: str,
    column_value: typing.Any,
    box_length: float,
    output_file: str,
    upstream_point: bool = True
) -> geopandas.GeoDataFrame:
    '''
    Builds a square polygon that touches, without crossing, a selected
    stream segment at one of the segment's endpoints.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    column_name : str
        Column used to look up the target stream segment.

    column_value : Any
        Value in ``column_name`` identifying the target stream segment.
        The first matching row is used.

    box_length : float
        Side length of the square polygon.

    output_file : str
        Path where the box polygon shapefile is written.

    upstream_point : bool, optional
        If True, the box is anchored on the first coordinate of the segment
        (its upstream endpoint); otherwise on the last coordinate
        (its downstream endpoint). Default is True.

    Returns
    -------
    GeoDataFrame
        A single-row GeoDataFrame holding the box polygon, carrying
        the coordinate reference system of the input file.
    '''

    # the output path must resolve to an OGR driver
    if Core().is_valid_ogr_driver(output_file) is False:
        raise Exception('Could not retrieve driver from the file path.')

    # first feature whose column value equals the requested one
    stream_gdf = geopandas.read_file(input_file)
    matches = stream_gdf[stream_gdf[column_name].isin([column_value])]
    segment = matches.geometry.iloc[0]

    # first vertex for the upstream end, last vertex for the downstream end
    if upstream_point is True:
        anchor_coords = segment.coords[0]
    else:
        anchor_coords = segment.coords[-1]
    anchor = shapely.Point(*anchor_coords)

    # axis-aligned square with one corner on the anchor point
    square = shapely.box(
        xmin=anchor.x,
        ymin=anchor.y,
        xmax=anchor.x + box_length,
        ymax=anchor.y + box_length
    )

    # keep rotating about the anchor by random angles for as long as the
    # square fails to touch the segment or cuts across it
    while not segment.touches(square) or segment.crosses(square):
        square = shapely.affinity.rotate(
            geom=square,
            angle=random.randint(0, 360),
            origin=anchor
        )

    # write the accepted square and hand it back
    result_gdf = geopandas.GeoDataFrame(
        geometry=[square],
        crs=stream_gdf.crs
    )
    result_gdf.to_file(output_file)

    return result_gdf
[docs]
def box_cross_selected_segment_at_endpoint(
    self,
    input_file: str,
    column_name: str,
    column_value: typing.Any,
    box_length: float,
    output_file: str,
    downstream_point: bool = True
) -> geopandas.GeoDataFrame:
    '''
    Builds a square polygon that crosses a selected stream segment
    and has one corner placed on an endpoint of that segment.

    Parameters
    ----------
    input_file : str
        Path to the input stream shapefile.

    column_name : str
        Column used to look up the target stream segment.

    column_value : Any
        Value in ``column_name`` identifying the target stream segment.
        The first matching row is used.

    box_length : float
        Side length of the square polygon.

    output_file : str
        Path where the box polygon shapefile is written.

    downstream_point : bool, optional
        If True, the box is anchored on the last coordinate of the segment
        (its downstream endpoint); otherwise on the first coordinate
        (its upstream endpoint). Default is True.

    Returns
    -------
    GeoDataFrame
        A single-row GeoDataFrame holding the box polygon, carrying
        the coordinate reference system of the input file.
    '''

    # the output path must resolve to an OGR driver
    driver_found = Core().is_valid_ogr_driver(output_file)
    if driver_found is False:
        raise Exception('Could not retrieve driver from the file path.')

    # first feature whose column value equals the requested one
    stream_gdf = geopandas.read_file(input_file)
    selection_mask = stream_gdf[column_name].isin([column_value])
    segment = stream_gdf[selection_mask].geometry.iloc[0]

    # index -1 is the downstream end, index 0 the upstream end
    vertex_index = -1 if downstream_point is True else 0
    anchor = shapely.Point(*segment.coords[vertex_index])

    # axis-aligned square with one corner on the anchor point
    square = shapely.box(
        xmin=anchor.x,
        ymin=anchor.y,
        xmax=anchor.x + box_length,
        ymax=anchor.y + box_length
    )

    # rotate about the anchor by random angles until the segment crosses the square
    while not segment.crosses(square):
        square = shapely.affinity.rotate(
            geom=square,
            angle=random.randint(0, 360),
            origin=anchor
        )

    # write the accepted square and hand it back
    result_gdf = geopandas.GeoDataFrame(
        geometry=[square],
        crs=stream_gdf.crs
    )
    result_gdf.to_file(output_file)

    return result_gdf