Source code for xdem.coreg.affine

"""Affine coregistration classes."""

from __future__ import annotations

import warnings
from typing import Any, Callable, TypeVar

import xdem.coreg.base

try:
    import cv2

    _has_cv2 = True
except ImportError:
    _has_cv2 = False
import geopandas as gpd
import numpy as np
import rasterio as rio
import scipy
import scipy.interpolate
import scipy.ndimage
import scipy.optimize
from geoutils.raster import Raster, get_array_and_mask
from tqdm import trange

from xdem._typing import NDArrayb, NDArrayf
from xdem.coreg.base import (
    Coreg,
    CoregDict,
    _get_x_and_y_coords,
    _mask_dataframe_by_dem,
    _residuals_df,
    _transform_to_bounds_and_res,
    deramping,
)
from xdem.spatialstats import nmad

try:
    import pytransform3d.transformations

    _HAS_P3D = True
except ImportError:
    _HAS_P3D = False

try:
    from noisyopt import minimizeCompass

    _has_noisyopt = True
except ImportError:
    _has_noisyopt = False

######################################
# Generic functions for affine methods
######################################


def apply_xy_shift(transform: rio.transform.Affine, dx: float, dy: float) -> rio.transform.Affine:
    """
    Apply horizontal shift to a rasterio Affine transform
    :param transform: The Affine transform of the raster
    :param dx: dx shift value
    :param dy: dy shift value

    Returns: Updated transform
    """
    transform_shifted = rio.transform.Affine(
        transform.a, transform.b, transform.c + dx, transform.d, transform.e, transform.f + dy
    )
    return transform_shifted
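

# Editor's note: a minimal usage sketch for apply_xy_shift() (illustration only,
# not part of upstream xdem). The origin and resolution below are assumptions.
def _example_apply_xy_shift() -> None:
    # A 30 m grid with its upper-left corner at (500000, 4200000).
    transform = rio.transform.from_origin(500000.0, 4200000.0, 30.0, 30.0)
    shifted = apply_xy_shift(transform, dx=15.0, dy=-10.0)
    # Only the translation terms (c, f) change; scale and rotation are untouched.
    assert shifted.c == transform.c + 15.0
    assert shifted.f == transform.f - 10.0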


######################################
# Functions for affine coregistrations
######################################


def _calculate_slope_and_aspect_nuthkaab(dem: NDArrayf) -> tuple[NDArrayf, NDArrayf]:
    """
    Calculate the tangent of slope and the aspect (in radians) of a DEM, as needed for the Nuth & Kääb algorithm.

    :param dem: A numpy array of elevation values.

    :returns: The tangent of slope (unitless) and aspect (in radians) of the DEM.
    """
    # Old implementation
    # Calculate the gradient of the slope
    gradient_y, gradient_x = np.gradient(dem)
    slope_tan = np.sqrt(gradient_x**2 + gradient_y**2)
    aspect = np.arctan2(-gradient_x, gradient_y)
    aspect += np.pi

    # xdem implementation
    # slope, aspect = xdem.terrain.get_terrain_attribute(
    #     dem, attribute=["slope", "aspect"], resolution=1, degrees=False
    # )
    # slope_tan = np.tan(slope)
    # aspect = (aspect + np.pi) % (2 * np.pi)

    return slope_tan, aspect
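

# Editor's example (illustration only, not upstream xdem): on a plane rising by
# 1 m per pixel along the column axis, this implementation returns a uniform
# tangent-of-slope of 1 (per pixel, since resolution is not used here) and a
# single uniform aspect value.
def _example_slope_aspect() -> None:
    dem = np.tile(np.arange(100, dtype="float64"), (100, 1))  # dem[i, j] = j
    slope_tan, aspect = _calculate_slope_and_aspect_nuthkaab(dem)
    assert np.allclose(slope_tan, 1.0)
    assert np.allclose(aspect, np.pi / 2)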


def get_horizontal_shift(
    elevation_difference: NDArrayf, slope: NDArrayf, aspect: NDArrayf, min_count: int = 20
) -> tuple[float, float, float]:
    """
    Calculate the horizontal shift between two DEMs using the method presented in Nuth and Kääb (2011).

    :param elevation_difference: The elevation difference (reference_dem - aligned_dem).
    :param slope: A map of the tangent of slope (unitless) with the same shape as elevation_difference.
    :param aspect: An aspect map (in radians) with the same shape as elevation_difference.
    :param min_count: The minimum allowed bin size to consider valid.

    :raises ValueError: If very few finite values exist to analyse.

    :returns: The pixel offsets in easting and northing, and the c parameter (a residual vertical offset).
    """
    input_x_values = aspect

    with np.errstate(divide="ignore", invalid="ignore"):
        input_y_values = elevation_difference / slope

    # Remove non-finite values
    x_values = input_x_values[np.isfinite(input_x_values) & np.isfinite(input_y_values)]
    y_values = input_y_values[np.isfinite(input_x_values) & np.isfinite(input_y_values)]

    if y_values.shape[0] == 0:
        raise ValueError("No finite values found to analyse.")

    # Remove outliers
    lower_percentile = np.percentile(y_values, 1)
    upper_percentile = np.percentile(y_values, 99)
    valids = np.where((y_values > lower_percentile) & (y_values < upper_percentile) & (np.abs(y_values) < 200))
    x_values = x_values[valids]
    y_values = y_values[valids]

    # Slice the dataset into appropriate aspect bins
    step = np.pi / 36
    slice_bounds = np.arange(start=0, stop=2 * np.pi, step=step)
    y_medians = np.zeros([len(slice_bounds)])
    count = y_medians.copy()
    for i, bound in enumerate(slice_bounds):
        y_slice = y_values[(bound < x_values) & (x_values < (bound + step))]
        if y_slice.shape[0] > 0:
            y_medians[i] = np.median(y_slice)
        count[i] = y_slice.shape[0]

    # Filter out bins with counts below threshold
    y_medians = y_medians[count > min_count]
    slice_bounds = slice_bounds[count > min_count]

    if slice_bounds.shape[0] < 10:
        raise ValueError("Less than 10 different cells exist.")

    # Make an initial guess of the a, b, and c parameters
    initial_guess: tuple[float, float, float] = (3 * np.std(y_medians) / (2**0.5), 0.0, np.mean(y_medians))

    def estimate_ys(x_values: NDArrayf, parameters: tuple[float, float, float]) -> NDArrayf:
        """
        Estimate y-values from x-values and the current parameters.

        y(x) = a * cos(b - x) + c

        :param x_values: The x-values to feed the above function.
        :param parameters: The a, b, and c parameters to feed the above function

        :returns: Estimated y-values with the same shape as the given x-values
        """
        return parameters[0] * np.cos(parameters[1] - x_values) + parameters[2]

    def residuals(parameters: tuple[float, float, float], y_values: NDArrayf, x_values: NDArrayf) -> NDArrayf:
        """
        Get the residuals between the estimated and measured values using the given parameters.

        err(x, y) = est_y(x) - y

        :param parameters: The a, b, and c parameters to use for the estimation.
        :param y_values: The measured y-values.
        :param x_values: The measured x-values

        :returns: An array of residuals with the same shape as the input arrays.
        """
        err = estimate_ys(x_values, parameters) - y_values
        return err

    # Estimate the a, b, and c parameters with least-squares minimisation
    results = scipy.optimize.least_squares(
        fun=residuals, x0=initial_guess, args=(y_medians, slice_bounds), xtol=1e-8, gtol=None, ftol=None
    )

    # Round results above the tolerance to get fixed results on different OS
    a_parameter, b_parameter, c_parameter = results.x
    c_parameter = np.round(c_parameter, 3)

    # Calculate the easting and northing offsets from the above parameters
    east_offset = np.round(a_parameter * np.sin(b_parameter), 3)
    north_offset = np.round(a_parameter * np.cos(b_parameter), 3)

    return east_offset, north_offset, c_parameter
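

# Editor's example (illustration only, not upstream xdem): a synthetic check that
# get_horizontal_shift() recovers a known shift under its own sign convention,
# where dh / tan(slope) = east * sin(aspect) + north * cos(aspect) + c.
# The sample size and tolerance are assumptions.
def _example_horizontal_shift_recovery() -> None:
    rng = np.random.default_rng(42)
    aspect = rng.uniform(0, 2 * np.pi, 10_000)
    slope_tan = np.full_like(aspect, 0.5)
    # A true (east, north) shift of (2, 1) pixels produces this dh pattern:
    dh = slope_tan * (2.0 * np.sin(aspect) + 1.0 * np.cos(aspect))
    east, north, _ = get_horizontal_shift(dh, slope_tan, aspect)
    # Binning at the left bin edge introduces a small half-bin phase bias,
    # so the recovery is approximate.
    assert abs(east - 2.0) < 0.2 and abs(north - 1.0) < 0.2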


##################################
# Affine coregistration subclasses
##################################

AffineCoregType = TypeVar("AffineCoregType", bound="AffineCoreg")


class AffineCoreg(Coreg):
    """
    Generic affine coregistration class.

    Builds additional common affine methods on top of the generic Coreg class.
    Made to be subclassed.
    """

    _fit_called: bool = False  # Flag to check if the .fit() method has been called.
    _is_affine: bool | None = None

    def __init__(
        self,
        subsample: float | int = 1.0,
        matrix: NDArrayf | None = None,
        meta: CoregDict | None = None,
    ) -> None:
        """Instantiate a generic AffineCoreg method."""

        super().__init__(meta=meta)

        # Define subsample size
        self._meta["subsample"] = subsample

        if matrix is not None:
            with warnings.catch_warnings():
                # This error is fixed in the upcoming 1.8
                warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`")
                valid_matrix = pytransform3d.transformations.check_transform(matrix)
            self._meta["matrix"] = valid_matrix

        self._is_affine = True

    def to_matrix(self) -> NDArrayf:
        """Convert the transform to a 4x4 transformation matrix."""
        return self._to_matrix_func()

    def centroid(self) -> tuple[float, float, float] | None:
        """Get the centroid of the coregistration, if defined."""
        meta_centroid = self._meta.get("centroid")

        if meta_centroid is None:
            return None

        # Unpack the centroid in case it is in an unexpected format (an array, list or something else).
        return meta_centroid[0], meta_centroid[1], meta_centroid[2]

    @classmethod
    def from_matrix(cls, matrix: NDArrayf) -> AffineCoreg:
        """
        Instantiate a generic Coreg class from a transformation matrix.

        :param matrix: A 4x4 transformation matrix. Shape must be (4,4).

        :raises ValueError: If the matrix is incorrectly formatted.

        :returns: The instantiated generic Coreg class.
        """
        if np.any(~np.isfinite(matrix)):
            raise ValueError(f"Matrix has non-finite values:\n{matrix}")
        with warnings.catch_warnings():
            # This error is fixed in the upcoming 1.8
            warnings.filterwarnings("ignore", message="`np.float` is a deprecated alias for the builtin `float`")
            valid_matrix = pytransform3d.transformations.check_transform(matrix)
        return cls(matrix=valid_matrix)

    @classmethod
    def from_translation(cls, x_off: float = 0.0, y_off: float = 0.0, z_off: float = 0.0) -> AffineCoreg:
        """
        Instantiate a generic Coreg class from an X/Y/Z translation.

        :param x_off: The offset to apply in the X (west-east) direction.
        :param y_off: The offset to apply in the Y (south-north) direction.
        :param z_off: The offset to apply in the Z (vertical) direction.

        :raises ValueError: If the given translation contains invalid values.

        :returns: An instantiated generic Coreg class.
        """
        matrix = np.diag(np.ones(4, dtype=float))
        matrix[0, 3] = x_off
        matrix[1, 3] = y_off
        matrix[2, 3] = z_off

        return cls.from_matrix(matrix)

    def _to_matrix_func(self) -> NDArrayf:
        # FOR DEVELOPERS: This function needs to be implemented if the `self._meta['matrix']` keyword is not None.

        # Try to see if a matrix exists.
        meta_matrix = self._meta.get("matrix")
        if meta_matrix is not None:
            assert meta_matrix.shape == (4, 4), f"Invalid _meta matrix shape. Expected: (4, 4), got {meta_matrix.shape}"
            return meta_matrix

        raise NotImplementedError("This should be implemented by subclassing")
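

# Editor's example (illustration only, not upstream xdem): round-tripping a
# translation through AffineCoreg's matrix interface. Requires the optional
# pytransform3d dependency used by from_matrix() for validation.
def _example_affine_matrix_interface() -> None:
    if not _HAS_P3D:
        return
    coreg = AffineCoreg.from_translation(x_off=10.0, y_off=-5.0, z_off=2.0)
    matrix = coreg.to_matrix()
    # The translation sits in the last column of the 4x4 matrix.
    assert np.allclose(matrix[:3, 3], [10.0, -5.0, 2.0])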


class VerticalShift(AffineCoreg):
    """
    DEM vertical shift correction.

    Estimates the mean (or median, weighted average, etc.) vertical offset between two DEMs.
    """

    def __init__(
        self, vshift_func: Callable[[NDArrayf], np.floating[Any]] = np.average, subsample: float | int = 1.0
    ) -> None:  # pylint: disable=super-init-not-called
        """
        Instantiate a vertical shift correction object.

        :param vshift_func: The function to use for calculating the vertical shift. Default: (weighted) average.
        :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count.
        """
        self._meta: CoregDict = {}  # All __init__ functions should instantiate an empty dict.

        super().__init__(meta={"vshift_func": vshift_func}, subsample=subsample)

    def _fit_rst_rst(
        self,
        ref_elev: NDArrayf,
        tba_elev: NDArrayf,
        inlier_mask: NDArrayb,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        z_name: str,
        weights: NDArrayf | None = None,
        bias_vars: dict[str, NDArrayf] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """Estimate the vertical shift using the vshift_func."""

        if verbose:
            print("Estimating the vertical shift...")
        diff = ref_elev - tba_elev

        valid_mask = np.logical_and.reduce((inlier_mask, np.isfinite(diff)))
        subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask)

        diff = diff[subsample_mask]

        if np.count_nonzero(np.isfinite(diff)) == 0:
            raise ValueError("No finite values in vertical shift comparison.")

        # Use weights if those were provided.
        vshift = (
            self._meta["vshift_func"](diff)
            if weights is None
            else self._meta["vshift_func"](diff, weights)  # type: ignore
        )

        # TODO: We might need to define the type of bias_func with Callback protocols to get the optional argument,
        # TODO: once we have the weights implemented

        if verbose:
            print("Vertical shift estimated")

        self._meta["vshift"] = vshift

    def _apply_rst(
        self,
        elev: NDArrayf,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        bias_vars: dict[str, NDArrayf] | None = None,
        **kwargs: Any,
    ) -> tuple[NDArrayf, rio.transform.Affine]:
        """Apply the VerticalShift function to a DEM."""
        return elev + self._meta["vshift"], transform

    def _apply_pts(
        self,
        elev: gpd.GeoDataFrame,
        z_name: str = "z",
        bias_vars: dict[str, NDArrayf] | None = None,
        **kwargs: Any,
    ) -> gpd.GeoDataFrame:
        """Apply the VerticalShift function to a set of points."""
        dem_copy = elev.copy()
        dem_copy[z_name] += self._meta["vshift"]
        return dem_copy

    def _to_matrix_func(self) -> NDArrayf:
        """Convert the vertical shift to a transform matrix."""
        empty_matrix = np.diag(np.ones(4, dtype=float))

        empty_matrix[2, 3] += self._meta["vshift"]

        return empty_matrix
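

# Editor's example (illustration only, not upstream xdem): VerticalShift accepts
# any reducer mapping an array to a scalar; here the fitted state is emulated by
# hand instead of calling the public fit() API.
def _example_vertical_shift_matrix() -> None:
    vshift = VerticalShift(vshift_func=np.median)
    vshift._meta["vshift"] = 1.5  # emulate a fitted state, for illustration only
    # The vertical shift is expressed as the (2, 3) element of the 4x4 matrix.
    assert vshift.to_matrix()[2, 3] == 1.5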


class ICP(AffineCoreg):
    """
    Iterative Closest Point DEM coregistration.

    Based on the 3D registration of Besl and McKay (1992), https://doi.org/10.1117/12.57955.

    Estimates a rigid transform (rotation + translation) between two DEMs.

    Requires 'opencv'. See the opencv documentation for more info:
    https://docs.opencv.org/master/dc/d9b/classcv_1_1ppf__match__3d_1_1ICP.html
    """

    def __init__(
        self,
        max_iterations: int = 100,
        tolerance: float = 0.05,
        rejection_scale: float = 2.5,
        num_levels: int = 6,
        subsample: float | int = 5e5,
    ) -> None:
        """
        Instantiate an ICP coregistration object.

        :param max_iterations: The maximum allowed iterations before stopping.
        :param tolerance: The residual change threshold after which to stop the iterations.
        :param rejection_scale: The threshold (std * rejection_scale) to consider points as outliers.
        :param num_levels: Number of octree levels to consider. A higher number is faster but may be more inaccurate.
        :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count.
        """
        if not _has_cv2:
            raise ValueError("Optional dependency needed. Install 'opencv'")

        # TODO: Move these to _meta?
        self.max_iterations = max_iterations
        self.tolerance = tolerance
        self.rejection_scale = rejection_scale
        self.num_levels = num_levels

        super().__init__(subsample=subsample)

    def _fit_rst_rst(
        self,
        ref_elev: NDArrayf,
        tba_elev: NDArrayf,
        inlier_mask: NDArrayb,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        z_name: str,
        weights: NDArrayf | None = None,
        bias_vars: dict[str, NDArrayf] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """Estimate the rigid transform from tba_dem to ref_dem."""

        if weights is not None:
            warnings.warn("ICP was given weights, but does not support it.")

        bounds, resolution = _transform_to_bounds_and_res(ref_elev.shape, transform)

        # Generate the x and y coordinates for the reference_dem
        x_coords, y_coords = _get_x_and_y_coords(ref_elev.shape, transform)
        gradient_x, gradient_y = np.gradient(ref_elev)

        normal_east = np.sin(np.arctan(gradient_y / resolution)) * -1
        normal_north = np.sin(np.arctan(gradient_x / resolution))
        normal_up = 1 - np.linalg.norm([normal_east, normal_north], axis=0)

        valid_mask = np.logical_and.reduce(
            (inlier_mask, np.isfinite(ref_elev), np.isfinite(normal_east), np.isfinite(normal_north))
        )
        subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask)

        ref_pts = gpd.GeoDataFrame(
            geometry=gpd.points_from_xy(x=x_coords[subsample_mask], y=y_coords[subsample_mask], crs=None),
            data={
                "z": ref_elev[subsample_mask],
                "nx": normal_east[subsample_mask],
                "ny": normal_north[subsample_mask],
                "nz": normal_up[subsample_mask],
            },
        )

        self._fit_rst_pts(
            ref_elev=ref_pts,
            tba_elev=tba_elev,
            inlier_mask=inlier_mask,
            transform=transform,
            crs=crs,
            verbose=verbose,
            z_name="z",
        )

    def _fit_rst_pts(
        self,
        ref_elev: NDArrayf | gpd.GeoDataFrame,
        tba_elev: NDArrayf | gpd.GeoDataFrame,
        inlier_mask: NDArrayb,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        z_name: str,
        weights: NDArrayf | None = None,
        bias_vars: dict[str, NDArrayf] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:

        # Check which input is the reference
        if isinstance(ref_elev, gpd.GeoDataFrame):
            point_elev = ref_elev
            rst_elev = tba_elev
            ref = "point"
        else:
            point_elev = tba_elev
            rst_elev = ref_elev
            ref = "raster"

        # Pre-process point data
        point_elev = point_elev.dropna(how="any", subset=[z_name])
        bounds, resolution = _transform_to_bounds_and_res(rst_elev.shape, transform)

        # Generate the x and y coordinates for the TBA DEM
        x_coords, y_coords = _get_x_and_y_coords(rst_elev.shape, transform)
        centroid = (np.mean([bounds.left, bounds.right]), np.mean([bounds.bottom, bounds.top]), 0.0)
        # Subtract by the bounding coordinates to avoid float32 rounding errors.
        x_coords -= centroid[0]
        y_coords -= centroid[1]

        gradient_x, gradient_y = np.gradient(rst_elev)

        # This CRS is temporary and doesn't affect the result. It's just needed for Raster instantiation.
        dem_kwargs = {"transform": transform, "crs": rio.CRS.from_epsg(32633), "nodata": -9999.0}
        normal_east = Raster.from_array(np.sin(np.arctan(gradient_y / resolution)) * -1, **dem_kwargs)
        normal_north = Raster.from_array(np.sin(np.arctan(gradient_x / resolution)), **dem_kwargs)
        normal_up = Raster.from_array(1 - np.linalg.norm([normal_east.data, normal_north.data], axis=0), **dem_kwargs)

        valid_mask = ~np.isnan(rst_elev) & ~np.isnan(normal_east.data) & ~np.isnan(normal_north.data)

        points: dict[str, NDArrayf] = {}
        points["raster"] = np.dstack(
            [
                x_coords[valid_mask],
                y_coords[valid_mask],
                rst_elev[valid_mask],
                normal_east.data[valid_mask],
                normal_north.data[valid_mask],
                normal_up.data[valid_mask],
            ]
        ).squeeze()

        # TODO: Should be a way to not duplicate this column and just feed it directly
        point_elev["E"] = point_elev.geometry.x.values
        point_elev["N"] = point_elev.geometry.y.values

        if any(col not in point_elev for col in ["nx", "ny", "nz"]):
            for key, raster in [("nx", normal_east), ("ny", normal_north), ("nz", normal_up)]:
                raster.tags["AREA_OR_POINT"] = "Area"
                point_elev[key] = raster.interp_points(
                    point_elev[["E", "N"]].values,
                    shift_area_or_point=True,
                )

        point_elev["E"] -= centroid[0]
        point_elev["N"] -= centroid[1]

        points["point"] = point_elev[["E", "N", z_name, "nx", "ny", "nz"]].values

        for key in points:
            points[key] = points[key][~np.any(np.isnan(points[key]), axis=1)].astype("float32")
            points[key][:, :2] -= resolution / 2

        icp = cv2.ppf_match_3d_ICP(self.max_iterations, self.tolerance, self.rejection_scale, self.num_levels)
        if verbose:
            print("Running ICP...")
        try:
            # Use the point data as the reference
            _, residual, matrix = icp.registerModelToScene(points["raster"], points["point"])
        except cv2.error as exception:
            if "(expected: 'n > 0'), where" not in str(exception):
                raise exception

            raise ValueError(
                "Not enough valid points in input data. "
                f"The raster DEM had {points['raster'].size} valid points. "
                f"The point data had {points['point'].size} valid points."
            )

        # If the raster was the reference, invert the matrix
        if ref == "raster":
            matrix = xdem.coreg.base.invert_matrix(matrix)

        if verbose:
            print("ICP finished")

        assert residual < 1000, f"ICP coregistration failed: residual={residual}, threshold: 1000"

        self._meta["centroid"] = centroid
        self._meta["matrix"] = matrix


class Tilt(AffineCoreg):
    """
    DEM tilting.

    Fits a 2D plane to the elevation difference between two DEMs.
    """

    def __init__(self, subsample: int | float = 5e5) -> None:
        """
        Instantiate a tilt correction object.

        :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count.
        """
        self.poly_order = 1

        super().__init__(subsample=subsample)

    def _fit_rst_rst(
        self,
        ref_elev: NDArrayf,
        tba_elev: NDArrayf,
        inlier_mask: NDArrayb,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        z_name: str,
        weights: NDArrayf | None = None,
        bias_vars: dict[str, NDArrayf] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """Fit the dDEM between the DEMs to a least squares polynomial equation."""
        ddem = ref_elev - tba_elev
        ddem[~inlier_mask] = np.nan
        x_coords, y_coords = _get_x_and_y_coords(ref_elev.shape, transform)
        fit_ramp, coefs = deramping(
            ddem, x_coords, y_coords, degree=self.poly_order, subsample=self._meta["subsample"], verbose=verbose
        )

        self._meta["coefficients"] = coefs[0]
        self._meta["func"] = fit_ramp

    def _apply_rst(
        self,
        elev: NDArrayf,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        bias_vars: dict[str, NDArrayf] | None = None,
        **kwargs: Any,
    ) -> tuple[NDArrayf, rio.transform.Affine]:
        """Apply the deramp function to a DEM."""
        x_coords, y_coords = _get_x_and_y_coords(elev.shape, transform)

        ramp = self._meta["func"](x_coords, y_coords)

        return elev + ramp, transform

    def _apply_pts(
        self,
        elev: gpd.GeoDataFrame,
        z_name: str = "z",
        bias_vars: dict[str, NDArrayf] | None = None,
        **kwargs: Any,
    ) -> gpd.GeoDataFrame:
        """Apply the deramp function to a set of points."""
        dem_copy = elev.copy()
        dem_copy[z_name].values += self._meta["func"](dem_copy.geometry.x.values, dem_copy.geometry.y.values)

        return dem_copy

    def _to_matrix_func(self) -> NDArrayf:
        """Return a transform matrix if possible."""
        if self.poly_order > 1:
            raise ValueError(
                "Nonlinear deramping degrees cannot be represented as transformation matrices."
                f" (max 1, given: {self.poly_order})"
            )
        if self.poly_order == 1:
            raise NotImplementedError("Vertical shift, rotation and horizontal scaling have to be implemented.")

        # If the polynomial order is 0, it's just a vertical shift correction
        empty_matrix = np.diag(np.ones(4, dtype=float))

        empty_matrix[2, 3] += self._meta["coefficients"][0]

        return empty_matrix
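

# Editor's example (illustration only, not upstream xdem): emulating a fitted
# Tilt with a known (hypothetical) plane and applying it to a flat DEM.
def _example_tilt_apply() -> None:
    tilt = Tilt()
    tilt._meta["func"] = lambda x, y: 0.001 * x + 0.002 * y  # hypothetical ramp
    transform = rio.transform.from_origin(0.0, 100.0, 1.0, 1.0)
    dem = np.zeros((100, 100))
    corrected, _ = tilt._apply_rst(dem, transform, crs=None)
    # The ramp is added cell-wise on top of the input elevations.
    assert corrected.shape == dem.shape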


class NuthKaab(AffineCoreg):
    """
    Nuth and Kääb (2011) DEM coregistration: iterative registration of horizontal and vertical shifts
    using slope/aspect.

    Implemented after the paper: https://doi.org/10.5194/tc-5-271-2011.
    """

    def __init__(self, max_iterations: int = 10, offset_threshold: float = 0.05, subsample: int | float = 5e5) -> None:
        """
        Instantiate a new Nuth and Kääb (2011) coregistration object.

        :param max_iterations: The maximum allowed iterations before stopping.
        :param offset_threshold: The residual offset threshold after which to stop the iterations (in pixels).
        :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count.
        """
        self._meta: CoregDict
        self.max_iterations = max_iterations
        self.offset_threshold = offset_threshold

        super().__init__(subsample=subsample)

    def _fit_rst_rst(
        self,
        ref_elev: NDArrayf,
        tba_elev: NDArrayf,
        inlier_mask: NDArrayb,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        z_name: str,
        weights: NDArrayf | None = None,
        bias_vars: dict[str, NDArrayf] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """Estimate the x/y/z offset between two DEMs."""
        if verbose:
            print("Running Nuth and Kääb (2011) coregistration")

        bounds, resolution = _transform_to_bounds_and_res(ref_elev.shape, transform)
        # Make a new DEM which will be modified inplace
        aligned_dem = tba_elev.copy()

        # Check that the DEM CRS is projected, otherwise the slope is not correctly calculated
        if not crs.is_projected:
            raise NotImplementedError(
                f"DEMs CRS is {crs}. NuthKaab coregistration only works with projected CRS. "
                "First, reproject your DEMs in a local projected CRS, e.g. UTM, and re-run."
            )

        # Calculate slope and aspect maps from the reference DEM
        if verbose:
            print("   Calculate slope and aspect")

        slope_tan, aspect = _calculate_slope_and_aspect_nuthkaab(ref_elev)

        valid_mask = np.logical_and.reduce(
            (inlier_mask, np.isfinite(ref_elev), np.isfinite(tba_elev), np.isfinite(slope_tan))
        )
        subsample_mask = self._get_subsample_on_valid_mask(valid_mask=valid_mask)

        ref_elev[~subsample_mask] = np.nan

        # Make index grids for the east and north dimensions
        east_grid = np.arange(ref_elev.shape[1])
        north_grid = np.arange(ref_elev.shape[0])

        # Make a function to estimate the aligned DEM (used to construct an offset DEM)
        elevation_function = scipy.interpolate.RectBivariateSpline(
            x=north_grid, y=east_grid, z=np.where(np.isnan(aligned_dem), -9999, aligned_dem), kx=1, ky=1
        )

        # Make a function to estimate nodata gaps in the aligned DEM (used to fix the estimated offset DEM)
        # Use spline degree 1, as higher degrees will create instabilities around 1 and mess up the nodata mask
        nodata_function = scipy.interpolate.RectBivariateSpline(
            x=north_grid, y=east_grid, z=np.isnan(aligned_dem), kx=1, ky=1
        )

        # Initialise east and north pixel offset variables (these will be incremented up and down)
        offset_east, offset_north = 0.0, 0.0

        # Calculate initial dDEM statistics
        elevation_difference = ref_elev - aligned_dem

        vshift = np.nanmedian(elevation_difference)
        nmad_old = nmad(elevation_difference)

        if verbose:
            print("   Statistics on initial dh:")
            print(f"      Median = {vshift:.2f} - NMAD = {nmad_old:.2f}")

        # Iteratively run the analysis until the maximum iterations or until the error gets low enough
        if verbose:
            print("   Iteratively estimating horizontal shift:")

        # If verbose is True, will use progressbar and print additional statements
        pbar = trange(self.max_iterations, disable=not verbose, desc="   Progress")
        for i in pbar:

            # Calculate the elevation difference and the residual (NMAD) between them.
            elevation_difference = ref_elev - aligned_dem
            vshift = np.nanmedian(elevation_difference)
            # Correct potential vertical shifts
            elevation_difference -= vshift

            # Estimate the horizontal shift from the implementation by Nuth and Kääb (2011)
            east_diff, north_diff, _ = get_horizontal_shift(  # type: ignore
                elevation_difference=elevation_difference, slope=slope_tan, aspect=aspect
            )
            if verbose:
                pbar.write(f"      #{i + 1:d} - Offset in pixels : ({east_diff:.2f}, {north_diff:.2f})")

            # Increment the offsets with the overall offset
            offset_east += east_diff
            offset_north += north_diff

            # Calculate new elevations from the offset x- and y-coordinates
            new_elevation = elevation_function(y=east_grid + offset_east, x=north_grid - offset_north)

            # Set NaNs where NaNs were in the original data
            new_nans = nodata_function(y=east_grid + offset_east, x=north_grid - offset_north)
            new_elevation[new_nans > 0] = np.nan

            # Assign the newly calculated elevations to the aligned_dem
            aligned_dem = new_elevation

            # Update statistics
            elevation_difference = ref_elev - aligned_dem

            vshift = np.nanmedian(elevation_difference)
            nmad_new = nmad(elevation_difference)

            nmad_gain = (nmad_new - nmad_old) / nmad_old * 100

            if verbose:
                pbar.write(f"      Median = {vshift:.2f} - NMAD = {nmad_new:.2f}  ==>  Gain = {nmad_gain:.2f}%")

            # Stop if the NMAD is low and a few iterations have been made
            assert ~np.isnan(nmad_new), (offset_east, offset_north)

            offset = np.sqrt(east_diff**2 + north_diff**2)
            if i > 1 and offset < self.offset_threshold:
                if verbose:
                    pbar.write(
                        f"   Last offset was below the residual offset threshold of {self.offset_threshold} -> stopping"
                    )
                break

            nmad_old = nmad_new

        # Print final results
        if verbose:
            print(f"\n   Final offset in pixels (east, north) : ({offset_east:f}, {offset_north:f})")
            print("   Statistics on coregistered dh:")
            print(f"      Median = {vshift:.2f} - NMAD = {nmad_new:.2f}")

        self._meta["offset_east_px"] = offset_east
        self._meta["offset_north_px"] = offset_north
        self._meta["vshift"] = vshift
        self._meta["resolution"] = resolution

    def _fit_rst_pts(
        self,
        ref_elev: NDArrayf | gpd.GeoDataFrame,
        tba_elev: NDArrayf | gpd.GeoDataFrame,
        inlier_mask: NDArrayb,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        z_name: str,
        weights: NDArrayf | None = None,
        bias_vars: dict[str, NDArrayf] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Estimate the x/y/z offset between a DEM and a point cloud.

        1. Deletes elevation_function and nodata_function, shifting the dataframe (points) instead of the DEM.
        2. Does not support latitude and longitude as inputs.

        :param z_name: The column name of the dataframe used for elevation differencing.
        """

        # Check which input is the reference
        if isinstance(ref_elev, gpd.GeoDataFrame):
            point_elev = ref_elev
            rst_elev = tba_elev
            ref = "point"
        else:
            point_elev = tba_elev
            rst_elev = ref_elev
            ref = "raster"

        if verbose:
            print("Running Nuth and Kääb (2011) coregistration. Shifting the points instead of the DEM.")

        rst_elev = Raster.from_array(rst_elev, transform=transform, crs=crs, nodata=-9999)
        tba_arr, _ = get_array_and_mask(rst_elev)

        bounds, resolution = _transform_to_bounds_and_res(ref_elev.shape, transform)
        x_coords, y_coords = (point_elev["E"].values, point_elev["N"].values)

        # Assume that the coordinates represent the center of a theoretical pixel.
        # The raster sampling is done in the upper left corner, meaning all points have to be shifted accordingly.
        x_coords -= resolution / 2
        y_coords += resolution / 2

        pts = np.array((x_coords, y_coords)).T

        # This needs to be consistent, so it's hardcoded here
        area_or_point = "Area"

        # Make a new DEM which will be modified inplace
        aligned_dem = rst_elev.copy()
        aligned_dem.tags["AREA_OR_POINT"] = area_or_point

        # Calculate slope and aspect maps from the reference DEM
        if verbose:
            print("   Calculate slope and aspect")
        slope, aspect = _calculate_slope_and_aspect_nuthkaab(tba_arr)

        slope_r = rst_elev.copy(new_array=np.ma.masked_array(slope[None, :, :], mask=~np.isfinite(slope[None, :, :])))
        slope_r.tags["AREA_OR_POINT"] = area_or_point
        aspect_r = rst_elev.copy(
            new_array=np.ma.masked_array(aspect[None, :, :], mask=~np.isfinite(aspect[None, :, :]))
        )
        aspect_r.tags["AREA_OR_POINT"] = area_or_point

        # Initialise east and north pixel offset variables (these will be incremented up and down)
        offset_east, offset_north, vshift = 0.0, 0.0, 0.0

        # Calculate initial DEM statistics
        slope_pts = slope_r.interp_points(pts, shift_area_or_point=True)
        aspect_pts = aspect_r.interp_points(pts, shift_area_or_point=True)
        tba_pts = aligned_dem.interp_points(pts, shift_area_or_point=True)

        # Treat new_pts as a window; every iteration shifts it a little bit to fit the correct view
        new_pts = pts.copy()

        elevation_difference = point_elev[z_name].values - tba_pts
        vshift = float(np.nanmedian(elevation_difference))
        nmad_old = nmad(elevation_difference)

        if verbose:
            print("   Statistics on initial dh:")
            print(f"      Median = {vshift:.3f} - NMAD = {nmad_old:.3f}")

        # Iteratively run the analysis until the maximum iterations or until the error gets low enough
        if verbose:
            print("   Iteratively estimating horizontal shift:")

        # If verbose is True, will use progressbar and print additional statements
        pbar = trange(self.max_iterations, disable=not verbose, desc="   Progress")
        for i in pbar:

            # Estimate the horizontal shift from the implementation by Nuth and Kääb (2011)
            east_diff, north_diff, _ = get_horizontal_shift(  # type: ignore
                elevation_difference=elevation_difference, slope=slope_pts, aspect=aspect_pts
            )
            if verbose:
                pbar.write(f"      #{i + 1:d} - Offset in pixels : ({east_diff:.3f}, {north_diff:.3f})")

            # Increment the offsets with the overall offset
            offset_east += east_diff
            offset_north += north_diff

            # Assign the offset to the coordinates of the pts
            # Treat new_pts as a window; every iteration shifts it a little bit to fit the correct view
            new_pts += [east_diff * resolution, north_diff * resolution]

            # Get new values
            tba_pts = aligned_dem.interp_points(new_pts, shift_area_or_point=True)
            elevation_difference = point_elev[z_name].values - tba_pts

            # Mask out nodata by the DEM's mask
            pts_, mask_ = _mask_dataframe_by_dem(new_pts, rst_elev)

            # Update values related to the shifted pts
            elevation_difference = elevation_difference[mask_]
            slope_pts = slope_r.interp_points(pts_, shift_area_or_point=True)
            aspect_pts = aspect_r.interp_points(pts_, shift_area_or_point=True)
            vshift = float(np.nanmedian(elevation_difference))

            # Update statistics
            elevation_difference -= vshift
            nmad_new = nmad(elevation_difference)
            nmad_gain = (nmad_new - nmad_old) / nmad_old * 100

            if verbose:
                pbar.write(f"      Median = {vshift:.3f} - NMAD = {nmad_new:.3f}  ==>  Gain = {nmad_gain:.3f}%")

            # Stop if the NMAD is low and a few iterations have been made
            assert ~np.isnan(nmad_new), (offset_east, offset_north)

            offset = np.sqrt(east_diff**2 + north_diff**2)
            if i > 1 and offset < self.offset_threshold:
                if verbose:
                    pbar.write(
                        f"   Last offset was below the residual offset threshold of {self.offset_threshold} -> stopping"
                    )
                break

            nmad_old = nmad_new

        # Print final results
        if verbose:
            print(
                "\n   Final offset in pixels (east, north, bias) : ({:f}, {:f}, {:f})".format(
                    offset_east, offset_north, vshift
                )
            )
            print("   Statistics on coregistered dh:")
            print(f"      Median = {vshift:.3f} - NMAD = {nmad_new:.3f}")

        self._meta["offset_east_px"] = offset_east if ref == "point" else -offset_east
        self._meta["offset_north_px"] = offset_north if ref == "point" else -offset_north
        self._meta["vshift"] = vshift if ref == "point" else -vshift
        self._meta["resolution"] = resolution
        self._meta["nmad"] = nmad_new

    def _to_matrix_func(self) -> NDArrayf:
        """Return a transformation matrix from the estimated offsets."""
        offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
        offset_north = self._meta["offset_north_px"] * self._meta["resolution"]

        matrix = np.diag(np.ones(4, dtype=float))
        matrix[0, 3] += offset_east
        matrix[1, 3] += offset_north
        matrix[2, 3] += self._meta["vshift"]

        return matrix

    def _apply_rst(
        self,
        elev: NDArrayf,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        bias_vars: dict[str, NDArrayf] | None = None,
        **kwargs: Any,
    ) -> tuple[NDArrayf, rio.transform.Affine]:
        """Apply the Nuth & Kaab shift to a DEM."""
        offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
        offset_north = self._meta["offset_north_px"] * self._meta["resolution"]

        updated_transform = apply_xy_shift(transform, -offset_east, -offset_north)
        vshift = self._meta["vshift"]
        return elev + vshift, updated_transform

    def _apply_pts(
        self,
        elev: gpd.GeoDataFrame,
        z_name: str = "z",
        bias_vars: dict[str, NDArrayf] | None = None,
        **kwargs: Any,
    ) -> gpd.GeoDataFrame:
        """Apply the Nuth & Kaab shift to a set of points."""
        offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
        offset_north = self._meta["offset_north_px"] * self._meta["resolution"]

        applied_epc = gpd.GeoDataFrame(
            geometry=gpd.points_from_xy(
                x=elev.geometry.x.values + offset_east, y=elev.geometry.y.values + offset_north, crs=elev.crs
            ),
            data={z_name: elev[z_name].values + self._meta["vshift"]},
        )

        return applied_epc


class GradientDescending(AffineCoreg):
    """
    Gradient descending coregistration by Zhihao.
    """

    def __init__(
        self,
        x0: tuple[float, float] = (0, 0),
        bounds: tuple[float, float] = (-3, 3),
        deltainit: int = 2,
        deltatol: float = 0.004,
        feps: float = 0.0001,
        subsample: int | float = 6000,
    ) -> None:
        """
        Instantiate a gradient descending coregistration object.

        :param x0: The initial point of the gradient descending iteration.
        :param bounds: The boundary of the maximum shift.
        :param deltainit: Initial pattern size.
        :param deltatol: Target pattern size, or the precision you want to achieve.
        :param feps: Smallest difference in function value to resolve.
        :param subsample: Subsample the input for speed-up. <1 is parsed as a fraction. >1 is a pixel count.

        The algorithm terminates when the iteration is locally optimal at the target pattern size 'deltatol',
        or when the function value differs by less than the tolerance 'feps' along all directions.
        """
        self._meta: CoregDict
        self.bounds = bounds
        self.x0 = x0
        self.deltainit = deltainit
        self.deltatol = deltatol
        self.feps = feps

        super().__init__(subsample=subsample)

    def _fit_rst_pts(
        self,
        ref_elev: NDArrayf | gpd.GeoDataFrame,
        tba_elev: NDArrayf | gpd.GeoDataFrame,
        inlier_mask: NDArrayb,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        z_name: str,
        weights: NDArrayf | None = None,
        bias_vars: dict[str, NDArrayf] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:
        """
        Estimate the x/y/z offset between a DEM and a point cloud.

        :param z_name: The column name of the dataframe used for elevation differencing.
        :param weights: The column name of the dataframe used for weighting; should have the same length as the
            z_name column.
        """
        if not _has_noisyopt:
            raise ValueError("Optional dependency needed. Install 'noisyopt'")

        # Check which input is the reference
        if isinstance(ref_elev, gpd.GeoDataFrame):
            point_elev = ref_elev
            rst_elev = tba_elev
            ref = "point"
        else:
            point_elev = tba_elev
            rst_elev = ref_elev
            ref = "raster"

        rst_elev = Raster.from_array(rst_elev, transform=transform, crs=crs, nodata=-9999)

        # Perform downsampling if subsample is not None
        if self._meta["subsample"] and len(point_elev) > self._meta["subsample"]:
            point_elev = point_elev.sample(
                frac=self._meta["subsample"] / len(point_elev), random_state=self._meta["random_state"]
            ).copy()
        else:
            point_elev = point_elev.copy()

        bounds, resolution = _transform_to_bounds_and_res(ref_elev.shape, transform)

        # Assume that the coordinates represent the center of a theoretical pixel.
        # The raster sampling is done in the upper left corner, meaning all points have to be shifted accordingly.
        # TODO: Should be a way to not duplicate this column and just feed it directly
        point_elev["E"] = point_elev.geometry.x.values
        point_elev["N"] = point_elev.geometry.y.values
        point_elev["E"] -= resolution / 2
        point_elev["N"] += resolution / 2

        area_or_point = "Area"
        old_aop = rst_elev.tags.get("AREA_OR_POINT", None)
        rst_elev.tags["AREA_OR_POINT"] = area_or_point

        if verbose:
            print("Running Gradient Descending Coreg - Zhihao (in preparation)")
            if self._meta["subsample"]:
                print("Running on downsampled data. The length of the gdf:", len(point_elev))

            elevation_difference = _residuals_df(rst_elev, point_elev, (0, 0), 0, z_name=z_name)
            nmad_old = nmad(elevation_difference)
            vshift = np.nanmedian(elevation_difference)
            print("   Statistics on initial dh:")
            print(f"      Median = {vshift:.4f} - NMAD = {nmad_old:.4f}")

        # Start the iteration; find the best shift in pixels
        def func_cost(x: tuple[float, float]) -> np.floating[Any]:
            return nmad(_residuals_df(rst_elev, point_elev, x, 0, z_name=z_name))

        res = minimizeCompass(
            func_cost,
            x0=self.x0,
            deltainit=self.deltainit,
            deltatol=self.deltatol,
            feps=self.feps,
            bounds=(self.bounds, self.bounds),
            disp=verbose,
            errorcontrol=False,
        )

        # Send the best solution to find all results
        elevation_difference = _residuals_df(rst_elev, point_elev, (res.x[0], res.x[1]), 0, z_name=z_name)

        if old_aop is None:
            del rst_elev.tags["AREA_OR_POINT"]
        else:
            rst_elev.tags["AREA_OR_POINT"] = old_aop

        # Statistics of the results
        vshift = np.nanmedian(elevation_difference)
        nmad_new = nmad(elevation_difference)

        # Print final results
        if verbose:
            print(f"\n   Final offset in pixels (east, north) : ({res.x[0]:f}, {res.x[1]:f})")
            print("   Statistics on coregistered dh:")
            print(f"      Median = {vshift:.4f} - NMAD = {nmad_new:.4f}")

        offset_east = res.x[0]
        offset_north = res.x[1]

        self._meta["offset_east_px"] = offset_east if ref == "point" else -offset_east
        self._meta["offset_north_px"] = offset_north if ref == "point" else -offset_north
        self._meta["vshift"] = vshift if ref == "point" else -vshift
        self._meta["resolution"] = resolution

    def _fit_rst_rst(
        self,
        ref_elev: NDArrayf,
        tba_elev: NDArrayf,
        inlier_mask: NDArrayb,
        transform: rio.transform.Affine,
        crs: rio.crs.CRS,
        z_name: str,
        weights: NDArrayf | None = None,
        bias_vars: dict[str, NDArrayf] | None = None,
        verbose: bool = False,
        **kwargs: Any,
    ) -> None:

        ref_elev = (
            Raster.from_array(ref_elev, transform=transform, crs=crs, nodata=-9999.0)
            .to_pointcloud(force_pixel_offset="center")
            .ds
        )
        ref_elev["E"] = ref_elev.geometry.x
        ref_elev["N"] = ref_elev.geometry.y
        ref_elev.rename(columns={"b1": z_name}, inplace=True)
        self._fit_rst_pts(
            ref_elev=ref_elev,
            tba_elev=tba_elev,
            transform=transform,
            crs=crs,
            inlier_mask=inlier_mask,
            z_name=z_name,
            **kwargs,
        )

    def _to_matrix_func(self) -> NDArrayf:
        """Return a transformation matrix from the estimated offsets."""
        offset_east = self._meta["offset_east_px"] * self._meta["resolution"]
        offset_north = self._meta["offset_north_px"] * self._meta["resolution"]

        matrix = np.diag(np.ones(4, dtype=float))
        matrix[0, 3] += offset_east
        matrix[1, 3] += offset_north
        matrix[2, 3] += self._meta["vshift"]

        return matrix
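

# Editor's example (illustration only, not upstream xdem): GradientDescending
# minimises the NMAD of point-to-DEM residuals with noisyopt's pattern search;
# this toy run swaps in a smooth quadratic stand-in for the real cost.
def _example_minimize_compass() -> None:
    if not _has_noisyopt:
        return
    res = minimizeCompass(
        lambda x: (x[0] - 1.0) ** 2 + (x[1] + 2.0) ** 2,
        x0=(0.0, 0.0),
        deltainit=2,
        deltatol=0.004,
        feps=1e-4,
        bounds=((-3, 3), (-3, 3)),
        disp=False,
        errorcontrol=False,
    )
    # The search converges to the known minimum at (1, -2) within deltatol.
    assert abs(res.x[0] - 1.0) < 0.05 and abs(res.x[1] + 2.0) < 0.05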