"""DEM collection class and functions."""
from __future__ import annotations
import datetime
import warnings
import geoutils as gu
import numpy as np
import pandas as pd
import xdem
from xdem._typing import NDArrayf
[docs]
class DEMCollection:
"""A temporal collection of DEMs."""
[docs]
def __init__(
self,
dems: list[gu.Raster] | list[xdem.DEM],
timestamps: list[datetime.datetime] | None = None,
outlines: gu.Vector | dict[datetime.datetime, gu.Vector] | None = None,
reference_dem: int | gu.Raster = 0,
):
"""
Create a new temporal DEM collection.
:param dems: A list of DEMs.
:param timestamps: A list of DEM timestamps.
:param outlines: Polygons to separate the changing area of interest. Could for example be glacier outlines.
:param reference_dem: An instance or index of which DEM in the 'dems' list is the reference.
:returns: A new DEMCollection instance.
"""
# If timestamps is not given, try to parse it from the (potential) 'datetime' attribute of each DEM.
if timestamps is None:
timestamp_attributes = [dem.datetime for dem in dems]
if any(stamp is None for stamp in timestamp_attributes):
raise ValueError("'timestamps' not provided and the given DEMs do not all have datetime attributes")
timestamps = timestamp_attributes
if not all(isinstance(dem, xdem.DEM) for dem in dems):
dems = [xdem.DEM.from_array(dem.data, dem.transform, dem.crs, dem.nodata) for dem in dems]
assert len(dems) == len(timestamps), "The 'dem' and 'timestamps' len differ."
# Convert the timestamps to datetime64
self.timestamps = np.array(timestamps).astype("datetime64[ns]")
# Find the sort indices from the timestamps
indices = np.argsort(self.timestamps.astype("int64"))
self.dems = [dems[i] for i in indices]
self.ddems: list[xdem.dDEM] = []
# The reference index changes place when sorted
if isinstance(reference_dem, (int, np.integer)):
self.reference_index = np.argwhere(indices == reference_dem)[0][0]
elif isinstance(reference_dem, gu.Raster):
self.reference_index = [i for i, dem in enumerate(self.dems) if dem is reference_dem][0]
if outlines is None:
self.outlines: dict[np.datetime64, gu.Vector] = {}
elif isinstance(outlines, gu.Vector):
self.outlines = {self.timestamps[self.reference_index]: outlines}
elif all(isinstance(value, gu.Vector) for value in outlines.values()):
self.outlines = dict(zip(np.array(list(outlines.keys())).astype("datetime64[ns]"), outlines.values()))
else:
raise ValueError(
f"Invalid format on 'outlines': {type(outlines)},"
" expected one of ['gu.Vector', 'dict[datetime.datetime, gu.Vector']"
)
@property
def reference_dem(self) -> gu.Raster:
"""Get the DEM acting reference."""
return self.dems[self.reference_index]
@property
def reference_timestamp(self) -> np.datetime64:
"""Get the reference DEM timestamp."""
return self.timestamps[self.reference_index]
def subtract_dems(self, resampling_method: str = "cubic_spline") -> list[xdem.dDEM]:
"""
Generate dDEMs by subtracting all DEMs to the reference.
:param resampling_method: The resampling method to use if reprojection is needed.
:returns: A list of dDEM objects.
"""
ddems: list[xdem.dDEM] = []
# Subtract every DEM that is available.
for i, dem in enumerate(self.dems):
# If the reference DEM is encountered, make a dDEM where dH == 0 (to keep length consistency).
if dem.raster_equal(self.reference_dem):
ddem_raster = self.reference_dem.copy()
ddem_raster.data[:] = 0.0
ddem = xdem.dDEM(
ddem_raster,
start_time=self.reference_timestamp,
end_time=self.reference_timestamp,
error=0,
)
else:
ddem = xdem.dDEM(
self.reference_dem - dem.reproject(resampling=resampling_method, silent=True),
start_time=min(self.reference_timestamp, self.timestamps[i]),
end_time=max(self.reference_timestamp, self.timestamps[i]),
error=None,
)
ddems.append(ddem)
self.ddems = ddems
return self.ddems
def interpolate_ddems(self, method: str = "linear") -> list[NDArrayf]:
"""
Interpolate all the dDEMs in the DEMCollection object using the chosen interpolation method.
:param method: The chosen interpolation method.
"""
# TODO: Change is loop to run concurrently
for ddem in self.ddems:
ddem.interpolate(method=method, reference_elevation=self.reference_dem, mask=self.get_ddem_mask(ddem))
return [ddem.filled_data for ddem in self.ddems]
def get_ddem_mask(self, ddem: xdem.dDEM, outlines_filter: str | None = None) -> NDArrayf:
"""
Get a fitting dDEM mask for a provided dDEM.
The mask is created by evaluating these factors, in order:
If self.outlines do not exist, a full True boolean mask is returned.
If self.outlines have keys for the start and end time, their union is returned.
If self.outlines only have contain the start_time, its mask is returned.
If len(self.outlines) == 1, the mask of that outline is returned.
:param ddem: The dDEM to create a mask for.
:param outlines_filter: A query to filter the outline vectors. Example: "name_column == 'specific glacier'".
:returns: A mask from the above conditions.
"""
if not any(ddem is ddem_in_list for ddem_in_list in self.ddems):
raise ValueError("Given dDEM must be a part of the DEMCollection object.")
if outlines_filter is None:
outlines = self.outlines
else:
outlines = {key: gu.Vector(outline.ds.copy()) for key, outline in self.outlines.items()}
for key in outlines:
outlines[key].ds = outlines[key].ds.query(outlines_filter)
# If both the start and end time outlines exist, a mask is created from their union.
if ddem.start_time in outlines and ddem.end_time in outlines:
mask = np.logical_or(
outlines[ddem.start_time].create_mask(ddem, as_array=True),
outlines[ddem.end_time].create_mask(ddem, as_array=True),
)
# If only start time outlines exist, these should be used as a mask
elif ddem.start_time in outlines:
mask = outlines[ddem.start_time].create_mask(ddem, as_array=True)
# If only one outlines file exist, use that as a mask.
elif len(outlines) == 1:
mask = list(outlines.values())[0].create_mask(ddem, as_array=True)
# If no fitting outlines were found, make a full true boolean mask in its stead.
else:
mask = np.ones(shape=ddem.data.shape, dtype=bool)
return mask.reshape(ddem.data.shape)
def get_dh_series(
self, outlines_filter: str | None = None, mask: NDArrayf | None = None, nans_ok: bool = False
) -> pd.DataFrame:
"""
Return a dataframe of mean dDEM values and respective areas for every timestamp.
The values are always compared to the reference DEM timestamp.
:param mask: Optional. A mask for areas of interest. Overrides potential outlines of the same date.
:param nans_ok: Warn if NaNs are encountered in a dDEM (it should have been gap-filled).
:returns: A dataframe of dH values and respective areas with an Interval[Timestamp] index.
"""
if len(self.ddems) == 0:
raise ValueError("dDEMs have not yet been calculated")
dh_values = pd.DataFrame(columns=["dh", "area"], dtype=float)
for _, ddem in enumerate(self.ddems):
# Skip if the dDEM is a self-comparison
if float(ddem.time) == 0:
continue
# Use the provided mask unless it's None, otherwise make a dDEM mask.
ddem_mask = mask if mask is not None else self.get_ddem_mask(ddem, outlines_filter=outlines_filter)
# Warn if the dDEM contains nans and that's not okay
if ddem.filled_data is None and not nans_ok:
warnings.warn(f"NaNs found in dDEM ({ddem.start_time} - {ddem.end_time}).")
data = ddem.data[ddem_mask] if ddem.filled_data is None else ddem.filled_data[ddem_mask]
mean_dh = np.nanmean(data)
area = np.count_nonzero(ddem_mask) * self.reference_dem.res[0] * self.reference_dem.res[1]
dh_values.loc[pd.Interval(pd.Timestamp(ddem.start_time), pd.Timestamp(ddem.end_time))] = mean_dh, area
return dh_values
def get_dv_series(
self, outlines_filter: str | None = None, mask: NDArrayf | None = None, nans_ok: bool = False
) -> pd.Series:
"""
Return a series of mean volume change (dV) for every timestamp.
The values are always compared to the reference DEM timestamp.
:param outlines_filter: A query to filter the outline vectors. Example: "name_column == 'specific glacier'".
:param mask: Optional. A mask for areas of interest. Overrides potential outlines of the same date.
:param nans_ok: Warn if NaNs are encountered in a dDEM (it should have been gap-filled).
:returns: A series of dV values with an Interval[Timestamp] index.
"""
dh_values = self.get_dh_series(outlines_filter=outlines_filter, mask=mask, nans_ok=nans_ok)
return dh_values["area"] * dh_values["dh"]
def get_cumulative_series(
self,
kind: str = "dh",
outlines_filter: str | None = None,
mask: NDArrayf | None = None,
nans_ok: bool = False,
) -> pd.Series:
"""
Get the cumulative dH (elevation) or dV (volume) since the first timestamp.
:param kind: The kind of series. Can be dh or dv.
:param outlines_filter: A query to filter the outline vectors. Example: "name_column == 'specific glacier'".
:param mask: Optional. A mask for areas of interest.
:param nans_ok: Warn if NaNs are encountered in a dDEM (it should have been gap-filled).
:returns: A series of cumulative dH/dV with a Timestamp index.
"""
if kind.lower() == "dh":
# Get the dH series (where all indices are: "year to reference_year")
d_series = self.get_dh_series(mask=mask, outlines_filter=outlines_filter, nans_ok=nans_ok)["dh"]
elif kind.lower() == "dv":
# Get the dV series (where all indices are: "year to reference_year")
d_series = self.get_dv_series(mask=mask, outlines_filter=outlines_filter, nans_ok=nans_ok)
else:
raise ValueError("Invalid argument: '{dh=}'. Choices: ['dh', 'dv']")
# Simplify the index to just "year" (implicitly still the same as above)
cumulative_dh = pd.Series(dtype=d_series.dtype)
cumulative_dh[self.reference_timestamp] = 0.0
for i, value in zip(d_series.index, d_series.values):
non_reference_year = [date for date in [i.left, i.right] if date != self.reference_timestamp][0]
cumulative_dh.loc[non_reference_year] = -value
# Sort the dates (just to be sure. It should already be sorted)
cumulative_dh.sort_index(inplace=True)
# Subtract the entire series by the first value to
cumulative_dh -= cumulative_dh.iloc[0]
return cumulative_dh