Source code for xdem.ddem

"""Difference of DEMs classes and functions."""
from __future__ import annotations

import warnings
from typing import Any

import geoutils as gu
import numpy as np
import shapely
from geoutils.raster import Raster, RasterType, get_array_and_mask
from rasterio.crs import CRS
from rasterio.warp import Affine

import xdem
from xdem._typing import MArrayf, NDArrayf


[docs] class dDEM(Raster): # type: ignore """A difference-DEM object."""
[docs] def __init__(self, raster: gu.Raster, start_time: np.datetime64, end_time: np.datetime64, error: Any | None = None): """ Create a dDEM object from a Raster. :param raster: A georeferenced Raster object. :param start_time: The starting time of the dDEM. :param end_time: The end time of the dDEM. :param error: An error measure for the dDEM (UNUSED). :returns: A new dDEM instance. """ # super().__init__(raster) self.__dict__ = raster.__dict__ self.start_time = start_time self.end_time = end_time self.error = error self._filled_data: NDArrayf | None = None self._fill_method = ""
def __str__(self) -> str: """Return a summary of the dDEM.""" return f"dDEM from {self.start_time} to {self.end_time}.\n\n{super().__str__()}" def copy(self, new_array: NDArrayf = None) -> dDEM: """Return a copy of the DEM.""" if new_array is None: new_array = self.data.copy() new_ddem = dDEM.from_array(new_array, self.transform, self.crs, self.start_time, self.end_time) return new_ddem @property def filled_data(self) -> NDArrayf | None: """ Get the filled data array if it exists, or else the original data if it has no nans. Returns None if the filled_data array does not exist, and the original data has nans. :returns: An array or None """ if self._filled_data is not None: return self._filled_data if (isinstance(self.data, np.ma.masked_array) and np.any(self.data.mask)) or np.any(np.isnan(self.data)): return None return np.asarray(self.data) @filled_data.setter def filled_data(self, array: NDArrayf) -> None: """Set the filled_data attribute and make sure that it is valid.""" assert ( self.data.size == array.size ), f"Array shape '{array.shape}' differs from the data shape '{self.data.shape}'" self._filled_data = np.asarray(array).reshape(self.data.shape) @property def fill_method(self) -> str: """Return the fill method used for the filled_data.""" return self._fill_method @property def time(self) -> np.timedelta64: """Get the time duration.""" return self.end_time - self.start_time @classmethod def from_array( cls: type[RasterType], data: NDArrayf | MArrayf, transform: tuple[float, ...] | Affine, crs: CRS | int | None, start_time: np.datetime64, end_time: np.datetime64, nodata: int | float | None = None, error: float = None, ) -> dDEM: # type: ignore """ Create a new dDEM object from an array. :param data: The dDEM data array. :param transform: A geometric transform. :param crs: The coordinate reference system of the dDEM. :param start_time: The starting time of the dDEM. :param end_time: The end time of the dDEM. :param error: An error measure for the dDEM. :param nodata: The nodata value. :returns: A new dDEM instance. """ return cls( gu.Raster.from_array(data=data, transform=transform, crs=crs, nodata=nodata), start_time=start_time, end_time=end_time, error=error, ) def interpolate( self, method: str = "linear", reference_elevation: NDArrayf | np.ma.masked_array[Any, np.dtype[np.floating[Any]]] | xdem.DEM = None, mask: NDArrayf | xdem.DEM | gu.Vector = None, ) -> NDArrayf | None: """ Interpolate the dDEM using the given method. :param method: The method to use for interpolation. :param reference_elevation: Reference DEM. Only required for hypsometric approaches. """ if reference_elevation is not None: try: reference_elevation = reference_elevation.reproject(self, silent=True) # type: ignore except AttributeError as exception: if "object has no attribute 'reproject'" not in str(exception): raise exception if isinstance(reference_elevation, np.ndarray): reference_elevation = np.ma.masked_array(reference_elevation, mask=np.isnan(reference_elevation)) assert reference_elevation.data.shape == self.data.shape, ( f"'reference_elevation' shape ({reference_elevation.data.shape})" f" different from 'self' ({self.data.shape})" ) if method == "linear": self.filled_data = xdem.volume.linear_interpolation(self.data) elif method == "local_hypsometric": assert reference_elevation is not None assert mask is not None if not isinstance(mask, gu.Vector): mask = gu.Vector(mask) interpolated_ddem, nans = get_array_and_mask(self.data.copy()) entries = mask.ds[mask.ds.intersects(shapely.geometry.box(*self.bounds))] ddem_mask = nans.copy().squeeze() for i in entries.index: feature_mask = (gu.Vector(entries.loc[entries.index == i]).create_mask(self, as_array=True)).reshape( interpolated_ddem.shape ) if np.count_nonzero(feature_mask) == 0: continue try: with warnings.catch_warnings(): warnings.filterwarnings("ignore", "Not enough valid bins") warnings.filterwarnings("ignore", "invalid value encountered in divide") interpolated_ddem = np.asarray( xdem.volume.hypsometric_interpolation( interpolated_ddem, reference_elevation.data, mask=feature_mask ) ) except ValueError as exception: # Skip the feature if too few glacier values exist. if "x and y arrays must have at least 2 entries" in str(exception): continue raise exception # Set the validity flag of all values within the feature to be valid ddem_mask[feature_mask] = False # All values that were nan in the start and are without the updated validity mask should now be nan # The above interpolates values outside of the dDEM, so this is necessary. interpolated_ddem[ddem_mask] = np.nan diff = abs(np.nanmean(interpolated_ddem - self.data)) assert diff < 0.01, (diff, self.data.mean()) self.filled_data = xdem.volume.linear_interpolation(interpolated_ddem) elif method == "regional_hypsometric": assert reference_elevation is not None assert mask is not None mask_array = xdem.coreg.base._mask_as_array(self, mask).reshape(self.data.shape) self.filled_data = xdem.volume.hypsometric_interpolation( self.data, reference_elevation.data, mask=mask_array ).data else: raise NotImplementedError(f"Interpolation method '{method}' not supported") return self.filled_data