# Copyright (c) 2024 xDEM developers
#
# This file is part of the xDEM project:
# https://github.com/glaciohack/xdem
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
#
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Spatial statistical tools to estimate uncertainties related to DEMs"""
from __future__ import annotations
import inspect
import itertools
import logging
import math as m
import multiprocessing as mp
import warnings
from typing import Any, Callable, Iterable, Literal, TypedDict, overload
import geopandas as gpd
import matplotlib
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import numba
import numpy as np
import pandas as pd
from geoutils.raster import Mask, Raster, RasterType, subsample_array
from geoutils.raster.array import get_array_and_mask
from geoutils.vector.vector import Vector, VectorType
from numpy.typing import ArrayLike
from scipy import integrate
from scipy.interpolate import RegularGridInterpolator, griddata
from scipy.optimize import curve_fit
from scipy.signal import fftconvolve
from scipy.spatial.distance import cdist, pdist, squareform
from scipy.stats import binned_statistic, binned_statistic_2d, binned_statistic_dd
from skimage.draw import disk
from xdem._typing import NDArrayf
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=DeprecationWarning)
import skgstat as skg
def nmad(data: NDArrayf | RasterType, nfact: float = 1.4826) -> np.floating[Any]:
"""
Calculate the normalized median absolute deviation (NMAD) of an array.
Default scaling factor is 1.4826 to scale the median absolute deviation (MAD) to the dispersion of a normal
distribution (see https://en.wikipedia.org/wiki/Median_absolute_deviation#Relation_to_standard_deviation, and
e.g. Höhle and Höhle (2009), http://dx.doi.org/10.1016/j.isprsjprs.2009.02.003)
:param data: Input array or raster
:param nfact: Normalization factor for the data
    :returns: Normalized median absolute deviation of the data.
"""
if isinstance(data, np.ma.masked_array):
data_arr = get_array_and_mask(data, check_shape=False)[0]
elif isinstance(data, Raster):
data_arr = data
else:
data_arr = np.asarray(data)
return nfact * np.nanmedian(np.abs(data_arr - np.nanmedian(data_arr)))
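# A minimal usage sketch of nmad (illustrative values): for a large sample drawn from a normal distribution
# with standard deviation 2, the NMAD should recover a value close to 2.
# >>> rng = np.random.default_rng(42)
# >>> sample = rng.normal(0, 2, size=10_000)
# >>> bool(1.9 < nmad(sample) < 2.1)
# True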
def nd_binning(
values: NDArrayf,
list_var: list[NDArrayf],
list_var_names: list[str],
list_var_bins: int | tuple[int, ...] | tuple[NDArrayf, ...] | None = None,
statistics: Iterable[str | Callable[[NDArrayf], np.floating[Any]]] = ("count", np.nanmedian, nmad),
list_ranges: list[float] | None = None,
) -> pd.DataFrame:
"""
    N-dimensional binning of values according to one or several explanatory variables, with statistics computed in
    each bin. By default, the sample count, the median and the normalized median absolute deviation (NMAD). The count
    is always computed, regardless of user input.
    The values input is an array of shape (N,) and the variable input is a list of length L of flattened arrays, each
    of shape (N,).
For more details on the format of input variables, see documentation of scipy.stats.binned_statistic_dd.
:param values: Values array of size (N,)
:param list_var: List of size (L) of explanatory variables array of size (N,)
:param list_var_names: List of size (L) of names of the explanatory variables
:param list_var_bins: Count of size (1), or list of size (L) of counts or custom bin edges for the explanatory
variables; defaults to 10 bins
:param statistics: List of size (X) of statistics to be computed; defaults to count, median and nmad
:param list_ranges: List of size (L) of minimum and maximum ranges to bin the explanatory variables; defaults to
min/max of the data
    :return: Dataframe with the computed statistics for each bin of the 1D, 2D and N-D binnings of the explanatory
        variables.
"""
    # We separate 1D, 2D and N-D binning because propagating statistics between binnings of different dimensions is
    # not always feasible: scipy directly supports binning in several dimensions, while this is not straightforward
    # in pandas
if list_var_bins is None:
list_var_bins = (10,) * len(list_var_names)
elif isinstance(list_var_bins, (int, np.integer)):
list_var_bins = (list_var_bins,) * len(list_var_names)
# Flatten the arrays if this has not been done by the user
values = values.ravel()
list_var = [var.ravel() for var in list_var]
# Remove no data values
valid_data = np.logical_and.reduce([np.isfinite(values)] + [np.isfinite(var) for var in list_var])
values = values[valid_data]
list_var = [var[valid_data] for var in list_var]
statistics = list(statistics)
    # In case the statistics are user-defined and do not include the count, we add it for later calculation or plotting
if "count" not in statistics:
statistics.insert(0, "count")
statistics_name = [f if isinstance(f, str) else f.__name__ for f in statistics]
# Get binned statistics in 1d: a simple loop is sufficient
list_df_1d = []
for i, var in enumerate(list_var):
df_stats_1d = pd.DataFrame()
# Get statistics
for j, statistic in enumerate(statistics):
stats_binned_1d, bedges_1d = binned_statistic(
x=var, values=values, statistic=statistic, bins=list_var_bins[i], range=list_ranges
)[:2]
# Save in a dataframe
df_stats_1d[statistics_name[j]] = stats_binned_1d
# We need to get the middle of the bins from the edges, to get the same dimension length
df_stats_1d[list_var_names[i]] = pd.IntervalIndex.from_breaks(bedges_1d, closed="left")
# Report number of dimensions used
df_stats_1d.insert(0, "nd", 1)
list_df_1d.append(df_stats_1d)
# Get binned statistics in 2d: all possible 2d combinations
list_df_2d = []
if len(list_var) > 1:
combs = list(itertools.combinations(list_var_names, 2))
for _, comb in enumerate(combs):
var1_name, var2_name = comb
# Corresponding variables indexes
i1, i2 = list_var_names.index(var1_name), list_var_names.index(var2_name)
df_stats_2d = pd.DataFrame()
for j, statistic in enumerate(statistics):
stats_binned_2d, bedges_var1, bedges_var2 = binned_statistic_2d(
x=list_var[i1],
y=list_var[i2],
values=values,
statistic=statistic,
bins=[list_var_bins[i1], list_var_bins[i2]],
range=list_ranges,
)[:3]
# Get statistics
df_stats_2d[statistics_name[j]] = stats_binned_2d.flatten()
# Derive interval indexes and convert bins into 2d indexes
ii1 = pd.IntervalIndex.from_breaks(bedges_var1, closed="left")
ii2 = pd.IntervalIndex.from_breaks(bedges_var2, closed="left")
df_stats_2d[var1_name] = [i1 for i1 in ii1 for i2 in ii2]
df_stats_2d[var2_name] = [i2 for i1 in ii1 for i2 in ii2]
# Report number of dimensions used
df_stats_2d.insert(0, "nd", 2)
list_df_2d.append(df_stats_2d)
# Get binned statistics in nd, without redoing the same stats
df_stats_nd = pd.DataFrame()
if len(list_var) > 2:
for j, statistic in enumerate(statistics):
stats_binned_2d, list_bedges = binned_statistic_dd(
sample=list_var, values=values, statistic=statistic, bins=list_var_bins, range=list_ranges
)[0:2]
df_stats_nd[statistics_name[j]] = stats_binned_2d.flatten()
list_ii = []
        # Loop through the bin edges and create IntervalIndexes from them (to get both the left and right edge of
        # each bin)
for bedges in list_bedges:
list_ii.append(pd.IntervalIndex.from_breaks(bedges, closed="left"))
# Create nd indexes in nd-array and flatten for each variable
iind = np.meshgrid(*list_ii)
for i, var_name in enumerate(list_var_names):
df_stats_nd[var_name] = iind[i].flatten()
# Report number of dimensions used
df_stats_nd.insert(0, "nd", len(list_var_names))
# Concatenate everything
list_all_dfs = list_df_1d + list_df_2d + [df_stats_nd]
df_concat = pd.concat(list_all_dfs)
# commenting for now: pd.MultiIndex can be hard to use
# df_concat = df_concat.set_index(list_var_names)
return df_concat
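# A minimal usage sketch of nd_binning (hypothetical variable names): bin synthetic elevation differences by slope
# and curvature, computing the default statistics (count, median, NMAD) for each 1D and 2D bin.
# >>> rng = np.random.default_rng(42)
# >>> dh, slope, curv = rng.normal(size=(3, 1000))
# >>> df = nd_binning(values=dh, list_var=[slope, curv], list_var_names=["slope", "curvature"])
# The output dataframe contains rows for the 1D binning of each variable (nd=1) and their 2D combination (nd=2).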
# Function to convert IntervalIndex written to str in csv back to pd.Interval
# from: https://github.com/pandas-dev/pandas/issues/28210
def _pandas_str_to_interval(istr: str) -> float | pd.Interval:
if isinstance(istr, float):
return np.nan
else:
c_left = istr[0] == "["
c_right = istr[-1] == "]"
closed = {(True, False): "left", (False, True): "right", (True, True): "both", (False, False): "neither"}[
c_left, c_right
]
left, right = map(float, istr[1:-1].split(","))
try:
return pd.Interval(left, right, closed)
except Exception:
return np.nan
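# A minimal sketch of the conversion above: an interval written to CSV as a string is parsed back to a pd.Interval.
# >>> _pandas_str_to_interval("[0.0, 10.0)")
# Interval(0.0, 10.0, closed='left')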
def interp_nd_binning(
df: pd.DataFrame,
list_var_names: str | list[str],
statistic: str | Callable[[NDArrayf], np.floating[Any]] = nmad,
interpolate_method: Literal["nearest"] | Literal["linear"] = "linear",
min_count: int | None = 100,
) -> Callable[[tuple[ArrayLike, ...]], NDArrayf]:
"""
Estimate an interpolant function for an N-dimensional binning. Preferably based on the output of nd_binning.
For more details on the input dataframe, and associated list of variable name and statistic, see nd_binning.
First, interpolates nodata values of the irregular N-D binning grid with scipy.griddata.
Then, extrapolates nodata values on the N-D binning grid with scipy.griddata with "nearest neighbour"
(necessary to get a regular grid without NaNs).
Finally, creates an interpolant function (linear by default) to interpolate between points of the grid using
scipy.RegularGridInterpolator. Extrapolation is fixed to nearest neighbour by duplicating edge bins along each
dimension (linear extrapolation of two equal bins = nearest neighbour).
If the variable pd.DataSeries corresponds to an interval (as the output of nd_binning), uses the middle of the
interval. Otherwise, uses the variable as such.
:param df: Dataframe with statistic of binned values according to explanatory variables.
:param list_var_names: Explanatory variable data series to select from the dataframe.
:param statistic: Statistic to interpolate, stored as a data series in the dataframe.
:param interpolate_method: Method to interpolate inside of edge bins, "nearest", "linear" (default).
:param min_count: Minimum number of samples to be used as a valid statistic (replaced by nodata).
:return: N-dimensional interpolant function.
    :examples:
# Using a dataframe created from scratch
>>> df = pd.DataFrame({"var1": [1, 2, 3, 1, 2, 3, 1, 2, 3], "var2": [1, 1, 1, 2, 2, 2, 3, 3, 3],
... "statistic": [1, 2, 3, 4, 5, 6, 7, 8, 9]})
# In 2 dimensions, the statistic array looks like this
# array([
# [1, 2, 3],
# [4, 5, 6],
# [7, 8, 9]
# ])
>>> fun = interp_nd_binning(df, list_var_names=["var1", "var2"], statistic="statistic", min_count=None)
# Right on point.
>>> fun((2, 2))
array(5.)
# Interpolated linearly inside the 2D frame.
>>> fun((1.5, 1.5))
array(3.)
# Extrapolated linearly outside the 2D frame: nearest neighbour.
>>> fun((-1, 1))
array(1.)
"""
# If list of variable input is simply a string
if isinstance(list_var_names, str):
list_var_names = [list_var_names]
# Check that the dataframe contains what we need
for var in list_var_names:
if var not in df.columns:
raise ValueError('Variable "' + var + '" does not exist in the provided dataframe.')
statistic_name = statistic if isinstance(statistic, str) else statistic.__name__
if statistic_name not in df.columns:
raise ValueError('Statistic "' + statistic_name + '" does not exist in the provided dataframe.')
if min_count is not None and "count" not in df.columns:
raise ValueError('Statistic "count" is not in the provided dataframe, necessary to use the min_count argument.')
if df.empty:
raise ValueError("Dataframe is empty.")
df_sub = df.copy()
# If the dataframe is an output of nd_binning, keep only the dimension of interest
if "nd" in df_sub.columns:
df_sub = df_sub[df_sub.nd == len(list_var_names)]
# Compute the middle values instead of bin interval if the variable is a pandas interval type
for var in list_var_names:
        # Check if all values are numeric (NaN counts as integer), if yes leave as is
if all(isinstance(x, (int, float, np.integer, np.floating)) for x in df_sub[var].values):
pass
# Check if any value is a pandas interval (NaN do not count, so using any), if yes compute the middle values
elif any(isinstance(x, pd.Interval) for x in df_sub[var].values):
df_sub[var] = pd.IntervalIndex(df_sub[var]).mid.values
# Check for any unformatted interval (saving and reading a pd.DataFrame without MultiIndexing transforms
# pd.Interval into strings)
elif any(isinstance(_pandas_str_to_interval(x), pd.Interval) for x in df_sub[var].values):
intervalindex_vals = [_pandas_str_to_interval(x) for x in df_sub[var].values]
df_sub[var] = pd.IntervalIndex(intervalindex_vals).mid.values
else:
raise ValueError("The variable columns must be provided as numerical mid values, or pd.Interval values.")
# Check that explanatory variables have valid binning values which coincide along the dataframe
df_sub = df_sub[np.logical_and.reduce([np.isfinite(df_sub[var].values) for var in list_var_names])]
if df_sub.empty:
raise ValueError(
"Dataframe does not contain a nd binning with the variables corresponding to the list of variables."
)
# Check that the statistic data series contain valid data
if all(~np.isfinite(df_sub[statistic_name].values)):
raise ValueError("Dataframe does not contain any valid statistic values.")
# Remove statistic values calculated with a sample count under the minimum count
if min_count is not None:
df_sub.loc[df_sub["count"] < min_count, statistic_name] = np.nan
values = df_sub[statistic_name].values
ind_valid = np.isfinite(values)
# Re-check that the statistic data series contain valid data after filtering with min_count
if all(~ind_valid):
raise ValueError(
"Dataframe does not contain any valid statistic values after filtering with min_count = "
+ str(min_count)
+ "."
)
# Get a list of middle values for the binning coordinates, to define a nd grid
list_bmid = []
shape = []
for var in list_var_names:
bmid = sorted(np.unique(df_sub[var][ind_valid]))
list_bmid.append(bmid)
shape.append(len(bmid))
# The workflow below is a bit complicated because of the irregular grid and handling of nodata values!
# Steps 1/ and 2/ fill the nodata values in the irregular grid, and step 3/ creates the interpolant object to
# get a value at any point inside or outside the bin edges
# 1/ Use griddata first to perform interpolation for nodata values within the N-D convex hull of the irregular grid
# Valid values
values = values[ind_valid]
# Coordinates of valid values
points_valid = tuple(df_sub[var].values[ind_valid] for var in list_var_names)
# Coordinates of all grid points (convex hull points will be detected automatically and interpolated)
bmid_grid = np.meshgrid(*list_bmid, indexing="ij")
points_grid = tuple(bmid_grid[i].flatten() for i in range(len(list_var_names)))
# Interpolate on grid within convex hull with interpolation method
values_grid = griddata(points_valid, values, points_grid, method=interpolate_method)
# 2/ Use griddata to extrapolate nodata values with nearest neighbour on the N-D grid and remove all NaNs
# Valid values after above interpolation in convex hull
ind_valid_interp = np.isfinite(values_grid)
values_interp = values_grid[ind_valid_interp]
# Coordinate of valid values
points_valid_interp = tuple(points_grid[i][ind_valid_interp] for i in range(len(points_grid)))
    # First extrapolate once with nearest neighbour on the original grid. This ensures that, when the grid is
    # extended below, the nearest neighbour extrapolation behaves as expected. Otherwise, extrapolation could happen
    # in a diagonal direction (along more than one grid dimension, as that can be the nearest bin in some cases),
    # resulting in a final extrapolation that would not actually use nearest neighbour when
    # interpolate_method="linear", sometimes producing unphysical negative uncertainties.
values_grid_nearest1 = griddata(points_valid_interp, values_interp, points_grid, method="nearest")
    # Extend the grid coordinates by one bin in all directions, to ensure that step 3/ extrapolates linearly in a
    # way equivalent to nearest neighbour
list_bmid_extended = []
for i in range(len(list_bmid)):
bmid_bin = list_bmid[i]
# Add bin before first edge and decrease coord value
bmid_bin_extlow = np.insert(bmid_bin, 0, bmid_bin[0] - 1)
# Add bin after last edge and increase coord value
bmid_bin_extboth = np.append(bmid_bin_extlow, bmid_bin[-1] + 1)
list_bmid_extended.append(bmid_bin_extboth)
bmid_grid_extended = np.meshgrid(*list_bmid_extended, indexing="ij")
points_grid_extended = tuple(bmid_grid_extended[i].flatten() for i in range(len(list_var_names)))
# Update shape
shape_extended = tuple(x + 2 for x in shape)
# Extrapolate on extended grid with nearest neighbour
values_grid_nearest2 = griddata(points_grid, values_grid_nearest1, points_grid_extended, method="nearest")
values_grid_nearest2 = values_grid_nearest2.reshape(shape_extended)
# 3/ Use RegularGridInterpolator to perform interpolation **between points** of the grid, with extrapolation forced
# to nearest neighbour by duplicating edge points
# (does not support NaNs, hence the need for 2/ above)
interp_fun = RegularGridInterpolator(
tuple(list_bmid_extended), values_grid_nearest2, method="linear", bounds_error=False, fill_value=None
)
return interp_fun # type: ignore
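# A minimal pipeline sketch (hypothetical variable names): chain nd_binning and interp_nd_binning to turn a binned
# spread statistic into a continuous function of the explanatory variable.
# >>> rng = np.random.default_rng(42)
# >>> dh, slope = rng.normal(size=(2, 1000))
# >>> df = nd_binning(values=dh, list_var=[slope], list_var_names=["slope"], statistics=["count", nmad])
# >>> fun = interp_nd_binning(df, list_var_names="slope", statistic="nmad", min_count=10)
# >>> err_at_zero_slope = fun((0.0,))  # spread interpolated at slope = 0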
def get_perbin_nd_binning(
df: pd.DataFrame,
list_var: list[NDArrayf],
list_var_names: str | list[str],
statistic: str | Callable[[NDArrayf], np.floating[Any]] = np.nanmedian,
min_count: int | None = 0,
) -> NDArrayf:
"""
Get per-bin array statistic for a list of array input variables, based on the results of an independent N-D binning.
For example, get a 2D array of elevation uncertainty based on 2D arrays of slope and curvature and a related binning
(for uncertainty analysis) or get a 2D array of elevation bias based on 2D arrays of rotated X coordinates (for
an along-track bias correction).
:param list_var: List of size (L) of explanatory variables array of size (N,).
:param list_var_names: List of size (L) of names of the explanatory variables.
:param df: Dataframe with statistic of binned values according to explanatory variables.
:param statistic: Statistic to use, stored as a data series in the dataframe.
:param min_count: Minimum number of samples to be used as a valid statistic (otherwise not applying operation).
:return: The array of statistic values corresponding to the input variables.
"""
# Prepare output
values_out = np.zeros(np.shape(list_var[0])) * np.nan
# If list of variable input is simply a string
if isinstance(list_var_names, str):
list_var_names = [list_var_names]
if len(list_var) != len(list_var_names):
raise ValueError("The lists of variables and variable names should be the same length.")
# Check that the dataframe contains what we need
for var in list_var_names:
if var not in df.columns:
raise ValueError('Variable "' + var + '" does not exist in the provided dataframe.')
statistic_name = statistic if isinstance(statistic, str) else statistic.__name__
if statistic_name not in df.columns:
raise ValueError('Statistic "' + statistic_name + '" does not exist in the provided dataframe.')
if min_count is not None and "count" not in df.columns:
raise ValueError('Statistic "count" is not in the provided dataframe, necessary to use the min_count argument.')
if df.empty:
raise ValueError("Dataframe is empty.")
df_sub = df.copy()
# If the dataframe is an output of nd_binning, keep only the dimension of interest
if "nd" in df_sub.columns:
df_sub = df_sub[df_sub.nd == len(list_var_names)]
# Check for any unformatted interval (saving and reading a pd.DataFrame without MultiIndexing transforms
# pd.Interval into strings)
for var_name in list_var_names:
if any(isinstance(x, pd.Interval) for x in df_sub[var_name].values):
continue
elif any(isinstance(_pandas_str_to_interval(x), pd.Interval) for x in df_sub[var_name]):
df_sub[var_name] = [_pandas_str_to_interval(x) for x in df_sub[var_name]]
else:
ValueError("The bin intervals of the dataframe should be pandas.Interval.")
# Apply operator in the nd binning
# We compute the masks linked to each 1D bin in a single for loop, to optimize speed
L = len(list_var)
all_mask_vars = []
all_interval_vars = []
for k in range(L):
# Get variable name and list of intervals in the dataframe
var_name = list_var_names[k]
list_interval_var = np.unique(df_sub[var_name].values)
# Get a list of mask for every bin of the variable
list_mask_var = [
np.logical_and(list_var[k] >= list_interval_var[j].left, list_var[k] < list_interval_var[j].right)
for j in range(len(list_interval_var))
]
# Save those in lists to later combine them
all_mask_vars.append(list_mask_var)
all_interval_vars.append(list_interval_var)
# We perform the K-D binning by logically combining the masks
all_ranges = [range(len(all_interval_vars[k])) for k in range(L)]
for indices in itertools.product(*all_ranges):
# Get mask of the specific bin, skip if empty
mask_bin = np.logical_and.reduce([all_mask_vars[k][indices[k]] for k in range(L)])
if np.count_nonzero(mask_bin) == 0:
continue
# Get the statistic
index_bin = np.logical_and.reduce(
[df_sub[list_var_names[k]] == all_interval_vars[k][indices[k]] for k in range(L)]
)
statistic_bin = df_sub[statistic_name][index_bin].values[0]
# Get count value of the statistic and use it if above the threshold
count_bin = df_sub["count"][index_bin].values[0]
        if min_count is None or count_bin > min_count:
# Write out to the output array
values_out[mask_bin] = statistic_bin
return values_out
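# A minimal usage sketch of get_perbin_nd_binning (hypothetical variable names): map a binned statistic back onto
# the full 2D grid of the explanatory variable, e.g. a per-pixel spread from a slope binning.
# >>> rng = np.random.default_rng(42)
# >>> dh, slope = rng.normal(size=(2, 100, 100))
# >>> df = nd_binning(values=dh, list_var=[slope], list_var_names=["slope"], statistics=["count", nmad])
# >>> err_map = get_perbin_nd_binning(df, list_var=[slope], list_var_names=["slope"], statistic="nmad")
# >>> err_map.shape
# (100, 100)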
def two_step_standardization(
dvalues: NDArrayf,
list_var: list[NDArrayf],
unscaled_error_fun: Callable[[tuple[ArrayLike, ...]], NDArrayf],
spread_statistic: Callable[[NDArrayf], np.floating[Any]] = nmad,
fac_spread_outliers: float | None = 7,
) -> tuple[NDArrayf, Callable[[tuple[ArrayLike, ...]], NDArrayf]]:
"""
Standardize the proxy differenced values using the modelled heteroscedasticity, re-scaled to the spread statistic,
and generate the final standardization function.
:param dvalues: Proxy values as array of size (N,) (i.e., differenced values where signal should be zero such as
elevation differences on stable terrain)
:param list_var: List of size (L) of explanatory variables array of size (N,)
:param unscaled_error_fun: Function of the spread with explanatory variables not yet re-scaled
:param spread_statistic: Statistic to be computed for the spread; defaults to nmad
:param fac_spread_outliers: Exclude outliers outside this spread after standardizing; pass None to ignore.
:return: Standardized values array of size (N,), Function to destandardize
"""
# Standardize a first time with the function
zscores = dvalues / unscaled_error_fun(tuple(list_var))
# Set large outliers that might have been created by the standardization to NaN, central tendency should already be
# around zero so only need to take the absolute value
if fac_spread_outliers is not None:
zscores[np.abs(zscores) > fac_spread_outliers * spread_statistic(zscores)] = np.nan
# Re-compute the spread statistic to re-standardize, as dividing by the function will not necessarily bring the
# z-score exactly equal to one due to approximations of N-D binning, interpolating and due to the outlier filtering
zscore_nmad = spread_statistic(zscores)
# Re-standardize
zscores /= zscore_nmad
# Define the exact function for de-standardization to pass as output
def error_fun(*args: tuple[ArrayLike, ...]) -> NDArrayf:
return zscore_nmad * unscaled_error_fun(*args)
return zscores, error_fun
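# A minimal usage sketch of two_step_standardization (hypothetical error model): standardize synthetic elevation
# differences whose spread grows with slope, and retrieve the rescaled error function.
# >>> rng = np.random.default_rng(42)
# >>> slope = rng.uniform(0, 40, size=1000)
# >>> dh = rng.normal(0, 1 + 0.05 * slope)
# >>> unscaled_fun = lambda t: 1 + 0.05 * t[0]  # hypothetical unscaled error model of slope
# >>> zscores, err_fun = two_step_standardization(dh, list_var=[slope], unscaled_error_fun=unscaled_fun)
# After the two-step standardization, the spread of the z-scores is rescaled to exactly one (nmad(zscores) == 1).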
def _estimate_model_heteroscedasticity(
dvalues: NDArrayf,
list_var: list[NDArrayf],
list_var_names: list[str],
spread_statistic: Callable[[NDArrayf], np.floating[Any]] = nmad,
list_var_bins: int | tuple[int, ...] | tuple[NDArrayf] | None = None,
min_count: int | None = 100,
fac_spread_outliers: float | None = 7,
) -> tuple[pd.DataFrame, Callable[[tuple[NDArrayf, ...]], NDArrayf]]:
"""
Estimate and model the heteroscedasticity (i.e., variability in error) according to a list of explanatory variables
from a proxy of differenced values (e.g., elevation differences), if possible compared to a source of higher
precision.
    This function performs N-D data binning with the list of explanatory variables for a spread statistic, then
performs N-D interpolation on this statistic, scales the output with a two-step standardization to return an error
function of the explanatory variables.
The functions used are `nd_binning`, `interp_nd_binning` and `two_step_standardization`.
:param dvalues: Proxy values as array of size (N,) (i.e., differenced values where signal should be zero such as
elevation differences on stable terrain)
:param list_var: List of size (L) of explanatory variables array of size (N,)
:param list_var_names: List of size (L) of names of the explanatory variables
:param spread_statistic: Statistic to be computed for the spread; defaults to nmad
:param list_var_bins: Count of size (1), or list of size (L) of counts or custom bin edges for the explanatory
variables; defaults to 10 bins
:param min_count: Minimum number of samples to be used as a valid statistic (replaced by nodata)
:param fac_spread_outliers: Exclude outliers outside this spread after standardizing; pass None to ignore.
:return: Dataframe of binned spread statistic with explanatory variables, Error function with explanatory variables
"""
# Perform N-D binning with the differenced values computing the spread statistic
df = nd_binning(
values=dvalues,
list_var=list_var,
list_var_names=list_var_names,
statistics=[spread_statistic],
list_var_bins=list_var_bins,
)
# Perform N-D linear interpolation for the spread statistic
fun = interp_nd_binning(df, list_var_names=list_var_names, statistic=spread_statistic.__name__, min_count=min_count)
# Get the final function based on a two-step standardization
final_fun = two_step_standardization(
dvalues=dvalues,
list_var=list_var,
unscaled_error_fun=fun,
spread_statistic=spread_statistic,
fac_spread_outliers=fac_spread_outliers,
)[1]
return df, final_fun
@overload
def _preprocess_values_with_mask_to_array( # type: ignore
values: list[NDArrayf | RasterType],
include_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
exclude_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
gsd: float | None = None,
preserve_shape: bool = True,
) -> tuple[list[NDArrayf], float]: ...
@overload
def _preprocess_values_with_mask_to_array(
values: NDArrayf | RasterType,
include_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
exclude_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
gsd: float | None = None,
preserve_shape: bool = True,
) -> tuple[NDArrayf, float]: ...
def _preprocess_values_with_mask_to_array(
values: list[NDArrayf | RasterType] | NDArrayf | RasterType,
include_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
exclude_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
gsd: float | None = None,
preserve_shape: bool = True,
) -> tuple[list[NDArrayf] | NDArrayf, float]:
"""
Preprocess input values provided as Raster or ndarray with a stable and/or unstable mask provided as Vector or
ndarray into an array of stable values.
By default, the shape is preserved and the masked values converted to NaNs.
:param values: Values or list of values as a Raster, array or a list of Raster/arrays
:param include_mask: Vector shapefile of mask to include (if values is Raster), or boolean array of same shape as
values
:param exclude_mask: Vector shapefile of mask to exclude (if values is Raster), or boolean array of same shape
as values
:param gsd: Ground sampling distance, if all the input values are provided as array
:param preserve_shape: If True, masks unstable values with NaN. If False, returns a 1D array of stable values.
:return: Array of stable terrain values, Ground sampling distance
"""
# Check inputs: needs to be Raster, array or a list of those
if not isinstance(values, (Raster, np.ndarray, list)) or (
isinstance(values, list) and not all(isinstance(val, (Raster, np.ndarray)) for val in values)
):
raise ValueError("The values must be a Raster or NumPy array, or a list of those.")
# Masks need to be an array, Vector or GeoPandas dataframe
if include_mask is not None and not isinstance(include_mask, (np.ndarray, Vector, Mask, gpd.GeoDataFrame)):
raise ValueError("The stable mask must be a Vector, Mask, GeoDataFrame or NumPy array.")
if exclude_mask is not None and not isinstance(exclude_mask, (np.ndarray, Vector, Mask, gpd.GeoDataFrame)):
raise ValueError("The unstable mask must be a Vector, Mask, GeoDataFrame or NumPy array.")
# Check that input stable mask can only be a georeferenced vector if the proxy values are a Raster to project onto
if isinstance(values, list):
any_raster = any(isinstance(val, Raster) for val in values)
else:
any_raster = isinstance(values, Raster)
if not any_raster and isinstance(include_mask, (Vector, gpd.GeoDataFrame)):
raise ValueError(
"The stable mask can only passed as a Vector or GeoDataFrame if the input values contain a Raster."
)
# If there is only one array or Raster, put alone in a list
if not isinstance(values, list):
return_unlist = True
values = [values]
else:
return_unlist = False
# Get the arrays
values_arr = [get_array_and_mask(val)[0] if isinstance(val, Raster) else val for val in values]
# Get the ground sampling distance from the first Raster if there is one
if gsd is None and any_raster:
for i in range(len(values)):
if isinstance(values[i], Raster):
first_raster = values[i]
break
# Looks like mypy cannot trace the isinstance here... ignoring
gsd = first_raster.res[0] # type: ignore
    elif gsd is None:
        raise ValueError("The ground sampling distance must be provided if no Raster object is passed.")
# If the stable mask is not an array, create it
if include_mask is None:
include_mask_arr = np.ones(np.shape(values_arr[0]), dtype=bool)
elif isinstance(include_mask, (Vector, gpd.GeoDataFrame)):
# If the stable mask is a geopandas dataframe, wrap it in a Vector object
if isinstance(include_mask, gpd.GeoDataFrame):
stable_vector = Vector(include_mask)
else:
stable_vector = include_mask
# Create the mask
include_mask_arr = stable_vector.create_mask(first_raster, as_array=True)
# If the mask is a Mask
elif isinstance(include_mask, Mask):
include_mask_arr = include_mask.data.filled(False)
# If the mask is already an array, just pass it
else:
include_mask_arr = include_mask
# If the unstable mask is not an array, create it
if exclude_mask is None:
exclude_mask_arr = np.zeros(np.shape(values_arr[0]), dtype=bool)
elif isinstance(exclude_mask, (Vector, gpd.GeoDataFrame)):
# If the unstable mask is a geopandas dataframe, wrap it in a Vector object
if isinstance(exclude_mask, gpd.GeoDataFrame):
unstable_vector = Vector(exclude_mask)
else:
unstable_vector = exclude_mask
# Create the mask
exclude_mask_arr = unstable_vector.create_mask(first_raster, as_array=True)
    # If the mask is a Mask
    elif isinstance(exclude_mask, Mask):
        exclude_mask_arr = exclude_mask.data.filled(False)
    # If the mask is already an array, just pass it
else:
exclude_mask_arr = exclude_mask
include_mask_arr = np.logical_and(include_mask_arr, ~exclude_mask_arr).squeeze()
if preserve_shape:
# Need to preserve the shape, so setting as NaNs all points not on stable terrain
values_stable_arr = []
for val in values_arr:
val_stable = val.copy()
val_stable[~include_mask_arr] = np.nan
values_stable_arr.append(val_stable)
else:
values_stable_arr = [val_arr[include_mask_arr] for val_arr in values_arr]
# If input was a list, give a list. If it was a single array, give a single array.
if return_unlist:
values_stable_arr = values_stable_arr[0] # type: ignore
return values_stable_arr, gsd
@overload
def infer_heteroscedasticity_from_stable(
dvalues: NDArrayf,
list_var: list[NDArrayf | RasterType],
stable_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
unstable_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
list_var_names: list[str] = None,
spread_statistic: Callable[[NDArrayf], np.floating[Any]] = nmad,
list_var_bins: int | tuple[int, ...] | tuple[NDArrayf] | None = None,
min_count: int | None = 100,
fac_spread_outliers: float | None = 7,
) -> tuple[NDArrayf, pd.DataFrame, Callable[[tuple[NDArrayf, ...]], NDArrayf]]: ...
@overload
def infer_heteroscedasticity_from_stable(
dvalues: RasterType,
list_var: list[NDArrayf | RasterType],
stable_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
unstable_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
list_var_names: list[str] = None,
spread_statistic: Callable[[NDArrayf], np.floating[Any]] = nmad,
list_var_bins: int | tuple[int, ...] | tuple[NDArrayf] | None = None,
min_count: int | None = 100,
fac_spread_outliers: float | None = 7,
) -> tuple[RasterType, pd.DataFrame, Callable[[tuple[NDArrayf, ...]], NDArrayf]]: ...
def infer_heteroscedasticity_from_stable(
dvalues: NDArrayf | RasterType,
list_var: list[NDArrayf | RasterType],
stable_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
unstable_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
list_var_names: list[str] = None,
spread_statistic: Callable[[NDArrayf], np.floating[Any]] = nmad,
list_var_bins: int | tuple[int, ...] | tuple[NDArrayf] | None = None,
min_count: int | None = 100,
fac_spread_outliers: float | None = 7,
) -> tuple[NDArrayf | RasterType, pd.DataFrame, Callable[[tuple[NDArrayf, ...]], NDArrayf]]:
"""
Infer heteroscedasticity from differenced values on stable terrain and a list of explanatory variables.
This function returns an error map, a dataframe of spread values and the error function with explanatory variables.
    It is a convenience wrapper for `_estimate_model_heteroscedasticity` to work on either Raster or array, compute the
stable mask and return an error map.
If no stable or unstable mask is provided to mask in or out the values, all terrain is used.
:param dvalues: Proxy values as array or Raster (i.e., differenced values where signal should be zero such as
elevation differences on stable terrain)
:param list_var: List of size (L) of explanatory variables as array or Raster of same shape as dvalues
:param stable_mask: Vector shapefile of stable terrain (if dvalues is Raster), or boolean array of same shape as
dvalues
:param unstable_mask: Vector shapefile of unstable terrain (if dvalues is Raster), or boolean array of same shape
as dvalues
:param list_var_names: List of size (L) of names of the explanatory variables, otherwise named var1, var2, etc.
:param spread_statistic: Statistic to be computed for the spread; defaults to nmad
:param list_var_bins: Count of size (1), or list of size (L) of counts or custom bin edges for the explanatory
variables; defaults to 10 bins
:param min_count: Minimum number of samples to be used as a valid statistic (replaced by nodata)
:param fac_spread_outliers: Exclude outliers outside this spread after standardizing; pass None to ignore.
:return: Inferred error map (array or Raster, same as input proxy values),
Dataframe of binned spread statistic with explanatory variables,
Error function with explanatory variables
"""
# Create placeholder variables names if those don't exist
if list_var_names is None:
list_var_names = ["var" + str(i + 1) for i in range(len(list_var))]
# Get the arrays for proxy values and explanatory variables
list_all_arr, gsd = _preprocess_values_with_mask_to_array(
values=[dvalues] + list_var, include_mask=stable_mask, exclude_mask=unstable_mask, preserve_shape=False
)
dvalues_stable_arr = list_all_arr[0]
list_var_stable_arr = list_all_arr[1:]
# Estimate and model the heteroscedasticity using only stable terrain
df, fun = _estimate_model_heteroscedasticity(
dvalues=dvalues_stable_arr,
list_var=list_var_stable_arr,
list_var_names=list_var_names,
spread_statistic=spread_statistic,
list_var_bins=list_var_bins,
min_count=min_count,
fac_spread_outliers=fac_spread_outliers,
)
# Use the standardization function to get the error array for the entire input array (not only stable)
list_var_arr = [get_array_and_mask(var)[0] if isinstance(var, Raster) else var for var in list_var]
error = fun(tuple(list_var_arr))
# Return the right type, depending on dvalues input
if isinstance(dvalues, Raster):
return dvalues.copy(new_array=error), df, fun
else:
return error, df, fun
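# A minimal usage sketch of infer_heteroscedasticity_from_stable (hypothetical file names): infer a per-pixel
# error map from an elevation-difference raster and a slope raster, masking out glaciers as unstable terrain.
# >>> dh = Raster("dh_2009_2015.tif")  # hypothetical elevation differences on the same grid as the slope raster
# >>> slope = Raster("slope.tif")
# >>> glaciers = Vector("glacier_outlines.shp")  # hypothetical unstable-terrain outlines
# >>> err, df, err_fun = infer_heteroscedasticity_from_stable(
# ...     dvalues=dh, list_var=[slope], unstable_mask=glaciers, list_var_names=["slope"])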
def _create_circular_mask(
shape: tuple[int, int], center: tuple[int, int] | None = None, radius: float | None = None
) -> NDArrayf:
"""
Create circular mask on a raster, defaults to the center of the array and its half width
:param shape: shape of array
:param center: center
:param radius: radius
:return:
"""
w, h = shape
if center is None: # use the middle of the image
center = (int(w / 2), int(h / 2))
if radius is None: # use the smallest distance between the center and image walls
radius = min(center[0], center[1], w - center[0], h - center[1])
    # Skimage disk is not inclusive (corresponds to distance_from_center < radius and not <= radius)
mask = np.zeros(shape, dtype=bool)
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "invalid value encountered in *divide")
rr, cc = disk(center=center, radius=radius, shape=shape)
mask[rr, cc] = True
# manual solution
# Y, X = np.ogrid[:h, :w]
# dist_from_center = np.sqrt((X - center[0]) ** 2 + (Y - center[1]) ** 2)
# mask = dist_from_center < radius
return mask
def _create_ring_mask(
shape: tuple[int, int],
center: tuple[int, int] | None = None,
in_radius: float = 0,
out_radius: float | None = None,
) -> NDArrayf:
"""
Create ring mask on a raster, defaults to the center of the array and a circle mask of half width of the array
:param shape: shape of array
:param center: center
:param in_radius: inside radius
:param out_radius: outside radius
:return:
"""
w, h = shape
if center is None:
center = (int(w / 2), int(h / 2))
if out_radius is None:
out_radius = min(center[0], center[1], w - center[0], h - center[1])
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "invalid value encountered in *divide")
mask_inside = _create_circular_mask((w, h), center=center, radius=in_radius)
mask_outside = _create_circular_mask((w, h), center=center, radius=out_radius)
mask_ring = np.logical_and(~mask_inside, mask_outside)
return mask_ring
def _subsample_wrapper(
values: NDArrayf,
coords: NDArrayf,
shape: tuple[int, int],
subsample: int = 10000,
subsample_method: str = "pdist_ring",
inside_radius: float = 0,
outside_radius: float = None,
random_state: int | np.random.Generator | None = None,
) -> tuple[NDArrayf, NDArrayf]:
"""
(Not used by default)
Wrapper for subsampling pdist methods
"""
nx, ny = shape
rng = np.random.default_rng(random_state)
# Subsample spatially for disk/ring methods
if subsample_method in ["pdist_disk", "pdist_ring"]:
# Select random center coordinates
center_x = rng.choice(nx, 1)[0]
center_y = rng.choice(ny, 1)[0]
if subsample_method == "pdist_ring":
subindex = _create_ring_mask(
(nx, ny), center=(center_x, center_y), in_radius=inside_radius, out_radius=outside_radius
)
else:
subindex = _create_circular_mask((nx, ny), center=(center_x, center_y), radius=outside_radius)
index = subindex.flatten()
values_sp = values[index]
coords_sp = coords[index, :]
else:
values_sp = values
coords_sp = coords
index = subsample_array(values_sp, subsample=subsample, return_indices=True, random_state=random_state)
values_sub = values_sp[index[0]]
coords_sub = coords_sp[index[0], :]
return values_sub, coords_sub
def _aggregate_pdist_empirical_variogram(
values: NDArrayf,
coords: NDArrayf,
subsample: int,
shape: tuple[int, int],
subsample_method: str,
gsd: float,
pdist_multi_ranges: list[float] | None = None,
# **kwargs: **EmpiricalVariogramKArgs, # This will work in Python 3.12, fails in the meantime, some ignore will be
# removable then in this function
**kwargs: Any,
) -> pd.DataFrame:
"""
(Not used by default)
Aggregating subfunction of sample_empirical_variogram for pdist methods.
The pairwise differences are calculated within each subsample.
"""
# If no multi_ranges are provided, define a logical default behaviour with the pixel size and grid size
if subsample_method in ["pdist_disk", "pdist_ring"]:
if pdist_multi_ranges is None:
# Define list of ranges as exponent 2 of the resolution until the maximum range
pdist_multi_ranges = []
# We start at 10 times the ground sampling distance
new_range = gsd * 10
while new_range < kwargs.get("maxlag") / 2: # type: ignore
pdist_multi_ranges.append(new_range)
new_range *= 2
pdist_multi_ranges.append(kwargs.get("maxlag")) # type: ignore
# Define subsampling parameters
list_inside_radius = []
list_outside_radius: list[float | None] = []
binned_ranges = [0.0] + pdist_multi_ranges
for i in range(len(binned_ranges) - 1):
# Radiuses need to be passed as pixel sizes, dividing by ground sampling distance
outside_radius = binned_ranges[i + 1] / gsd
if subsample_method == "pdist_ring":
inside_radius = binned_ranges[i] / gsd
else:
inside_radius = 0.0
list_outside_radius.append(outside_radius)
list_inside_radius.append(inside_radius)
else:
# For random point selection, no need for multi-range parameters
pdist_multi_ranges = [kwargs.get("maxlag")] # type: ignore
list_outside_radius = [None]
list_inside_radius = [0.0]
# Estimate variogram with specific subsampling at multiple ranges
list_df_range = []
for j in range(len(pdist_multi_ranges)):
values_sub, coords_sub = _subsample_wrapper(
values,
coords,
shape=shape,
subsample=subsample,
subsample_method=subsample_method,
inside_radius=list_inside_radius[j],
outside_radius=list_outside_radius[j],
random_state=kwargs.get("random_state"),
)
if len(values_sub) == 0:
continue
df_range = _get_pdist_empirical_variogram(values=values_sub, coords=coords_sub, **kwargs)
# Aggregate runs
list_df_range.append(df_range)
df = pd.concat(list_df_range)
return df
def _get_pdist_empirical_variogram(values: NDArrayf, coords: NDArrayf, **kwargs: Any) -> pd.DataFrame:
"""
Get empirical variogram from skgstat.Variogram object calculating pairwise distances within the sample
:param values: Values
:param coords: Coordinates
:return: Empirical variogram (variance, upper bound of lag bin, counts)
"""
# Remove random_state keyword argument that is not used
kwargs.pop("random_state")
# Get arguments of Variogram class init function
variogram_args = skg.Variogram.__init__.__code__.co_varnames[: skg.Variogram.__init__.__code__.co_argcount]
# Check no other argument is left to be passed
remaining_kwargs = kwargs.copy()
for arg in variogram_args:
remaining_kwargs.pop(arg, None)
if len(remaining_kwargs) != 0:
warnings.warn("Keyword arguments: " + ",".join(list(remaining_kwargs.keys())) + " were not used.")
# Filter corresponding arguments before passing
filtered_kwargs = {k: kwargs[k] for k in variogram_args if k in kwargs}
# Derive variogram with default MetricSpace (equivalent to scipy.pdist)
V = skg.Variogram(coordinates=coords, values=values, normalize=False, fit_method=None, **filtered_kwargs)
# Get bins, empirical variogram values, and bin count
bins, exp = V.get_empirical()
count = V.bin_count
# Write to dataframe
df = pd.DataFrame()
df = df.assign(exp=exp, bins=bins, count=count)
return df
def _choose_cdist_equidistant_sampling_parameters(**kwargs: Any) -> tuple[int, int, float]:
"""
    Add a small calculation to partition the "subsample" argument automatically into the "runs" and "samples"
    arguments of RasterEquidistantMetricSpace, to obtain a similar number of points as with a classic pdist method.
    We compute the arguments to match N0**2/2 pairwise comparisons, N0 being the "subsample" input, while forcing
    the number of rings to 10 by default. This corresponds to 10 independent rings with an equal number of samples
    compared pairwise against a central disk. We force this number of samples to be at least 2 (skgstat raises an
    error if there is only one). Additionally, if the sample size permits, we compute 10 independent runs, with a
    maximum of 100 to limit processing times when aggregating different runs in sparse matrices. If even more
    samples are available (the default case), we increase the number of subsamples in rings and runs simultaneously.
    The number of pairwise samples for a classic pdist is N0*(N0-1)/2, with N0 the number of samples of the ensemble.
    For the cdist equidistant calculation it is M*N*R, where N is the number of subsamples in the center disk, M is
    the number of samples in the rings (which amounts to X*N, where X is the number of rings in the grid extent, as
    each ring draws N samples), and R is the number of runs with a different random center point.
    X is fixed by the extent and ratio_subsample parameters, so N0**2/2 = R*X*N**2, and we want at least 10 rings
    and, if possible, 10 runs.
    !! Different variables: !! The "samples" of RasterEquidistantMetricSpace is N, while the "subsample" passed is N0.
"""
# First, we extract the extent, shape and subsample values from the keyword arguments
extent = kwargs["extent"]
shape = kwargs["shape"]
subsample = kwargs["subsample"]
    # We set the number of rings to 10 in order to get a decent equidistant sampling; we will later adjust the
    # ratio_subsample to force that number to 10
if "nb_rings" in kwargs.keys():
nb_rings = kwargs["nb_rings"]
else:
nb_rings = 10
# For one run (R=1), and two samples per disk/ring (N=2), and the number of rings X=10, this requires N0 to be at
# least 10:
min_subsample = np.ceil(np.sqrt(2 * nb_rings * 2**2) + 1)
if subsample < min_subsample:
raise ValueError(f"The number of subsamples needs to be at least {min_subsample:.0f}.")
# The pairwise comparisons can be deduced from the number of rings: R * N**2 = N0**2/(2*X)
pairwise_comp_per_disk = np.ceil(subsample**2 / (2 * nb_rings))
    # With R*N**2 = N0**2/(2*X), and a minimum of 2 samples N, we compute the number of runs R, forcing at least
    # 10 runs and a maximum of 100
if pairwise_comp_per_disk < 10:
runs = int(pairwise_comp_per_disk / 2**2)
else:
runs = int(min(100, 10 * np.ceil((pairwise_comp_per_disk / (2**2 * 10)) ** (1 / 3))))
# Now we can derive the number of samples N, which will always be at least 2
subsample_per_disk_per_run = int(np.ceil(np.sqrt(pairwise_comp_per_disk / runs)))
# Finally, we need to force the ratio_subsample to get exactly 10 rings
# We first derive the maximum distance and resolution the same way as in skgstat.RasterEquidistantMetricSpace
maxdist = np.sqrt((extent[1] - extent[0]) ** 2 + (extent[3] - extent[2]) ** 2)
res = np.mean([(extent[1] - extent[0]) / (shape[0] - 1), (extent[3] - extent[2]) / (shape[1] - 1)])
# Then, we derive the subsample ratio. We have:
# 1) radius * sqrt(2)**X = maxdist, and
# 2) pi * radius**2 = res**2 * N / sub_ratio
# From which we can deduce: sub_ratio = res**2 * N / (pi * maxdist**2 / sqrt(2)**(2X) )
ratio_subsample = res**2 * subsample_per_disk_per_run / (np.pi * maxdist**2 / np.sqrt(2) ** (2 * nb_rings))
# And the number of total pairwise comparison
total_pairwise_comparison = runs * subsample_per_disk_per_run**2 * nb_rings
logging.info(
"Equidistant circular sampling will be performed for %d runs (random center points) with pairwise "
"comparison between %d samples (points) of the central disk and again %d samples times %d independent "
"rings centered on the same center point. This results in approximately %d pairwise comparisons (duplicate "
"pairwise points randomly selected will be removed).",
runs,
subsample_per_disk_per_run,
subsample_per_disk_per_run,
nb_rings,
total_pairwise_comparison,
)
return runs, subsample_per_disk_per_run, ratio_subsample
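# A worked example of the partition above (illustrative numbers): for subsample N0 = 1000 and the default
# nb_rings X = 10, the target is N0**2/2 = 500000 pairwise comparisons. The function then picks R = 100 runs and
# N = 23 samples per disk/ring, giving R * N**2 * X = 529000 comparisons, close to the target (duplicate pairwise
# points are removed afterwards by skgstat).
# >>> runs, samples, ratio = _choose_cdist_equidistant_sampling_parameters(
# ...     extent=(0, 1000, 0, 1000), shape=(100, 100), subsample=1000)
# >>> (runs, samples)
# (100, 23)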
def _get_cdist_empirical_variogram(
values: NDArrayf, coords: NDArrayf, subsample_method: str, **kwargs: Any
) -> pd.DataFrame:
"""
Get empirical variogram from skgstat.Variogram object calculating pairwise distances between two sample collections
of a MetricSpace (see scikit-gstat documentation for more details)
:param values: Values
:param coords: Coordinates
:return: Empirical variogram (variance, upper bound of lag bin, counts)
"""
if subsample_method == "cdist_equidistant":
if "runs" not in kwargs.keys() and "samples" not in kwargs.keys():
# We define subparameters for the equidistant technique to match the number of pairwise comparison
# that would have a classic "subsample" with pdist, except if those parameters are already user-defined
runs, samples, ratio_subsample = _choose_cdist_equidistant_sampling_parameters(**kwargs)
kwargs["ratio_subsample"] = ratio_subsample
kwargs["runs"] = runs
# The "samples" argument is used by skgstat Metric subclasses (and not "subsample")
kwargs["samples"] = samples
kwargs.pop("subsample")
elif subsample_method == "cdist_point":
# We set the samples to match the subsample argument if the method is random points
kwargs["samples"] = kwargs.pop("subsample")
# Rename the "random_state" argument into "rng", also used by skgstat Metric subclasses
kwargs["rnd"] = kwargs.pop("random_state")
# Define MetricSpace function to be used, fetch possible keywords arguments
if subsample_method == "cdist_point":
# List keyword arguments of the Probabilistic class init function
ms_args = skg.ProbabalisticMetricSpace.__init__.__code__.co_varnames[
: skg.ProbabalisticMetricSpace.__init__.__code__.co_argcount
]
ms = skg.ProbabalisticMetricSpace
else:
# List keyword arguments of the RasterEquidistant class init function
ms_args = skg.RasterEquidistantMetricSpace.__init__.__code__.co_varnames[
: skg.RasterEquidistantMetricSpace.__init__.__code__.co_argcount
]
ms = skg.RasterEquidistantMetricSpace
# Get arguments of Variogram class init function
variogram_args = skg.Variogram.__init__.__code__.co_varnames[: skg.Variogram.__init__.__code__.co_argcount]
# Check no other argument is left to be passed, accounting for MetricSpace arguments
remaining_kwargs = kwargs.copy()
for arg in variogram_args + ms_args:
remaining_kwargs.pop(arg, None)
if len(remaining_kwargs) != 0:
warnings.warn("Keyword arguments: " + ", ".join(list(remaining_kwargs.keys())) + " were not used.")
# Filter corresponding arguments before passing to MetricSpace function
filtered_ms_kwargs = {k: kwargs[k] for k in ms_args if k in kwargs}
M = ms(coords=coords, **filtered_ms_kwargs)
# Filter corresponding arguments before passing to Variogram function
filtered_var_kwargs = {k: kwargs[k] for k in variogram_args if k in kwargs}
V = skg.Variogram(M, values=values, normalize=False, fit_method=None, **filtered_var_kwargs)
# Get bins, empirical variogram values, and bin count
bins, exp = V.get_empirical(bin_center=False)
count = V.bin_count
# Write to dataframe
df = pd.DataFrame()
df = df.assign(exp=exp, bins=bins, count=count)
return df
def _wrapper_get_empirical_variogram(argdict: dict[str, Any]) -> pd.DataFrame:
"""
    Multiprocessing wrapper for _get_pdist_empirical_variogram and _get_cdist_empirical_variogram
:param argdict: Keyword argument to pass to get_pdist/cdist_empirical_variogram
:return: Empirical variogram (variance, upper bound of lag bin, counts)
"""
logging.debug("Working on run " + str(argdict["i"]) + " out of " + str(argdict["imax"]))
argdict.pop("i")
argdict.pop("imax")
if argdict["subsample_method"] in ["cdist_equidistant", "cdist_point"]:
# Simple wrapper for the skgstat Variogram function for cdist methods
return _get_cdist_empirical_variogram(**argdict)
else:
# Aggregating several skgstat Variogram after iterative subsampling of specific points in the Raster
return _aggregate_pdist_empirical_variogram(**argdict)
class EmpiricalVariogramKArgs(TypedDict, total=False):
runs: int
pdist_multi_ranges: list[float]
ratio_subsample: float
samples: int
nb_rings: int
maxlag: float
bin_func: Any
estimator: str
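# A minimal usage sketch of sample_empirical_variogram defined below (hypothetical file name): sample an empirical
# variogram of elevation differences with the default equidistant subsampling.
# >>> dh = Raster("dh_2009_2015.tif")  # hypothetical elevation-difference raster
# >>> df_vgm = sample_empirical_variogram(values=dh, subsample=500, random_state=42)
# The output dataframe contains the empirical variance "exp", the upper lag bin edge "bins" and the pair "count".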
def sample_empirical_variogram(
values: NDArrayf | RasterType,
gsd: float = None,
coords: NDArrayf = None,
subsample: int = 1000,
subsample_method: str = "cdist_equidistant",
n_variograms: int = 1,
n_jobs: int = 1,
random_state: int | np.random.Generator | None = None,
# **kwargs: **EmpiricalVariogramKArgs, # This will work in Python 3.12, fails in the meantime, we'll be able to
# remove some type ignores from this function in the future
**kwargs: int | list[float] | float | str | Any,
) -> pd.DataFrame:
"""
Sample empirical variograms with binning adaptable to multiple ranges and spatial subsampling adapted for raster
data.
Returns an empirical variogram (empirical variance, upper bound of spatial lag bin, count of pairwise samples).
If values are provided as a Raster subclass, nothing else is required.
If values are provided as a 2D array (M,N), a ground sampling distance is sufficient to derive the pairwise
distances.
If values are provided as a 1D array (N), an array of coordinates (N,2) or (2,N) is expected. If the coordinates
do not correspond to points of a grid, a ground sampling distance is needed to correctly get the grid size.
By default, the subsampling is based on RasterEquidistantMetricSpace implemented in scikit-gstat. This method
samples more effectively large grid data by isolating pairs of spatially equidistant ensembles for distributed
pairwise comparison. In practice, two subsamples are drawn for pairwise comparison: one from a disk of certain
radius within the grid, and another one from rings of larger radii that increase steadily between the pixel size
and the extent of the raster. Those disks and rings are sampled several times across the grid using random centers.
See more details in Hugonnet et al. (2022), https://doi.org/10.1109/jstars.2022.3188922, in particular on
Supplementary Fig. 13. for the subsampling scheme.
The "subsample" argument determines the number of samples for each method to yield a number of pairwise comparisons
close to that of a pdist calculation, that is N*(N-1)/2 where N is the subsample argument.
For the cdist equidistant method, the "runs" (random centers) and "samples" (subsample of a disk/ring) are set
automatically to get close to N*(N-1)/2 pairwise samples, fixing a number of rings "nb_rings" to 10. Those can be
more finely adjusted by passing the argument "runs", "samples" and "nb_rings" to kwargs. Further details can be
found in the description of skgstat.MetricSpace.RasterEquidistantMetricSpace or
_choose_cdist_equidistant_sampling_parameters.
Spatial subsampling method argument subsample_method can be one of "cdist_equidistant", "cdist_point",
"pdist_point", "pdist_disk" and "pdist_ring".
The cdist methods use MetricSpace classes of scikit-gstat and do pairwise comparison between two distinct ensembles
as in scipy.spatial.cdist. For the cdist methods, the variogram is estimated in a single run from the MetricSpace.
The pdist methods use methods to subsample the Raster points directly and do pairwise comparison within a single
ensemble as in scipy.spatial.pdist. For the pdist methods, an iterative process is required: a list of ranges
subsampled independently is used.
Variograms are derived independently for several runs and ranges using each pairwise sample, and later aggregated.
    If the point subsampling method "pdist_point" is selected, the multi-range argument is ignored, as range has no
    effect on this subsampling method.
For pdist methods, keyword arguments are passed to skgstat.Variogram.
For cdist methods, keyword arguments are passed to both skgstat.Variogram and skgstat.MetricSpace.
:param values: Values of studied variable
:param gsd: Ground sampling distance
:param coords: Coordinates
:param subsample: Number of samples to randomly draw from the values
:param subsample_method: Spatial subsampling method
:param n_variograms: Number of independent empirical variogram estimations (to estimate empirical variogram spread)
:param n_jobs: Number of processing cores
:param random_state: Random state or seed number to use for calculations (to fix random sampling during testing)
:return: Empirical variogram (variance, upper bound of lag bin, counts)
"""
# First, check all that the values provided are OK
if isinstance(values, Raster):
gsd = values.res[0]
values, mask = get_array_and_mask(values)
elif isinstance(values, (np.ndarray, np.ma.masked_array)):
values, mask = get_array_and_mask(values)
else:
raise ValueError("Values must be of type NDArrayf, np.ma.masked_array or Raster subclass.")
values = values.squeeze()
# Then, check if the logic between values, coords and gsd is respected
if (gsd is not None or subsample_method in ["cdist_equidistant", "pdist_disk", "pdist_ring"]) and values.ndim == 1:
raise ValueError(
'Values array must be 2D when using any of the "cdist_equidistant", "pdist_disk" and '
'"pdist_ring" methods, or providing a ground sampling distance instead of coordinates.'
)
elif coords is not None and values.ndim != 1:
raise ValueError("Values array must be 1D when providing coordinates.")
elif coords is not None and (coords.shape[0] != 2 and coords.shape[1] != 2):
raise ValueError("The coordinates array must have one dimension with length equal to 2")
elif values.ndim == 2 and gsd is None:
raise ValueError("The ground sampling distance must be defined when passing a 2D values array.")
# Check the subsample method provided exists, otherwise list options
if subsample_method not in ["cdist_equidistant", "cdist_point", "pdist_point", "pdist_disk", "pdist_ring"]:
raise TypeError(
'The subsampling method must be one of "cdist_equidistant", "cdist_point", "pdist_point", '
'"pdist_disk" or "pdist_ring".'
)
# Check that, for several runs, the binning function is an Iterable, otherwise skgstat might provide variogram
# values over slightly different binnings due to randomly changing subsample maximum lags
if n_variograms > 1 and "bin_func" in kwargs.keys() and not isinstance(kwargs.get("bin_func"), Iterable):
warnings.warn(
"Using a named binning function of scikit-gstat might provide different binnings for each "
"independent run. To remediate that issue, pass bin_func as an Iterable of right bin edges, "
"(or use default bin_func)."
)
# Defaulting to coordinates if those are provided
if coords is not None:
nx = None
ny = None
# Making the shape of coordinates consistent if they are transposed
if coords.shape[0] == 2 and coords.shape[1] != 2:
coords = np.transpose(coords)
# If no coordinates provided, we use the shape of the array and the provided ground sampling distance to derive
# relative coordinates (starting at zero)
else:
nx, ny = np.shape(values)
x, y = np.meshgrid(np.arange(0, values.shape[0] * gsd, gsd), np.arange(0, values.shape[1] * gsd, gsd))
coords = np.dstack((x.flatten(), y.flatten())).squeeze()
values = values.flatten()
# Get the ground sampling distance from the coordinates before keeping only valid data, if it was not provided
if gsd is None:
gsd = np.mean([coords[0, 0] - coords[0, 1], coords[0, 0] - coords[1, 0]])
# Get extent
extent = (np.min(coords[:, 0]), np.max(coords[:, 0]), np.min(coords[:, 1]), np.max(coords[:, 1]))
# Get the maximum lag from the coordinates before keeping only valid data, if it was not provided
if "maxlag" not in kwargs.keys():
# We define maximum lag as the maximum distance between coordinates (needed to provide custom bins, otherwise
# skgstat rewrites the maxlag with the subsample of coordinates provided)
maxlag = np.sqrt(
(np.max(coords[:, 0]) - np.min(coords[:, 0])) ** 2 + (np.max(coords[:, 1]) - np.min(coords[:, 1])) ** 2
)
kwargs.update({"maxlag": maxlag})
# Keep only valid data for cdist methods, remove later for pdist methods
if "cdist" in subsample_method:
ind_valid = np.isfinite(values)
values = values[ind_valid]
coords = coords[ind_valid, :]
if "bin_func" not in kwargs.keys():
# If no bin_func is provided, we pass an Iterable of right bin edges as a custom binning function to skgstat,
# because otherwise bins might be inconsistent across runs
bin_func = []
right_bin_edge = np.sqrt(2) * gsd
while right_bin_edge < kwargs.get("maxlag"):
bin_func.append(right_bin_edge)
# We use the default exponential increasing factor of RasterEquidistantMetricSpace, adapted for grids
right_bin_edge *= np.sqrt(2)
bin_func.append(kwargs.get("maxlag"))
kwargs.update({"bin_func": bin_func})
# Prepare necessary arguments to pass to variogram subfunctions
args = {
"values": values,
"coords": coords,
"subsample_method": subsample_method,
"subsample": subsample,
}
if subsample_method in ["cdist_equidistant", "pdist_ring", "pdist_disk", "pdist_point"]:
# The shape is needed for these methods
args.update({"shape": (nx, ny)})
if subsample_method == "cdist_equidistant":
# The coordinate extent is needed for this method
args.update({"extent": extent})
else:
args.update({"gsd": gsd})
# If a random_state is passed, each run needs to be passed an independent child random state, otherwise they will
# provide exactly the same sampling and results
if random_state is not None:
# Define the random state if only a seed is provided
rng = np.random.default_rng(random_state)
# Create a list of child random states per number of variograms
list_random_state: list[None | np.random.Generator] = list(
rng.choice(n_variograms, n_variograms, replace=False)
)
else:
list_random_state = [None for i in range(n_variograms)]
# Derive the variogram
# Differentiate between 1 core and several cores for multiple runs
# All variogram runs have random sampling inherent to their subfunctions, so we provide the same input arguments
if n_jobs == 1:
logging.info("Using 1 core...")
list_df_run = []
for i in range(n_variograms):
argdict = {
"i": i,
"imax": n_variograms,
"random_state": list_random_state[i],
**args,
**kwargs, # type: ignore
}
df_run = _wrapper_get_empirical_variogram(argdict=argdict)
list_df_run.append(df_run)
else:
logging.info("Using " + str(n_jobs) + " cores...")
pool = mp.Pool(n_jobs, maxtasksperchild=1)
list_argdict = [
{"i": i, "imax": n_variograms, "random_state": list_random_state[i], **args, **kwargs} # type: ignore
for i in range(n_variograms)
]
list_df_run = pool.map(_wrapper_get_empirical_variogram, list_argdict, chunksize=1)
pool.close()
pool.join()
# Aggregate multiple ranges subsampling
df = pd.concat(list_df_run)
# For a single run, no multi-run sigma estimated
if n_variograms == 1:
df = df.rename(columns={"bins": "lags"})
df["err_exp"] = np.nan
# For several runs, group results, use mean as empirical variogram, estimate sigma, and sum the counts
else:
df_grouped = df.groupby("bins", dropna=False)
df_mean = df_grouped[["exp"]].mean()
df_std = df_grouped[["exp"]].std()
df_count = df_grouped[["count"]].sum()
df_mean["lags"] = df_mean.index.values
df_mean["err_exp"] = df_std["exp"] / np.sqrt(n_variograms)
df_mean["count"] = df_count["count"]
df = df_mean
# Fix variance error for Dowd's variogram in SciKit-GStat
# If skgstat > 1.0, we can use Dowd's without correcting, otherwise we correct
from packaging.version import Version
if Version(skg.__version__) <= Version("1.0.0"):
if "estimator" in kwargs.keys() and kwargs["estimator"] == "dowd":
# Correction: we divide all experimental variance values by 2
df.exp.values /= 2
df.err_exp.values /= 2
# Remove the last spatial lag bin which is always undersampled
df.drop(df.tail(1).index, inplace=True)
# Force output dtype (default differs on different OS)
df = df.astype({"exp": "float64", "err_exp": "float64", "lags": "float64", "count": "int64"})
return df
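# Example usage (a minimal sketch with illustrative, non-default values for the array, ground
# sampling distance and subsample size):
#   rng = np.random.default_rng(42)
#   dh = rng.normal(0, 1, size=(100, 100))  # Synthetic elevation differences on a 2D grid
#   df_emp = sample_empirical_variogram(values=dh, gsd=30.0, subsample=100, random_state=42)
#   df_emp[["lags", "exp", "err_exp", "count"]]  # Empirical variance per upper lag bin edge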
def _get_skgstat_variogram_model_name(model: str | Callable[[NDArrayf, float, float], NDArrayf]) -> str:
"""Function to identify a SciKit-GStat variogram model from a string or a function"""
list_supported_models = ["spherical", "gaussian", "exponential", "cubic", "stable", "matern"]
if callable(model):
if inspect.getmodule(model).__name__ == "skgstat.models": # type: ignore
model_name = model.__name__
else:
raise ValueError("Variogram models can only be passed as functions of the skgstat.models package.")
elif isinstance(model, str):
model_name = "None"
for supp_model in list_supported_models:
if model.lower() in [supp_model[0:3], supp_model]:
model_name = supp_model.lower()
if model_name == "None":
raise ValueError(
f"Variogram model name {model} not recognized. Supported models are: "
+ ", ".join(list_supported_models)
+ "."
)
else:
raise ValueError(
"Variogram models can be passed as strings or skgstat.models function. "
"Supported models are: " + ", ".join(list_supported_models) + "."
)
return model_name
def get_variogram_model_func(params_variogram_model: pd.DataFrame) -> Callable[[NDArrayf], NDArrayf]:
"""
Construct the sum of spatial variogram function from a dataframe of variogram parameters.
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:return: Function of sum of variogram with spatial lags.
"""
# Check input dataframe
_check_validity_params_variogram(params_variogram_model)
# Define the function of sum of variogram models of h (spatial lag) to return
def sum_model(h: NDArrayf) -> NDArrayf:
fn = np.zeros(np.shape(h))
for i in range(len(params_variogram_model)):
# Get scikit-gstat model from name or Callable
model_name = _get_skgstat_variogram_model_name(params_variogram_model["model"].values[i])
model_function = getattr(skg.models, model_name)
r = params_variogram_model["range"].values[i]
p = params_variogram_model["psill"].values[i]
# For models that expect 2 parameters
if model_name in ["spherical", "gaussian", "exponential", "cubic"]:
fn += model_function(h, r, p)
# For models that expect 3 parameters
elif model_name in ["stable", "matern"]:
s = params_variogram_model["smooth"].values[i]
fn += model_function(h, r, p, s)
return fn
return sum_model
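# Example usage (a minimal sketch; the model forms, ranges and partial sills are arbitrary):
#   params = pd.DataFrame(data={"model": ["spherical", "gaussian"], "range": [100.0, 1000.0], "psill": [0.5, 0.2]})
#   sum_var = get_variogram_model_func(params)
#   sum_var(np.array([0.0, 100.0, 5000.0]))  # Starts at 0 and tends towards the total sill (0.7) at long lags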
def covariance_from_variogram(params_variogram_model: pd.DataFrame) -> Callable[[NDArrayf], NDArrayf]:
"""
Construct the spatial covariance function from a dataframe of variogram parameters.
The covariance function is the sum of partial sills "PS" minus the sum of associated variograms "gamma":
C = PS - gamma
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:return: Covariance function with spatial lags
"""
# Check input dataframe
_check_validity_params_variogram(params_variogram_model)
# Get total sill
total_sill = np.sum(params_variogram_model["psill"])
# Get function from sum of variogram
sum_variogram = get_variogram_model_func(params_variogram_model)
def cov(h: NDArrayf) -> NDArrayf:
return total_sill - sum_variogram(h)
return cov
def correlation_from_variogram(params_variogram_model: pd.DataFrame) -> Callable[[NDArrayf], NDArrayf]:
"""
Construct the spatial correlation function from a dataframe of variogram parameters.
The correlation function is the covariance function "C" divided by the sum of partial sills "PS": rho = C / PS
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:return: Correlation function with spatial lags
"""
# Check input dataframe
_check_validity_params_variogram(params_variogram_model)
# Get total sill
total_sill = np.sum(params_variogram_model["psill"].values)
# Get covariance from sum of variogram
cov = covariance_from_variogram(params_variogram_model)
def rho(h: NDArrayf) -> NDArrayf:
return cov(h) / total_sill
return rho
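# Example usage (a minimal sketch with an arbitrary single spherical model): the correlation is 1
# at a lag of zero and falls to 0 beyond the correlation range.
#   params = pd.DataFrame(data={"model": ["spherical"], "range": [200.0], "psill": [1.0]})
#   rho = correlation_from_variogram(params)
#   rho(np.array([0.0, 100.0, 500.0]))  # Approximately [1.0, 0.31, 0.0]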
def fit_sum_model_variogram(
list_models: list[str | Callable[[NDArrayf, float, float], NDArrayf]],
empirical_variogram: pd.DataFrame,
bounds: list[tuple[float, float]] = None,
p0: list[float] = None,
maxfev: int = None,
) -> tuple[Callable[[NDArrayf], NDArrayf], pd.DataFrame]:
"""
Fit a sum of variogram models to an empirical variogram, using weighted least-squares based on sampling errors.
Preferably used with the empirical variogram dataframe returned by the `sample_empirical_variogram` function.
:param list_models: List of K variogram models to sum for the fit in order from short to long ranges. Can either be
a 3-letter string, full string of the variogram name or SciKit-GStat model function (e.g., for a
spherical model "Sph", "Spherical" or skgstat.models.spherical).
:param empirical_variogram: Empirical variogram, formatted as a dataframe with count (pairwise sample count), lags
(upper bound of spatial lag bin), exp (experimental variance), and err_exp (error on experimental variance).
:param bounds: Bounds of range and sill parameters for each model (shape K x 4 = K x range lower, range upper, sill
lower, sill upper).
:param p0: Initial guess of ranges and sills for each model (shape K x 2 = K x range first guess, sill first guess).
:param maxfev: Maximum number of function evaluations before the termination, passed to scipy.optimize.curve_fit().
Convergence problems can sometimes be fixed by changing this value (default None: automatically determine the
number).
:return: Function of sum of variogram, Dataframe of optimized coefficients.
"""
# Define a function of a sum of variogram model forms, with undetermined arguments
def variogram_sum(h: float, *args: list[float]) -> float:
fn = 0.0
i = 0
for model in list_models:
# Get the model name and convert to SciKit-GStat function
model_name = _get_skgstat_variogram_model_name(model)
model_function = getattr(skg.models, model_name)
# For models that expect 2 parameters
if model_name in ["spherical", "gaussian", "exponential", "cubic"]:
fn += model_function(h, args[i], args[i + 1])
i += 2
# For models that expect 3 parameters
elif model_name in ["stable", "matern"]:
fn += model_function(h, args[i], args[i + 1], args[i + 2])
i += 3
return fn
# First, filter outliers
empirical_variogram = empirical_variogram[np.isfinite(empirical_variogram.exp.values)]
# Use shape of empirical variogram to assess rough boundaries/first estimates
n_average = np.ceil(len(empirical_variogram.exp.values) / 10)
exp_movaverage = np.convolve(empirical_variogram.exp.values, np.ones(int(n_average)) / n_average, mode="valid")
# Maximum variance of the process
max_var = np.max(exp_movaverage)
# Simplify things for scipy: let's provide boundaries and first guesses
if bounds is None:
bounds = [(0, empirical_variogram.lags.values[-1]), (0, max_var)] * len(list_models)
if p0 is None:
p0 = []
for i in range(len(list_models)):
# Use psill evenly distributed
psill_p0 = ((i + 1) / len(list_models)) * max_var
# Use corresponding ranges
# !! This fails when no empirical value crosses this (too wide binning/nugget)
# ind = np.array(np.abs(exp_movaverage-psill_p0)).argmin()
# range_p0 = empirical_variogram.lags.values[ind]
range_p0 = ((i + 1) / len(list_models)) * empirical_variogram.lags.values[-1]
p0.append(range_p0)
p0.append(psill_p0)
final_bounds = np.transpose(np.array(bounds))
# If the error provided is all NaNs (single variogram run), or all zeros (two variogram runs), run without weights
if np.all(np.isnan(empirical_variogram.err_exp.values)) or np.all(empirical_variogram.err_exp.values == 0):
cof, cov = curve_fit(
variogram_sum,
empirical_variogram.lags.values,
empirical_variogram.exp.values,
method="trf",
p0=p0,
bounds=final_bounds,
maxfev=maxfev,
)
# Otherwise, use a weighted fit
else:
# We need to filter for possible no data in the error
valid = np.isfinite(empirical_variogram.err_exp.values)
cof, cov = curve_fit(
variogram_sum,
empirical_variogram.lags.values[valid],
empirical_variogram.exp.values[valid],
method="trf",
p0=p0,
bounds=final_bounds,
sigma=empirical_variogram.err_exp.values[valid],
maxfev=maxfev,
)
# Store optimized parameters
list_df = []
i = 0
for model in list_models:
model_name = _get_skgstat_variogram_model_name(model)
# For models that expect 2 parameters
if model_name in ["spherical", "gaussian", "exponential", "cubic"]:
df = pd.DataFrame()
df = df.assign(model=[model_name], range=[cof[i]], psill=[cof[i + 1]])
i += 2
# For models that expect 3 parameters
elif model_name in ["stable", "matern"]:
df = pd.DataFrame()
df = df.assign(model=[model_name], range=[cof[i]], psill=[cof[i + 1]], smooth=[cof[i + 2]])
i += 3
list_df.append(df)
df_params = pd.concat(list_df)
# Also pass the function of sum of variogram
variogram_sum_fit = get_variogram_model_func(df_params)
return variogram_sum_fit, df_params
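# Example usage (a minimal sketch, assuming "df_emp" is an empirical variogram dataframe such as
# the one returned by sample_empirical_variogram; the choice of two models is illustrative):
#   func_sum_vgm, params_vgm = fit_sum_model_variogram(["spherical", "gaussian"], empirical_variogram=df_emp)
#   params_vgm  # One row per model with fitted "range" and "psill" (and "smooth" for stable/matern)
#   func_sum_vgm(np.array([0.0, 100.0, 1000.0]))  # Modelled variance at given spatial lags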
def _estimate_model_spatial_correlation(
dvalues: NDArrayf | RasterType,
list_models: list[str | Callable[[NDArrayf, float, float], NDArrayf]],
estimator: str = "dowd",
gsd: float = None,
coords: NDArrayf = None,
subsample: int = 1000,
subsample_method: str = "cdist_equidistant",
n_variograms: int = 1,
n_jobs: int = 1,
random_state: int | np.random.Generator | None = None,
bounds: list[tuple[float, float]] = None,
p0: list[float] = None,
**kwargs: Any,
) -> tuple[pd.DataFrame, pd.DataFrame, Callable[[NDArrayf], NDArrayf]]:
"""
Estimate and model the spatial correlation of the input variable by empirical variogram sampling and fitting of a
sum of variogram model.
The spatial correlation is returned as a function of spatial lags (in units of the input coordinates) which gives a
correlation value between 0 and 1.
This function samples an empirical variogram using skgstat.Variogram, then optimizes by weighted least-squares the
sum of a defined number of models, using the functions `sample_empirical_variogram` and `fit_sum_model_variogram`.
:param dvalues: Proxy values as array or Raster (i.e., differenced values where signal should be zero such as
elevation differences on stable terrain)
:param list_models: List of K variogram models to sum for the fit in order from short to long ranges. Can either be
a 3-letter string, full string of the variogram name or SciKit-GStat model function (e.g., for a
spherical model "Sph", "Spherical" or skgstat.models.spherical).
:param estimator: Estimator for the empirical variogram; defaults to Dowd's variogram (see skgstat.Variogram for
the list of available estimators).
:param gsd: Ground sampling distance
:param coords: Coordinates
:param subsample: Number of samples to randomly draw from the values
:param subsample_method: Spatial subsampling method
:param n_variograms: Number of independent empirical variogram estimations (to estimate empirical variogram spread)
:param n_jobs: Number of processing cores
:param random_state: Random state or seed number to use for calculations (to fix random sampling during testing)
:param bounds: Bounds of range and sill parameters for each model (shape K x 4 = K x range lower, range upper,
sill lower, sill upper).
:param p0: Initial guess of ranges and sills for each model (shape K x 2 = K x range first guess, sill first guess).
:return: Dataframe of empirical variogram, Dataframe of optimized model parameters, Function of spatial correlation
(0 to 1) with spatial lags
"""
empirical_variogram = sample_empirical_variogram(
values=dvalues,
estimator=estimator,
gsd=gsd,
coords=coords,
subsample=subsample,
subsample_method=subsample_method,
n_variograms=n_variograms,
n_jobs=n_jobs,
random_state=random_state,
**kwargs,
)
params_variogram_model = fit_sum_model_variogram(
list_models=list_models, empirical_variogram=empirical_variogram, bounds=bounds, p0=p0
)[1]
spatial_correlation_func = correlation_from_variogram(params_variogram_model=params_variogram_model)
return empirical_variogram, params_variogram_model, spatial_correlation_func
def infer_spatial_correlation_from_stable(
dvalues: NDArrayf | RasterType,
list_models: list[str | Callable[[NDArrayf, float, float], NDArrayf]],
stable_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
unstable_mask: NDArrayf | Mask | VectorType | gpd.GeoDataFrame = None,
errors: NDArrayf | RasterType = None,
estimator: str = "dowd",
gsd: float = None,
coords: NDArrayf = None,
subsample: int = 1000,
subsample_method: str = "cdist_equidistant",
n_variograms: int = 1,
n_jobs: int = 1,
bounds: list[tuple[float, float]] = None,
p0: list[float] = None,
random_state: int | np.random.Generator | None = None,
**kwargs: Any,
) -> tuple[pd.DataFrame, pd.DataFrame, Callable[[NDArrayf], NDArrayf]]:
"""
Infer the spatial correlation of errors from differenced values on stable terrain and a list of variogram models to
fit as a sum.
This function returns a dataframe of the empirical variogram, a dataframe of optimized model parameters, and a
spatial correlation function. The spatial correlation is returned as a function of spatial lags
(in units of the input coordinates) which gives a correlation value between 0 and 1.
It is a convenience wrapper for `_estimate_model_spatial_correlation` to work on either Raster or array and compute
the stable mask.
If no stable or unstable mask is provided to mask in or out the values, all terrain is used.
:param dvalues: Proxy values as array or Raster (i.e., differenced values where signal should be zero such as
elevation differences on stable terrain)
:param list_models: List of K variogram models to sum for the fit in order from short to long ranges. Can either be
a 3-letter string, full string of the variogram name or SciKit-GStat model function (e.g., for a
spherical model "Sph", "Spherical" or skgstat.models.spherical).
:param stable_mask: Vector shapefile of stable terrain (if dvalues is Raster), or boolean array of same shape as
dvalues
:param unstable_mask: Vector shapefile of unstable terrain (if dvalues is Raster), or boolean array of same shape
as dvalues
:param errors: Error values to account for heteroscedasticity (ignored if None).
:param estimator: Estimator for the empirical variogram; defaults to Dowd's variogram (see skgstat.Variogram for
the list of available estimators).
:param gsd: Ground sampling distance, if input values are provided as array
:param coords: Coordinates
:param subsample: Number of samples to randomly draw from the values
:param subsample_method: Spatial subsampling method
:param n_variograms: Number of independent empirical variogram estimations (to estimate empirical variogram spread)
:param n_jobs: Number of processing cores
:param bounds: Bounds of range and sill parameters for each model (shape K x 4 = K x range lower, range upper,
sill lower, sill upper).
:param p0: Initial guess of ranges and sills for each model (shape K x 2 = K x range first guess, sill first guess).
:param random_state: Random state or seed number to use for calculations (to fix random sampling during testing)
:return: Dataframe of empirical variogram, Dataframe of optimized model parameters, Function of spatial correlation
(0 to 1) with spatial lags
"""
dvalues_stable_arr, gsd = _preprocess_values_with_mask_to_array(
values=dvalues, include_mask=stable_mask, exclude_mask=unstable_mask, gsd=gsd
)
# Perform standardization if error array is provided
if errors is not None:
if isinstance(errors, Raster):
errors_arr = get_array_and_mask(errors)[0]
else:
errors_arr = errors
# Standardize
dvalues_stable_arr /= errors_arr
# Estimate and model spatial correlations
empirical_variogram, params_variogram_model, spatial_correlation_func = _estimate_model_spatial_correlation(
dvalues=dvalues_stable_arr,
list_models=list_models,
estimator=estimator,
gsd=gsd,
coords=coords,
subsample=subsample,
subsample_method=subsample_method,
n_variograms=n_variograms,
n_jobs=n_jobs,
random_state=random_state,
bounds=bounds,
p0=p0,
**kwargs,
)
return empirical_variogram, params_variogram_model, spatial_correlation_func
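# Example usage (a minimal sketch, assuming "dh" is a Raster of elevation differences and
# "glacier_outlines" a Vector of unstable terrain; both names are placeholders):
#   emp_vgm, params_vgm, corr_func = infer_spatial_correlation_from_stable(
#       dvalues=dh, list_models=["gaussian", "spherical"], unstable_mask=glacier_outlines, random_state=42
#   )
#   corr_func(np.array([100.0, 1000.0]))  # Correlation of errors at 100 and 1000 distance units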
def _check_validity_params_variogram(params_variogram_model: pd.DataFrame) -> None:
"""Check the validity of the modelled variogram parameters dataframe (mostly in the case it is passed manually)."""
# Check that expected columns exists
expected_columns = ["model", "range", "psill"]
if not all(c in params_variogram_model for c in expected_columns):
raise ValueError(
'The dataframe with variogram parameters must contain the columns "model", "range" and "psill".'
)
# Check that the format of variogram models are correct
for model in params_variogram_model["model"].values:
_get_skgstat_variogram_model_name(model)
# Check that the format of ranges, sills are correct
for r in params_variogram_model["range"].values:
if not isinstance(r, (float, np.floating, int, np.integer)):
raise ValueError("The variogram ranges must be float or integer.")
if r <= 0:
raise ValueError("The variogram ranges must have non-zero, positive values.")
# Check that the format of ranges, sills are correct
for p in params_variogram_model["psill"].values:
if not isinstance(p, (float, np.floating, int, np.integer)):
raise ValueError("The variogram partial sills must be float or integer.")
if p <= 0:
raise ValueError("The variogram partial sills must have non-zero, positive values.")
# Check that the matern smoothness factor exist and is rightly formatted
if ["stable"] in params_variogram_model["model"].values or ["matern"] in params_variogram_model["model"].values:
if "smooth" not in params_variogram_model:
raise ValueError(
'The dataframe with variogram parameters must contain the column "smooth" for '
"the smoothness factor when using Matern or Stable models."
)
for i in range(len(params_variogram_model)):
if params_variogram_model["model"].values[i] in ["stable", "matern"]:
s = params_variogram_model["smooth"].values[i]
if not isinstance(s, (float, np.floating, int, np.integer)):
raise ValueError("The variogram smoothness parameter must be float or integer.")
if s <= 0:
raise ValueError("The variogram smoothness parameter must have non-zero, positive values.")
def neff_circular_approx_theoretical(area: float, params_variogram_model: pd.DataFrame) -> float:
"""
Number of effective samples approximated from exact disk integration of a sum of any number of variogram models
of spherical, gaussian, exponential or cubic form over a disk of a certain area. This approximation performs best
for areas with a shape close to that of a disk.
Inspired by Rolstad et al. (2009): http://dx.doi.org/10.3189/002214309789470950.
The input variogram parameters match the format of the dataframe returned by `fit_sum_model_variogram`, also
detailed in the parameter description below if the dataframe is passed manually.
This function contains the exact integrated formulas and is mostly used for testing the numerical integration
of any number and forms of variograms provided by the function `neff_circular_approx_numerical`.
The number of effective samples serves to convert between standard deviation and standard error. For example, with
two models: if SE is the standard error, SD the standard deviation and N_eff the number of effective samples:
SE = SD / sqrt(N_eff) => N_eff = SD^2 / SE^2 => N_eff = (PS1 + PS2)/SE^2 where PS1 and PS2 are the partial sills
estimated from the variogram models, and SE is estimated by integrating the variogram models with parameters PS1/PS2
and R1/R2 where R1/R2 are the correlation ranges.
:param area: Area (in square unit of the variogram ranges)
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:return: Number of effective samples
"""
# Check input dataframe
_check_validity_params_variogram(params_variogram_model)
# Lag l_equiv equal to the radius needed for a disk of equivalent area A
l_equiv = np.sqrt(area / np.pi)
# Below, we list exact integral functions over an area A assumed a disk integrated radially from the center
# Formulas of h * covariance = h * ( psill - variogram ) for each form, then its integral for each form to yield
# the standard error SE. a1 = range and c1 = partial sill.
# Spherical: h * covariance = c1 * h * ( 1 - 3/2 * h / a1 + 1/2 * (h/a1)**3 )
# = c1 * (h - 3/2 * h**2 / a1 + 1/2 * h**4 / a1**3)
# Spherical: radial integral of above from 0 to L:
# SE**2 = 2 / (L**2) * c1 * (L**2 / 2 - 3/2 * L**3 / 3 / a1 + 1/2 * 1/5 * L**5 / a1**3)
# which leads to SE**2 = c1 * (1 - L / a1 + 1/5 * (L/a1)**3 )
# If spherical model is above the spherical range a1: SE**2 = c1 /5 * (a1/L)**2
def spherical_exact_integral(a1: float, c1: float, L: float) -> float:
if l_equiv <= a1:
squared_se = c1 * (1 - L / a1 + 1 / 5 * (L / a1) ** 3)
else:
squared_se = c1 / 5 * (a1 / L) ** 2
return squared_se
# Exponential: h * covariance = c1 * h * exp(-h/a); a = a1/3
# Exponential: radial integral of above from 0 to L: SE**2 = 2 / (L**2) * c1 * a * (a - exp(-L/a) * (a + L))
def exponential_exact_integral(a1: float, c1: float, L: float) -> float:
a = a1 / 3
squared_se = 2 * c1 * (a / L) ** 2 * (1 - np.exp(-L / a) * (1 + L / a))
return squared_se
# Gaussian: h * covariance = c1 * h * exp(-h**2/a**2) ; a = a1/2
# Gaussian: radial integral of above from 0 to L: SE**2 = 2 / (L**2) * c1 * 1/2 * a**2 * (1 - exp(-L**2/a**2))
def gaussian_exact_integral(a1: float, c1: float, L: float) -> float:
a = a1 / 2
squared_se = c1 * (a / L) ** 2 * (1 - np.exp(-(L**2) / a**2))
return squared_se
# Cubic: h * covariance = c1 * h * (1 - (7 * (h**2 / a**2)) + ((35 / 4) * (h**3 / a**3)) -
# ((7 / 2) * (h**5 / a**5)) + ((3 / 4) * (h**7 / a**7)))
# Cubic: radial integral of above from 0 to L:
# SE**2 = c1 * (6*a**7 -21*a**5*L**2 + 21*a**4*L**3 - 6*a**2*L**5 + L**7) / (6*a**7)
def cubic_exact_integral(a1: float, c1: float, L: float) -> float:
if l_equiv <= a1:
squared_se = (
c1 * (6 * a1**7 - 21 * a1**5 * L**2 + 21 * a1**4 * L**3 - 6 * a1**2 * L**5 + L**7) / (6 * a1**7)
)
else:
squared_se = 1 / 6 * c1 * a1**2 / L**2
return squared_se
squared_se = 0.0
valid_models = ["spherical", "exponential", "gaussian", "cubic"]
exact_integrals = [
spherical_exact_integral,
exponential_exact_integral,
gaussian_exact_integral,
cubic_exact_integral,
]
for i in np.arange(len(params_variogram_model)):
model_name = _get_skgstat_variogram_model_name(params_variogram_model["model"].values[i])
r = params_variogram_model["range"].values[i]
p = params_variogram_model["psill"].values[i]
if model_name in valid_models:
exact_integral = exact_integrals[valid_models.index(model_name)]
squared_se += exact_integral(r, p, l_equiv)
# We sum all partial sills to get the total sill
total_sill = np.nansum(params_variogram_model.psill)
# The number of effective samples is the ratio of the total sill to the squared standard error
neff = total_sill / squared_se
return neff
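# Worked check (a sketch with an arbitrary range of 100): for a single spherical model and a disk
# whose radius L equals the range a1 (area = pi * 100**2), SE**2 = c1 * (1 - 1 + 1/5) = c1 / 5, so
# the number of effective samples is 5 regardless of the partial sill:
#   params = pd.DataFrame(data={"model": ["spherical"], "range": [100.0], "psill": [1.0]})
#   neff_circular_approx_theoretical(area=np.pi * 100.0**2, params_variogram_model=params)  # 5.0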
def _integrate_fun(fun: Callable[[NDArrayf], NDArrayf], low_b: float, upp_b: float) -> float:
"""
Numerically integrate function between an upper and lower bounds
:param fun: Function to integrate
:param low_b: Lower bound
:param upp_b: Upper bound
:return: Integral between lower and upper bound
"""
return integrate.quad(fun, low_b, upp_b)[0]
def neff_circular_approx_numerical(area: float | int, params_variogram_model: pd.DataFrame) -> float:
"""
Number of effective samples derived from numerical integration for any sum of variogram models over a circular area.
This is a generalization of Rolstad et al. (2009): http://dx.doi.org/10.3189/002214309789470950, which is verified
against exact integration of `neff_circular_approx_theoretical`. This approximation performs best for areas with
a shape close to that of a disk.
The input variogram parameters match the format of the dataframe returned by `fit_sum_model_variogram`, also
detailed in the parameter description below if the dataframe is passed manually.
The number of effective samples N_eff serves to convert between standard deviation and standard error
over the area: SE = SD / sqrt(N_eff) if SE is the standard error, SD the standard deviation.
:param area: Area (in square unit of the variogram ranges)
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:returns: Number of effective samples
"""
# Check input dataframe
_check_validity_params_variogram(params_variogram_model)
# Get the total sill from the sum of partial sills
total_sill = np.nansum(params_variogram_model.psill)
# Define the covariance sum function times the spatial lag, for later integration
def hcov_sum(h: NDArrayf) -> NDArrayf:
return h * covariance_from_variogram(params_variogram_model)(h)
# Get the radius for which the circle has the defined area
h_equiv = np.sqrt(area / np.pi)
# Integrate the covariance function between the center and the radius
full_int = _integrate_fun(hcov_sum, 0, h_equiv)
# Get the standard error, and return the number of effective samples
squared_se = 2 * np.pi * full_int / area
# The number of effective samples is the ratio of the total sill to the squared standard error
neff = total_sill / squared_se
return neff
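# Example usage (a minimal sketch with arbitrary parameters): for forms with an exact integral such
# as the spherical model, the numerical result should closely match neff_circular_approx_theoretical.
#   params = pd.DataFrame(data={"model": ["spherical"], "range": [100.0], "psill": [1.0]})
#   neff_circular_approx_numerical(area=np.pi * 100.0**2, params_variogram_model=params)  # Close to 5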
def neff_exact(
coords: NDArrayf, errors: NDArrayf, params_variogram_model: pd.DataFrame, vectorized: bool = True
) -> float:
"""
Exact number of effective samples derived from a double sum of covariance with euclidean coordinates based on
the provided variogram parameters. This method works for any shape of area.
:param coords: Center coordinates with size (N,2) for each spatial support (typically, pixel)
:param errors: Errors at the coordinates with size (N,) for each spatial support (typically, pixel)
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:param vectorized: Perform the vectorized calculation (used for testing).
:return: Number of effective samples
"""
# Check input dataframe
_check_validity_params_variogram(params_variogram_model)
# Get spatial correlation function from variogram parameters
rho = correlation_from_variogram(params_variogram_model)
# Get number of points and pairwise distance compacted matrix from scipy.pdist
n = len(coords)
pds = pdist(coords)
# Now we compute the double covariance sum
# Either using for-loop-version
if not vectorized:
var = 0.0
for i in range(n):
for j in range(n):
# For index calculation of the pairwise distance,
# see https://docs.scipy.org/doc/scipy/reference/generated/scipy.spatial.distance.pdist.html
if i == j:
d = 0
elif i < j:
ind = n * i + j - ((i + 2) * (i + 1)) // 2
d = pds[ind]
else:
ind = n * j + i - ((j + 2) * (j + 1)) // 2
d = pds[ind]
var += rho(d) * errors[i] * errors[j] # type: ignore
# Or vectorized version
else:
# Convert the compact pairwise distance into a square matrix
pds_matrix = squareform(pds)
# Vectorize calculation
var = np.sum(
errors.reshape((-1, 1)) @ errors.reshape((1, -1)) * rho(pds_matrix.flatten()).reshape(pds_matrix.shape)
)
# The number of effective samples is the ratio of the squared mean error to the squared standard error from the
# double covariance sum
squared_se_dsc = var / n**2
neff = float(np.mean(errors)) ** 2 / squared_se_dsc
return neff
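# Example usage (a minimal sketch on a small arbitrary grid with homoscedastic errors of 1): the
# result lies between 1 (fully correlated) and the number of points (fully independent).
#   xx, yy = np.meshgrid(np.arange(0, 500, 100), np.arange(0, 500, 100))
#   coords = np.dstack((xx.flatten(), yy.flatten())).squeeze()  # 25 points on a 100-unit grid
#   errors = np.ones(len(coords))
#   params = pd.DataFrame(data={"model": ["spherical"], "range": [200.0], "psill": [1.0]})
#   neff_exact(coords=coords, errors=errors, params_variogram_model=params)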
def neff_hugonnet_approx(
coords: NDArrayf,
errors: NDArrayf,
params_variogram_model: pd.DataFrame,
subsample: int = 1000,
vectorized: bool = True,
random_state: int | np.random.Generator | None = None,
) -> float:
"""
Approximated number of effective samples derived from a double sum of covariance subsetted on one of the two sums,
based on euclidean coordinates with the provided variogram parameters. This method works for any shape of area.
See Hugonnet et al. (2022), https://doi.org/10.1109/jstars.2022.3188922, in particular Supplementary Fig. S16.
:param coords: Center coordinates with size (N,2) for each spatial support (typically, pixel)
:param errors: Errors at the coordinates with size (N,) for each spatial support (typically, pixel)
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:param subsample: Number of samples to subset the calculation
:param vectorized: Perform the vectorized calculation (used for testing).
:param random_state: Random state or seed number to use for calculations (to fix random sampling during testing)
:return: Number of effective samples
"""
# Define random state
rng = np.random.default_rng(random_state)
# Check input dataframe
_check_validity_params_variogram(params_variogram_model)
# Get spatial correlation function from variogram parameters
rho = correlation_from_variogram(params_variogram_model)
# Get number of points and pairwise distance matrix from scipy.cdist
n = len(coords)
# At maximum, the number of subsamples has to be equal to number of points
subsample = min(subsample, n)
# Get random subset of points for one of the sums
rand_points = rng.choice(n, size=subsample, replace=False)
# Subsample coordinates in 1D before computing pairwise distances
sub_coords = coords[rand_points, :]
sub_errors = errors[rand_points]
pds_matrix = cdist(coords, sub_coords, "euclidean")
# Now we compute the double covariance sum
# Either using for-loop-version
if not vectorized:
var = 0.0
for i in range(pds_matrix.shape[0]):
for j in range(pds_matrix.shape[1]):
d = pds_matrix[i, j]
var += rho(d) * errors[i] * sub_errors[j]  # type: ignore
# Or vectorized version
else:
# Vectorized calculation
var = np.sum(
errors.reshape((-1, 1)) @ sub_errors.reshape((1, -1)) * rho(pds_matrix.flatten()).reshape(pds_matrix.shape)
)
# The number of effective samples is the ratio of the squared mean error to the squared standard error from the
# double covariance sum
squared_se_dsc = var / (n * subsample)
neff = float(np.mean(errors)) ** 2 / squared_se_dsc
return neff
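# Example usage (a minimal sketch reusing the grid, errors and parameters from the neff_exact
# sketch above): with a subsample greater than or equal to the number of points, the approximation
# computes the same full double sum as neff_exact.
#   neff_hugonnet_approx(coords=coords, errors=errors, params_variogram_model=params, subsample=25,
#                        random_state=42)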
def number_effective_samples(
area: float | int | VectorType | gpd.GeoDataFrame,
params_variogram_model: pd.DataFrame,
rasterize_resolution: RasterType | float = None,
**kwargs: Any,
) -> float:
"""
Compute the number of effective samples, i.e. the number of uncorrelated samples, in an area accounting for spatial
correlations described by a sum of variogram models.
This function wraps two methods:
- A discretized integration method that provides the exact estimate for any shape of area using a double sum of
covariance. By default, this method is approximated using Equation 18 of Hugonnet et al. (2022),
https://doi.org/10.1109/jstars.2022.3188922 to decrease computing times while preserving a good approximation.
- A continuous integration method that provides a conservative (i.e., slightly overestimated) value for a disk
area shape, based on a generalization of the approach of Rolstad et al. (2009),
http://dx.doi.org/10.3189/002214309789470950.
By default, if a numeric value is passed for an area, the continuous method is used considering a disk shape. If a
vector is passed, the discretized method is computed on that shape. If the discretized method is used, a resolution
for rasterization is generally expected; otherwise, it is arbitrarily chosen as a fifth of the shortest correlation
range to ensure a grid fine enough to propagate the shortest range.
:param area: Area of interest either as a numeric value of surface in the same unit as the variogram ranges (will
assume a circular shape), or as a vector (shapefile) of the area
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:param rasterize_resolution: Resolution to rasterize the area if passed as a vector. Can be a float value or a
Raster.
:param kwargs: Keyword argument to pass to the `neff_hugonnet_approx` function.
:return: Number of effective samples
"""
# Check input for variogram parameters
_check_validity_params_variogram(params_variogram_model=params_variogram_model)
# If area is numeric, run the continuous circular approximation
if isinstance(area, (float, int)):
neff = neff_circular_approx_numerical(area=area, params_variogram_model=params_variogram_model)
# Otherwise, run the discrete sum of covariance
elif isinstance(area, (Vector, gpd.GeoDataFrame)):
# If the input is a geopandas dataframe, put into a Vector object
if isinstance(area, gpd.GeoDataFrame):
V = Vector(area)
else:
V = area
if rasterize_resolution is None:
rasterize_resolution = np.min(params_variogram_model["range"].values) / 5.0
warnings.warn(
"Resolution for vector rasterization is not defined and thus set at 20% of the shortest "
"correlation range, which might result in large memory usage."
)
# Rasterize with numeric resolution or Raster metadata
if isinstance(rasterize_resolution, (float, int, np.floating, np.integer)):
# We only need relative mask and coordinates, not absolute
mask = V.create_mask(xres=rasterize_resolution, as_array=True)
x = rasterize_resolution * np.arange(0, mask.shape[0])
y = rasterize_resolution * np.arange(0, mask.shape[1])
coords = np.array(np.meshgrid(y, x))
coords_on_mask = coords[:, mask].T
elif isinstance(rasterize_resolution, Raster):
# With a Raster we can get the coordinates directly
mask = V.create_mask(raster=rasterize_resolution, as_array=True).squeeze()
coords = np.array(rasterize_resolution.coords())
coords_on_mask = coords[:, mask].T
else:
raise ValueError("The rasterize resolution must be a float, integer or Raster subclass.")
# Here we don't care about heteroscedasticity, so all errors are standardized to one
errors_on_mask = np.ones(len(coords_on_mask))
neff = neff_hugonnet_approx(
coords=coords_on_mask, errors=errors_on_mask, params_variogram_model=params_variogram_model, **kwargs
)
else:
raise ValueError("Area must be a float, integer, Vector subclass or geopandas dataframe.")
return neff
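# Example usage (a minimal sketch with arbitrary parameters): a numeric area uses the continuous
# disk integration, while a vector area (here a placeholder Vector "outlines") uses the discretized
# double sum of covariance.
#   params = pd.DataFrame(data={"model": ["spherical"], "range": [100.0], "psill": [1.0]})
#   number_effective_samples(area=np.pi * 100.0**2, params_variogram_model=params)  # Close to 5
#   number_effective_samples(area=outlines, params_variogram_model=params, rasterize_resolution=20.0)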
def spatial_error_propagation(
areas: list[float | VectorType | gpd.GeoDataFrame],
errors: RasterType,
params_variogram_model: pd.DataFrame,
**kwargs: Any,
) -> list[float]:
"""
Spatial propagation of elevation errors to an area using the estimated heteroscedasticity and spatial correlations.
This function is based on the `number_effective_samples` function to estimate uncorrelated samples. If given a
vector area, it uses Equation 18 of Hugonnet et al. (2022), https://doi.org/10.1109/jstars.2022.3188922. If given
a numeric area, it uses a generalization of Rolstad et al. (2009), http://dx.doi.org/10.3189/002214309789470950.
The standard error SE (1-sigma) is then computed as SE = mean(SD) / sqrt(Neff), where mean(SD) is the mean of the
errors in the area of interest (which accounts for heteroscedasticity), and Neff is the number of effective samples.
:param areas: Area of interest either as a numeric value of surface in the same unit as the variogram ranges (will
assume a circular shape), or as a vector (shapefile) of the area.
:param errors: Errors from heteroscedasticity estimation and modelling, as an array or Raster.
:param params_variogram_model: Dataframe of variogram models to sum with three to four columns, "model" for the
model types (e.g., ["spherical", "matern"]), "range" for the correlation ranges (e.g., [2, 100]), "psill" for
the partial sills (e.g., [0.8, 0.2]) and "smooth" for the smoothness parameter if it exists for this model
(e.g., [None, 0.2]).
:param kwargs: Keyword argument to pass to the `neff_hugonnet_approx` function.
:return: List of standard errors (1-sigma) for the input areas
"""
standard_errors = []
errors_arr = get_array_and_mask(errors)[0]
for area in areas:
# We estimate the number of effective samples in the area
neff = number_effective_samples(
area=area, params_variogram_model=params_variogram_model, rasterize_resolution=errors, **kwargs
)
# We compute the average error in this area
# If the area is only a value, take the average error over the entire Raster
if isinstance(area, (float, int)):
average_spread = np.nanmean(errors_arr)
else:
if isinstance(area, gpd.GeoDataFrame):
area_vector = Vector(area)
else:
area_vector = area
area_mask = area_vector.create_mask(errors, as_array=True).squeeze()
average_spread = np.nanmean(errors_arr[area_mask])
# Compute the standard error from those two values
standard_error = average_spread / np.sqrt(neff)
standard_errors.append(standard_error)
return standard_errors
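# Example usage (a minimal sketch, assuming "sig_dh" is a Raster of per-pixel errors,
# "glacier_outlines" a Vector of the area of interest and "params_vgm" a dataframe of fitted
# variogram parameters; all three names are placeholders):
#   stderrs = spatial_error_propagation(areas=[glacier_outlines], errors=sig_dh,
#                                       params_variogram_model=params_vgm)
#   stderrs[0]  # Standard error (1-sigma) of the mean error over the area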
def _std_err_finite(std: float, neff_tot: float, neff: float) -> float:
"""
Standard error formula for a subsample of a finite ensemble.
:param std: standard deviation
:param neff_tot: maximum number of effective samples
:param neff: number of effective samples
:return: standard error
"""
return std * np.sqrt(1 / neff_tot * (neff_tot - neff) / neff_tot)
def _std_err(std: float, neff: float) -> float:
"""
Standard error formula.
:param std: standard deviation
:param neff: number of effective samples
:return: standard error
"""
return std * np.sqrt(1 / neff)
def _distance_latlon(tup1: tuple[float, float], tup2: tuple[float, float], earth_rad: float = 6373000) -> float:
"""
Distance between two lon/lat coordinates, computed with the haversine formula on a sphere of given radius
ref: https://stackoverflow.com/questions/19412462/getting-distance-between-two-points-based-on-latitude-longitude
:param tup1: lon/lat coordinates of first point
:param tup2: lon/lat coordinates of second point
:param earth_rad: radius of the earth in meters
:return: distance
"""
lat1 = m.radians(abs(tup1[1]))
lon1 = m.radians(abs(tup1[0]))
lat2 = m.radians(abs(tup2[1]))
lon2 = m.radians(abs(tup2[0]))
dlon = lon2 - lon1
dlat = lat2 - lat1
a = m.sin(dlat / 2) ** 2 + m.cos(lat1) * m.cos(lat2) * m.sin(dlon / 2) ** 2
c = 2 * m.atan2(m.sqrt(a), m.sqrt(1 - a))
distance = earth_rad * c
return distance
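# Worked check (a sketch): one degree of latitude corresponds to about earth_rad * pi / 180, i.e.
# roughly 111 km with the default radius.
#   _distance_latlon((0.0, 0.0), (0.0, 1.0))  # Roughly 1.11e5 (meters)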
def _scipy_convolution(imgs: NDArrayf, filters: NDArrayf, output: NDArrayf) -> None:
"""
Scipy convolution on a number n_N of 2D images of size N1 x N2 using a number of kernels n_M of sizes M1 x M2.
:param imgs: Input array of size (n_N, N1, N2) with n_N images of size N1 x N2
:param filters: Input array of filters of size (n_M, M1, M2) with n_M filters of size M1 x M2
:param output: Initialized output array of size (n_N, n_M, N1, N2)
"""
for i_N in np.arange(imgs.shape[0]):
for i_M in np.arange(filters.shape[0]):
output[i_N, i_M, :, :] = fftconvolve(imgs[i_N, :, :], filters[i_M, :, :], mode="same")
nd4type = numba.double[:, :, :, :]
nd3type = numba.double[:, :, :]
@numba.njit((nd3type, nd3type, nd4type)) # type: ignore
def _numba_convolution(imgs: NDArrayf, filters: NDArrayf, output: NDArrayf) -> None:
"""
Numba convolution on a number n_N of 2D images of size N1 x N2 using a number of kernels n_M of sizes M1 x M2.
:param imgs: Input array of size (n_N, N1, N2) with n_N images of size N1 x N2
:param filters: Input array of filters of size (n_M, M1, M2) with n_M filters of size M1 x M2
:param output: Initialized output array of size (n_N, n_M, N1, N2)
"""
n_rows, n_cols, n_imgs = imgs.shape
height, width, n_filters = filters.shape
for ii in range(n_imgs):
for rr in range(n_rows - height + 1):
for cc in range(n_cols - width + 1):
for hh in range(height):
for ww in range(width):
for ff in range(n_filters):
imgval = imgs[rr + hh, cc + ww, ii]
filterval = filters[hh, ww, ff]
output[rr, cc, ii, ff] += imgval * filterval
def convolution(imgs: NDArrayf, filters: NDArrayf, method: str = "scipy") -> NDArrayf:
"""
Convolution on a number n_N of 2D images of size N1 x N2 using a number of kernels n_M of sizes M1 x M2, using
either scipy.signal.fftconvolve or accelerated numba loops.
Note that the n_N and n_M indexes correspond to the first axes of the arrays, to speed up computations (prefetching).
Inspired by: https://laurentperrinet.github.io/sciblog/posts/2017-09-20-the-fastest-2d-convolution-in-the-world.html
:param imgs: Input array of size (n_N, N1, N2) with n_N images of size N1 x N2
:param filters: Input array of filters of size (n_M, M1, M2) with n_M filters of size M1 x M2
:param method: Method to perform the convolution: "scipy" or "numba"
:return: Filled array of outputs of size (n_N, n_M, N1, N2)
"""
# Initialize output array according to input shapes
n_N, N1, N2 = imgs.shape
n_M, M1, M2 = filters.shape
output = np.zeros((n_N, n_M, N1, N2))
if method.lower() == "scipy":
_scipy_convolution(imgs=imgs, filters=filters, output=output)
elif method.lower() == "numba":
_numba_convolution(
imgs=imgs.astype(dtype=np.double),
filters=filters.astype(dtype=np.double),
output=output.astype(dtype=np.double),
)
else:
raise ValueError('Method must be "scipy" or "numba".')
return output
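# Example usage (a minimal sketch with an arbitrary random image and a single 3 x 3 mean kernel):
#   imgs = np.random.default_rng(42).normal(size=(1, 50, 50))
#   filters = np.ones((1, 3, 3)) / 9
#   out = convolution(imgs=imgs, filters=filters, method="scipy")
#   out.shape  # (1, 1, 50, 50): one output of size N1 x N2 per image and per kernel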
def mean_filter_nan(
img: NDArrayf, kernel_size: int, kernel_shape: str = "circular", method: str = "scipy"
) -> tuple[NDArrayf, NDArrayf, int]:
"""
Apply a mean filter to an image with a square or circular kernel of a given size, ignoring NaN values.
:param img: Input array of size (N1, N2)
:param kernel_size: Size M of kernel, which will be a symmetrical (M, M) kernel
:param kernel_shape: Shape of kernel, either "square" or "circular"
:param method: Method to perform the convolution: "scipy" or "numba"
:return: Array of size (N1, N2) with mean values, Array of size (N1, N2) with number of valid pixels, Number of
pixels in the kernel
"""
# Simplify kernel size notation
p = kernel_size
# Copy the array and replace NaNs by zeros before summing them in the convolution
img_zeroed = img.copy()
img_zeroed[~np.isfinite(img_zeroed)] = 0
# Define square kernel
if kernel_shape.lower() == "square":
kernel = np.ones((p, p), dtype="uint8")
# Circle kernel
elif kernel_shape.lower() == "circular":
kernel = _create_circular_mask((p, p)).astype("uint8")
else:
raise ValueError('Kernel shape should be "square" or "circular".')
# Run convolution to compute the sum of img values
summed_img = convolution(
imgs=img_zeroed.reshape((1, img_zeroed.shape[0], img_zeroed.shape[1])),
filters=kernel.reshape((1, kernel.shape[0], kernel.shape[1])),
method=method,
).squeeze()
# Construct a boolean array for nodatas
nodata_img = np.ones(np.shape(img), dtype=np.int8)
nodata_img[~np.isfinite(img)] = 0
# Count the number of valid pixels in the kernel with a convolution
nb_valid_img = convolution(
imgs=nodata_img.reshape((1, nodata_img.shape[0], nodata_img.shape[1])), # type: ignore
filters=kernel.reshape((1, kernel.shape[0], kernel.shape[1])),
method=method,
).squeeze()
# Compute the final mean filter which accounts for no data
with warnings.catch_warnings():
warnings.filterwarnings("ignore", "divide by zero encountered in *divide")
mean_img = summed_img / nb_valid_img
# Compute the number of pixel per kernel
nb_pixel_per_kernel = np.count_nonzero(kernel)
return mean_img, nb_valid_img, nb_pixel_per_kernel
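# Example usage (a minimal sketch with an arbitrary image containing a block of NaNs):
#   img = np.random.default_rng(42).normal(size=(50, 50))
#   img[10:20, 10:20] = np.nan
#   mean_img, nb_valid_img, nb_px = mean_filter_nan(img=img, kernel_size=5, kernel_shape="circular")
#   nb_px  # Number of pixels inside the circular kernel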
def _patches_convolution(
values: NDArrayf,
gsd: float,
area: float,
perc_min_valid: float = 80.0,
patch_shape: str = "circular",
method: str = "scipy",
statistic_between_patches: Callable[[NDArrayf], np.floating[Any]] = nmad,
return_in_patch_statistics: bool = False,
) -> tuple[float, float, float] | tuple[float, float, float, pd.DataFrame]:
"""
:param values: Values as array of shape (N1, N2) with NaN for masked values
:param gsd: Ground sampling distance
:param area: Size of integration area (squared unit of ground sampling distance)
:param perc_min_valid: Minimum valid area in the patch
:param patch_shape: Shape of patch, either "circular" or "square"
:param method: Method to perform the convolution, "scipy" or "numba"
:param statistic_between_patches: Statistic to compute between all patches, typically a measure of spread, applied
to the first in-patch statistic, which is typically the mean
:param return_in_patch_statistics: Whether to return the dataframe of statistics for all patches and areas
:return: Statistic between patches, Number of patches, Exact discretized area, (Optional) Dataframe of per-patch
statistics
"""
# Get kernel size to match area
# If circular, it corresponds to the diameter
if patch_shape.lower() == "circular":
kernel_size = int(np.round(2 * np.sqrt(area / np.pi) / gsd, decimals=0))
# If square, to the side length
elif patch_shape.lower() == "square":
kernel_size = int(np.round(np.sqrt(area) / gsd, decimals=0))
else:
raise ValueError('Kernel shape should be "square" or "circular".')
logging.info("Computing the convolution on the entire array...")
mean_img, nb_valid_img, nb_pixel_per_kernel = mean_filter_nan(
img=values, kernel_size=kernel_size, kernel_shape=patch_shape, method=method
)
# Exclude mean values if number of valid pixels is less than a percentage of the kernel size
mean_img[nb_valid_img < nb_pixel_per_kernel * perc_min_valid / 100.0] = np.nan
# A problem with the convolution method compared to the quadrant one is that patches are not independent, which
# can bias the estimation of spread. To remedy this, we compute spread statistics on patches separated by the
# kernel size (i.e., the diameter of the circular patch, or side of the square patch) to ensure no dependency
# There are as many combinations for this calculation as the square of the kernel_size
logging.info("Computing statistic between patches for all independent combinations...")
list_statistic_estimates = []
list_nb_independent_patches = []
for i in range(kernel_size):
for j in range(kernel_size):
statistic = statistic_between_patches(mean_img[i::kernel_size, j::kernel_size].ravel())
nb_patches = np.count_nonzero(np.isfinite(mean_img[i::kernel_size, j::kernel_size]))
list_statistic_estimates.append(statistic)
list_nb_independent_patches.append(nb_patches)
if return_in_patch_statistics:
# Create dataframe of independent patches for one independent setting
df = pd.DataFrame(
data={
"nanmean": mean_img[::kernel_size, ::kernel_size].ravel(),
"count": nb_valid_img[::kernel_size, ::kernel_size].ravel(),
}
)
# We then use the average of the statistic computed for different sets of independent patches to get a more robust
# estimate
average_statistic = float(np.nanmean(np.asarray(list_statistic_estimates)))
nb_independent_patches = float(np.nanmean(np.asarray(list_nb_independent_patches)))
exact_area = nb_pixel_per_kernel * gsd**2
if return_in_patch_statistics:
return average_statistic, nb_independent_patches, exact_area, df
else:
return average_statistic, nb_independent_patches, exact_area
def _patches_loop_quadrants(
values: NDArrayf,
gsd: float,
area: float,
patch_shape: str = "circular",
n_patches: int = 1000,
perc_min_valid: float = 80.0,
statistics_in_patch: Iterable[Callable[[NDArrayf], np.floating[Any]] | str] = (np.nanmean,),
statistic_between_patches: Callable[[NDArrayf], np.floating[Any]] = nmad,
random_state: int | np.random.Generator | None = None,
return_in_patch_statistics: bool = False,
) -> tuple[float, float, float] | tuple[float, float, float, pd.DataFrame]:
"""
Patches method for empirical estimation of the standard error over an integration area
:param values: Values as array of shape (N1, N2) with NaN for masked values
:param gsd: Ground sampling distance
:param area: Size of integration area (squared unit of ground sampling distance)
:param perc_min_valid: Minimum valid area in the patch
:param statistics_in_patch: List of statistics to compute in each patch (count is computed by default; only the
first statistic is used by statistic_between_patches)
:param statistic_between_patches: Statistic to compute between all patches, typically a measure of spread, applied
to the first in-patch statistic, which is typically the mean
:param patch_shape: Shape of patch, either "circular" or "square".
:param n_patches: Maximum number of patches to sample
:param random_state: Random state or seed number to use for calculations (to fix random sampling during testing)
:param return_in_patch_statistics: Whether to return the dataframe of statistics for all patches and areas
:return: Statistic between patches, Number of patches, Exact discretized area, Dataframe of per-patch statistics
"""
list_statistics_in_patch = list(statistics_in_patch)
# Add count by default
list_statistics_in_patch.append("count")
# Get statistic name
statistics_name = [f if isinstance(f, str) else f.__name__ for f in list_statistics_in_patch]
# Define random state
rng = np.random.default_rng(random_state)
# Divide raster in quadrants where we can sample
nx, ny = np.shape(values)
kernel_size = int(np.round(np.sqrt(area) / gsd, decimals=0))
# For rectangular quadrants
nx_sub = int(np.floor((nx - 1) / kernel_size))
ny_sub = int(np.floor((ny - 1) / kernel_size))
# For circular patches
rad = int(np.round(np.sqrt(area / np.pi) / gsd, decimals=0))
# Compute exact area to provide to checks and return
if patch_shape.lower() == "square":
nb_pixel_exact = nx_sub * ny_sub
exact_area = nb_pixel_exact * gsd**2
elif patch_shape.lower() == "circular":
nb_pixel_exact = np.count_nonzero(_create_circular_mask(shape=(nx, ny), radius=rad))
exact_area = nb_pixel_exact * gsd**2
# Create list of all possible quadrants
list_quadrant = [[i, j] for i in range(nx_sub) for j in range(ny_sub)]
u = 0
# Keep sampling while there are quadrants left and the number of sampled patches is below the maximum
remaining_nsamp = n_patches
list_df = []
while len(list_quadrant) > 0 and u < n_patches:
# Draw random indices from the list of quadrants; select more than enough at once to avoid having to draw
# and difference the lists several times
list_idx_quadrant = rng.choice(len(list_quadrant), size=min(len(list_quadrant), 10 * remaining_nsamp))
for idx_quadrant in list_idx_quadrant:
logging.info("Working on a new quadrant")
# Select center coordinates
i = list_quadrant[idx_quadrant][0]
j = list_quadrant[idx_quadrant][1]
# Get patch by masking the square or circular quadrant
if patch_shape.lower() == "square":
patch = values[
kernel_size * i : kernel_size * (i + 1), kernel_size * j : kernel_size * (j + 1)
].flatten()
elif patch_shape.lower() == "circular":
center_x = np.floor(kernel_size * (i + 1 / 2))
center_y = np.floor(kernel_size * (j + 1 / 2))
mask = _create_circular_mask((nx, ny), center=(center_x, center_y), radius=rad)
patch = values[mask]
else:
raise ValueError("Patch method must be square or circular.")
# Check that the patch is complete and has the minimum number of valid values
nb_pixel_total = len(patch)
nb_pixel_valid = len(patch[np.isfinite(patch)])
if nb_pixel_valid >= np.ceil(perc_min_valid / 100.0 * nb_pixel_total) and nb_pixel_total == nb_pixel_exact:
u = u + 1
if u > n_patches:
break
logging.info("Found valid quadrant " + str(u) + " (maximum: " + str(n_patches) + ")")
df = pd.DataFrame()
df = df.assign(tile=[str(i) + "_" + str(j)])
# Use a distinct index name to avoid shadowing the quadrant index j
for k, statistic in enumerate(list_statistics_in_patch):
if isinstance(statistic, str):
if statistic == "count":
df[statistic] = [nb_pixel_valid]
else:
raise ValueError('No string other than "count" is supported for named statistics.')
else:
df[statistics_name[k]] = [statistic(patch[np.isfinite(patch)].astype("float64"))]
list_df.append(df)
# Get remaining samples to draw
remaining_nsamp = n_patches - u
# Remove quadrants already sampled from list
list_quadrant = [c for j, c in enumerate(list_quadrant) if j not in list_idx_quadrant]
if len(list_df) > 0:
df_all = pd.concat(list_df)
# The average statistic is computed on the first in-patch statistic
average_statistic = float(statistic_between_patches(df_all[statistics_name[0]].values))
nb_independent_patches = np.count_nonzero(np.isfinite(df_all[statistics_name[0]].values))
else:
df_all = pd.DataFrame()
for j, _ in enumerate(list_statistics_in_patch):
df_all[statistics_name[j]] = [np.nan]
average_statistic = np.nan
nb_independent_patches = 0
warnings.warn("No valid patch found covering this area size, returning NaN for statistic.")
if return_in_patch_statistics:
return average_statistic, nb_independent_patches, exact_area, df_all
else:
return average_statistic, nb_independent_patches, exact_area
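# Note: the quadrant sampling above is, in essence, a Monte Carlo estimate of the dispersion of
# per-patch means, i.e. of the standard error of the mean over a given integration area. Below is a
# minimal, hypothetical sketch of that principle (independent of the helpers in this module, using
# plain NumPy only and illustrative names): for uncorrelated noise of spread sigma, the spread of the
# means of patches of N pixels should approach sigma / sqrt(N).
def _sketch_patch_standard_error(sigma: float = 1.0, kernel_size: int = 20, n_patches: int = 400) -> tuple[float, float]:
    """Illustrative sketch only: compare the empirical spread of patch means to the theoretical sigma/sqrt(N)."""
    rng = np.random.default_rng(42)
    # Simulate uncorrelated errors on a grid large enough to hold the requested number of patches
    n_side = int(np.ceil(np.sqrt(n_patches))) * kernel_size
    values = rng.normal(0, sigma, size=(n_side, n_side))
    # Mean of each non-overlapping square patch of kernel_size x kernel_size pixels
    patch_means = [
        np.mean(values[i : i + kernel_size, j : j + kernel_size])
        for i in range(0, n_side, kernel_size)
        for j in range(0, n_side, kernel_size)
    ]
    # Empirical spread between patches versus the theoretical standard error of the mean
    empirical_se = float(np.std(patch_means))
    theoretical_se = sigma / kernel_size  # sigma / sqrt(kernel_size**2)
    return empirical_se, theoretical_se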
@overload
def patches_method(
values: NDArrayf | RasterType,
areas: list[float],
gsd: float | None = None,
stable_mask: NDArrayf | VectorType | gpd.GeoDataFrame | None = None,
unstable_mask: NDArrayf | VectorType | gpd.GeoDataFrame | None = None,
statistics_in_patch: tuple[Callable[[NDArrayf], np.floating[Any]] | str, ...] = (np.nanmean,),
statistic_between_patches: Callable[[NDArrayf], np.floating[Any]] = nmad,
perc_min_valid: float = 80.0,
patch_shape: str = "circular",
vectorized: bool = True,
convolution_method: str = "scipy",
n_patches: int = 1000,
*,
return_in_patch_statistics: Literal[False] = False,
random_state: int | np.random.Generator | None = None,
) -> pd.DataFrame: ...
@overload
def patches_method(
values: NDArrayf | RasterType,
areas: list[float],
gsd: float | None = None,
stable_mask: NDArrayf | VectorType | gpd.GeoDataFrame | None = None,
unstable_mask: NDArrayf | VectorType | gpd.GeoDataFrame | None = None,
statistics_in_patch: tuple[Callable[[NDArrayf], np.floating[Any]] | str, ...] = (np.nanmean,),
statistic_between_patches: Callable[[NDArrayf], np.floating[Any]] = nmad,
perc_min_valid: float = 80.0,
patch_shape: str = "circular",
vectorized: bool = True,
convolution_method: str = "scipy",
n_patches: int = 1000,
*,
return_in_patch_statistics: Literal[True],
random_state: int | np.random.Generator | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]: ...
def patches_method(
values: NDArrayf | RasterType,
areas: list[float],
gsd: float | None = None,
stable_mask: NDArrayf | VectorType | gpd.GeoDataFrame | None = None,
unstable_mask: NDArrayf | VectorType | gpd.GeoDataFrame | None = None,
statistics_in_patch: tuple[Callable[[NDArrayf], np.floating[Any]] | str, ...] = (np.nanmean,),
statistic_between_patches: Callable[[NDArrayf], np.floating[Any]] = nmad,
perc_min_valid: float = 80.0,
patch_shape: str = "circular",
vectorized: bool = True,
convolution_method: str = "scipy",
n_patches: int = 1000,
return_in_patch_statistics: bool = False,
random_state: int | np.random.Generator | None = None,
) -> pd.DataFrame | tuple[pd.DataFrame, pd.DataFrame]:
"""
Monte Carlo patches method that samples multiple patches of terrain, square or circular, of a certain area and
computes a statistic in each patch. Then, another statistic is computed between all patches. Typically, a statistic
of central tendency (e.g., the mean) is computed for each patch, then a statistic of spread (e.g., the NMAD) is
computed on the central tendency of all the patches. This specific procedure gives an empirical estimate of the
standard error of the mean.
The function returns the exact areas of the patches, which might differ from the input due to rasterization of the
shapes.
By default, the fast vectorized method based on a convolution of all pixels is used, but it only supports the mean.
To compute other statistics (possibly a list of them), use the non-vectorized method, which randomly samples
quadrants of the input array up to a maximum number of patches "n_patches".
The per-patch statistics can be returned as a concatenated dataframe using the "return_in_patch_statistics"
argument; this is not done by default due to the large size of the output.
:param values: Values as array or Raster
:param areas: List of patch areas to process (squared unit of ground sampling distance; exact patch areas might not
always match these accurately due to rasterization, and are returned as outputs)
:param gsd: Ground sampling distance
:param stable_mask: Vector shapefile of stable terrain (if values is Raster), or boolean array of same shape as
values
:param unstable_mask: Vector shapefile of unstable terrain (if values is Raster), or boolean array of same shape
as values
:param statistics_in_patch: List of statistics to compute in each patch (count is computed by default;
only mean and count supported for vectorized version)
:param statistic_between_patches: Statistic to compute between all patches, typically a measure of spread, applied
to the first in-patch statistic, which is typically the mean
:param perc_min_valid: Minimum percentage of valid pixels required in the patch
:param patch_shape: Shape of patch, either "circular" or "square"
:param vectorized: Whether to use the vectorized (convolution) method or the for loop in quadrants
:param convolution_method: Convolution method to use, either "scipy" or "numba" (only for vectorized)
:param n_patches: Maximum number of patches to sample (only for non-vectorized)
:param return_in_patch_statistics: Whether to return the dataframe of statistics for all patches and areas
:param random_state: Random state or seed number to use for calculations (only for non-vectorized, for testing)
:return: Dataframe of statistic between patches with independent patches count and exact areas,
(Optional) Dataframe of per-patch statistics
"""
# Get values with NaNs on unstable terrain, preserving the shape by default
values_arr, gsd = _preprocess_values_with_mask_to_array(
values=values, include_mask=stable_mask, exclude_mask=unstable_mask, gsd=gsd
)
# Initialize list of dataframe for the statistic on all patches
list_stats = []
list_nb_patches = []
list_exact_areas = []
# Initialize a list to concatenate full dataframes if we want to return them
if return_in_patch_statistics:
list_df = []
# Looping on areas
for area in areas:
# If vectorized, we run the convolution which only supports mean and count statistics
if vectorized:
outputs = _patches_convolution(
values=values_arr,
gsd=gsd,
area=area,
perc_min_valid=perc_min_valid,
patch_shape=patch_shape,
method=convolution_method,
statistic_between_patches=statistic_between_patches,
return_in_patch_statistics=return_in_patch_statistics,
)
# If not, we run the quadrant loop method that supports any statistic
else:
outputs = _patches_loop_quadrants(
values=values_arr,
gsd=gsd,
area=area,
patch_shape=patch_shape,
n_patches=n_patches,
perc_min_valid=perc_min_valid,
statistics_in_patch=statistics_in_patch,
statistic_between_patches=statistic_between_patches,
return_in_patch_statistics=return_in_patch_statistics,
random_state=random_state,
)
list_stats.append(outputs[0])
list_nb_patches.append(outputs[1])
list_exact_areas.append(outputs[2])
if return_in_patch_statistics:
# We would need overloads on all the patch subfunctions to type this return precisely;
# ignoring the type check for now.
df: pd.DataFrame = outputs[3] # type: ignore
df["areas"] = area
df["exact_areas"] = outputs[2]
list_df.append(df)
# Produce final dataframe of statistic between patches per area
df_statistic = pd.DataFrame(
data={
statistic_between_patches.__name__: list_stats,
"nb_indep_patches": list_nb_patches,
"exact_areas": list_exact_areas,
"areas": areas,
}
)
if return_in_patch_statistics:
# Concatenate the complete dataframe
df_tot = pd.concat(list_df)
return df_statistic, df_tot
else:
return df_statistic
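# A hypothetical usage sketch for patches_method (the synthetic array, ground sampling distance and
# areas below are made up for illustration): estimate how the NMAD of patch means decreases as the
# integration area grows, on uncorrelated synthetic noise.
def _sketch_patches_method_usage() -> pd.DataFrame:
    """Illustrative sketch only: run patches_method on synthetic noise for a few integration areas."""
    rng = np.random.default_rng(42)
    dh = rng.normal(0, 1, size=(500, 500))  # synthetic elevation differences, in arbitrary units
    gsd = 10.0  # hypothetical ground sampling distance of 10 m
    # Integration areas corresponding roughly to patches of 5x5, 10x10 and 20x20 pixels
    areas = [(5 * gsd) ** 2, (10 * gsd) ** 2, (20 * gsd) ** 2]
    # With the default vectorized convolution, only the mean is computed in each patch; the NMAD
    # between patch means is returned per area alongside the exact discretized areas
    df = patches_method(dh, areas=areas, gsd=gsd)
    return df  # columns: "nmad", "nb_indep_patches", "exact_areas", "areas"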
def plot_variogram(
df: pd.DataFrame,
list_fit_fun: list[Callable[[NDArrayf], NDArrayf]] | None = None,
list_fit_fun_label: list[str] | None = None,
ax: matplotlib.axes.Axes | None = None,
xscale: str = "linear",
xscale_range_split: list[float] | None = None,
xlabel: str | None = None,
ylabel: str | None = None,
xlim: tuple[float, float] | None = None,
ylim: tuple[float, float] | None = None,
out_fname: str | None = None,
) -> None:
"""
Plot empirical variogram, and optionally also plot one or several model fits.
Input dataframe is expected to be the output of xdem.spatialstats.sample_empirical_variogram.
Input function model is expected to be the output of xdem.spatialstats.fit_sum_model_variogram.
:param df: Empirical variogram, formatted as a dataframe with count (pairwise sample count), lags
(upper bound of spatial lag bin), exp (experimental variance), and err_exp (error on experimental variance)
:param list_fit_fun: List of model function fits
:param list_fit_fun_label: List of model function fits labels
:param ax: Plotting ax to use, creates a new one by default
:param xscale: Scale of X-axis
:param xscale_range_split: List of ranges at which to split the figure
:param xlabel: Label of X-axis
:param ylabel: Label of Y-axis
:param xlim: Limits of X-axis
:param ylim: Limits of Y-axis
:param out_fname: File to save the variogram plot to
:return:
"""
# Create axes if they are not passed
if ax is None:
fig = plt.figure()
ax = plt.subplot(111)
elif isinstance(ax, matplotlib.axes.Axes):
fig = ax.figure
else:
raise ValueError("ax must be a matplotlib.axes.Axes instance or None")
# Check format of input dataframe
expected_values = ["exp", "lags", "count"]
for val in expected_values:
if val not in df.columns.values:
raise ValueError(f'The expected variable "{val}" is not part of the provided dataframe column names.')
# Hide axes for the main subplot (which will be subdivided)
ax.axis("off")
if ylabel is None:
ylabel = r"Variance [$\mu$ $\pm \sigma$]"
if xlabel is None:
xlabel = "Spatial lag (m)"
init_gridsize = [10, 10]
# Create parameters to split x axis into different linear scales
# If there is no split, get parameters for a single subplot
if xscale_range_split is None:
nb_subpanels = 1
if xscale == "log":
xmin = [np.min(df.lags) / 2]
else:
xmin = [0]
xmax = [np.max(df.lags)]
xgridmin = [0]
xgridmax = [init_gridsize[0]]
gridsize = init_gridsize
# Otherwise, derive a list for each subplot
else:
# Add initial zero if not in input
if xscale_range_split[0] != 0:
if xscale == "log":
first_xmin = np.min(df.lags) / 2
else:
first_xmin = 0
xscale_range_split = [first_xmin] + xscale_range_split
# Add maximum distance if not in input
if xscale_range_split[-1] != np.max(df.lags):
xscale_range_split.append(np.max(df.lags))
# Scale grid size by the number of subpanels
nb_subpanels = len(xscale_range_split) - 1
gridsize = init_gridsize.copy()
gridsize[0] *= nb_subpanels
# Create list of parameters to pass to ax/grid objects of subpanels
xmin = []
xmax = []
xgridmin = []
xgridmax = []
for i in range(nb_subpanels):
xmin.append(xscale_range_split[i])
xmax.append(xscale_range_split[i + 1])
xgridmin.append(init_gridsize[0] * i)
xgridmax.append(init_gridsize[0] * (i + 1))
# Need a grid plot to show the sample count and the statistic
grid = plt.GridSpec(gridsize[1], gridsize[0], wspace=0.5, hspace=0.5)
# Loop over each subpanel
for k in range(nb_subpanels):
# First, an axis to plot the sample histogram
ax0 = ax.inset_axes(grid[:3, xgridmin[k] : xgridmax[k]].get_position(fig).bounds)
ax0.set_xscale(xscale)
ax0.set_xticks([])
# Plot the histogram manually with fill_between
interval_var = [0] + list(df.lags)
for i in range(len(df)):
count = df["count"].values[i]
ax0.fill_between(
[interval_var[i], interval_var[i + 1]],
[0] * 2,
[count] * 2,
facecolor=plt.cm.Greys(0.75),
alpha=1,
edgecolor="white",
linewidth=0.5,
)
if k == 0:
ax0.set_ylabel("Sample count")
# Scientific format to avoid undesired additional space on the label side
ax0.ticklabel_format(axis="y", style="sci", scilimits=(0, 0))
else:
ax0.set_yticks([])
# Ignore warnings for log scales
ax0.set_xlim((xmin[k], xmax[k]))
# Now, plot the statistic of the data
ax1 = ax.inset_axes(grid[3:, xgridmin[k] : xgridmax[k]].get_position(fig).bounds)
# Get the lags bin centers
bins_center = np.subtract(df.lags, np.diff([0] + df.lags.tolist()) / 2)
# If the estimated errors are all NaN (single run), simply plot the empirical variogram
if np.all(np.isnan(df.err_exp)):
ax1.scatter(bins_center, df.exp, label="Empirical variogram", color="blue", marker="x")
# Otherwise, plot the error estimates through multiple runs
else:
ax1.errorbar(bins_center, df.exp, yerr=df.err_exp, label="Empirical variogram (1-sigma std error)", fmt="x")
# If a list of functions is passed, plot the modelled variograms
if list_fit_fun is not None:
for i, fit_fun in enumerate(list_fit_fun):
x = np.linspace(xmin[k], xmax[k], 1000)
y = fit_fun(x)
if list_fit_fun_label is not None:
ax1.plot(x, y, linestyle="dashed", label=list_fit_fun_label[i], zorder=30)
else:
ax1.plot(x, y, linestyle="dashed", color="black", zorder=30)
if list_fit_fun_label is None:
ax1.plot([], [], linestyle="dashed", color="black", label="Model fit")
ax1.set_xscale(xscale)
if nb_subpanels > 1 and k == (nb_subpanels - 1):
ax1.xaxis.set_ticks(np.linspace(xmin[k], xmax[k], 3))
elif nb_subpanels > 1:
ax1.xaxis.set_ticks(np.linspace(xmin[k], xmax[k], 3)[:-1])
if xlim is None:
ax1.set_xlim((xmin[k], xmax[k]))
else:
ax1.set_xlim(xlim)
if ylim is not None:
ax1.set_ylim(ylim)
else:
if np.all(np.isnan(df.err_exp)):
ax1.set_ylim((0, 1.05 * np.nanmax(df.exp)))
else:
ax1.set_ylim((0, np.nanmax(df.exp) + np.nanmean(df.err_exp)))
if k == int(nb_subpanels / 2):
ax1.set_xlabel(xlabel)
if k == nb_subpanels - 1:
ax1.legend(loc="lower right")
if k == 0:
ax1.set_ylabel(ylabel)
else:
ax1.set_yticks([])
if out_fname is not None:
plt.savefig(out_fname)
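# A hypothetical usage sketch for plot_variogram (the dataframe is hand-built with the columns
# documented above instead of coming from sample_empirical_variogram, and the exponential curve is
# only an example shape): plot a synthetic empirical variogram together with one model fit.
def _sketch_plot_variogram_usage(out_fname: str | None = None) -> None:
    """Illustrative sketch only: plot a hand-built empirical variogram and a single model curve."""
    lags = np.linspace(10, 1000, 25)
    # Synthetic exponential variogram with partial sill 1 and practical range 300
    exp_var = 1.0 * (1 - np.exp(-3 * lags / 300.0))
    df = pd.DataFrame(
        data={
            "lags": lags,
            "exp": exp_var,
            "count": np.full(lags.shape, 1000),
            "err_exp": np.full(lags.shape, np.nan),  # single run: no error on the experimental variance
        }
    )
    plot_variogram(
        df,
        list_fit_fun=[lambda h: 1.0 * (1 - np.exp(-3 * h / 300.0))],
        list_fit_fun_label=["Exponential model"],
        xlabel="Spatial lag (m)",
        out_fname=out_fname,
    )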
def plot_1d_binning(
df: pd.DataFrame,
var_name: str,
statistic_name: str,
label_var: str | None = None,
label_statistic: str | None = None,
min_count: int = 30,
ax: matplotlib.axes.Axes | None = None,
out_fname: str | None = None,
) -> None:
"""
Plot a statistic and its count along a single binning variable.
Input is expected to be formatted as the output of the xdem.spatialstats.nd_binning function.
:param df: Output dataframe of nd_binning
:param var_name: Name of binning variable to plot
:param statistic_name: Name of statistic of interest to plot
:param label_var: Label of binning variable
:param label_statistic: Label of statistic of interest
:param min_count: Removes statistic values computed with a count lower than this minimum value
:param ax: Plotting ax to use, creates a new one by default
:param out_fname: File to save the plot to
"""
# Create axes
if ax is None:
fig = plt.figure()
ax = plt.subplot(111)
elif isinstance(ax, matplotlib.axes.Axes):
fig = ax.figure
else:
raise ValueError("ax must be a matplotlib.axes.Axes instance or None.")
if var_name not in df.columns.values:
raise ValueError(f'The variable "{var_name}" is not part of the provided dataframe column names.')
if statistic_name not in df.columns.values:
raise ValueError(f'The statistic "{statistic_name}" is not part of the provided dataframe column names.')
# Re-format pandas interval if read from CSV as string
if any(isinstance(x, pd.Interval) for x in df[var_name].values):
pass
# Check for any unformatted interval (saving and reading a pd.DataFrame without MultiIndexing transforms
# pd.Interval into strings)
elif any(isinstance(_pandas_str_to_interval(x), pd.Interval) for x in df[var_name].values):
intervalindex_vals = [_pandas_str_to_interval(x) for x in df[var_name].values]
df[var_name] = pd.IntervalIndex(intervalindex_vals)
else:
raise ValueError("The variable columns must be provided as string or pd.Interval values.")
# Hide axes for the main subplot (which will be subdivided)
ax.axis("off")
if label_var is None:
label_var = var_name
if label_statistic is None:
label_statistic = statistic_name
# Subsample to 1D and for the variable of interest
df_sub = df[np.logical_and(df.nd == 1, np.isfinite(pd.IntervalIndex(df[var_name]).mid))].copy()
# Remove statistic calculated in bins with too low count
df_sub.loc[df_sub["count"] < min_count, statistic_name] = np.nan
# Need a grid plot to show the sample count and the statistic
grid = plt.GridSpec(10, 10, wspace=0.5, hspace=0.5)
# First, an axis to plot the sample histogram
ax0 = ax.inset_axes(grid[:3, :].get_position(fig).bounds)
ax0.set_xticks([])
# Plot the histogram manually with fill_between
interval_var = pd.IntervalIndex(df_sub[var_name])
for i in range(len(df_sub)):
count = df_sub["count"].values[i]
ax0.fill_between(
[interval_var[i].left, interval_var[i].right],
[0] * 2,
[count] * 2,
facecolor=plt.cm.Greys(0.75),
alpha=1,
edgecolor="white",
linewidth=0.5,
)
ax0.set_ylabel("Sample count")
# Scientific format to avoid undesired additional space on the label side
ax0.ticklabel_format(axis="y", style="sci", scilimits=(0, 0))
# Try to identify if the count is always the same
# (np.quantile can have a couple of undesired effects, so leave an error margin of 2 outlier bins and a count difference of 5)
if np.sum(~(np.abs(df_sub["count"].values[0] - df_sub["count"].values) < 5)) <= 2:
ax0.text(
0.5,
0.5,
"Fixed number of\n samples: " + "{:,}".format(int(df_sub["count"].values[0])),
ha="center",
va="center",
fontweight="bold",
transform=ax0.transAxes,
bbox=dict(facecolor="white", alpha=0.8),
)
ax0.set_ylim((0, 1.1 * np.max(df_sub["count"].values)))
ax0.set_xlim((np.min(interval_var.left), np.max(interval_var.right)))
# Now, plot the statistic of the data
ax1 = ax.inset_axes(grid[3:, :].get_position(fig).bounds)
ax1.scatter(interval_var.mid, df_sub[statistic_name], marker="x")
ax1.set_xlabel(label_var)
ax1.set_ylabel(label_statistic)
ax1.set_xlim((np.min(interval_var.left), np.max(interval_var.right)))
if out_fname is not None:
plt.savefig(out_fname)
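# A hypothetical usage sketch for plot_1d_binning (the synthetic elevation differences and slope
# values are made up, and the "nmad" column name assumes the default statistics of nd_binning):
# bin a value by one explanatory variable, then plot the NMAD per slope bin.
def _sketch_plot_1d_binning_usage() -> None:
    """Illustrative sketch only: 1D binning of synthetic values by a synthetic slope, then plotting."""
    rng = np.random.default_rng(42)
    slope = rng.uniform(0, 60, size=10_000)
    # Larger errors on steeper slopes, so that the binned NMAD shows a visible trend
    dh = rng.normal(0, 0.5 + 0.05 * slope)
    df = nd_binning(values=dh, list_var=[slope], list_var_names=["slope"])
    plot_1d_binning(
        df,
        var_name="slope",
        statistic_name="nmad",
        label_var="Slope (degrees)",
        label_statistic="NMAD of dh",
    )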
def plot_2d_binning(
df: pd.DataFrame,
var_name_1: str,
var_name_2: str,
statistic_name: str,
label_var_name_1: str | None = None,
label_var_name_2: str | None = None,
label_statistic: str | None = None,
cmap: matplotlib.colors.Colormap = plt.cm.Reds,
min_count: int = 30,
scale_var_1: str = "linear",
scale_var_2: str = "linear",
vmin: np.floating[Any] | None = None,
vmax: np.floating[Any] | None = None,
nodata_color: str | tuple[float, float, float, float] = "yellow",
ax: matplotlib.axes.Axes | None = None,
out_fname: str | None = None,
) -> None:
"""
Plot one statistic and its count along two binning variables.
Input is expected to be formatted as the output of the xdem.spatialstats.nd_binning function.
:param df: Output dataframe of nd_binning
:param var_name_1: Name of first binning variable to plot
:param var_name_2: Name of second binning variable to plot
:param statistic_name: Name of statistic of interest to plot
:param label_var_name_1: Label of first binning variable
:param label_var_name_2: Label of second binning variable
:param label_statistic: Label of statistic of interest
:param cmap: Colormap
:param min_count: Removes statistic values computed with a count lower than this minimum value
:param scale_var_1: Scale along the axis of the first variable
:param scale_var_2: Scale along the axis of the second variable
:param vmin: Minimum statistic value in colormap range
:param vmax: Maximum statistic value in colormap range
:param nodata_color: Color for no data bins
:param ax: Plotting ax to use, creates a new one by default
:param out_fname: File to save the plot to
"""
# Create axes
if ax is None:
fig = plt.figure(figsize=(8, 6))
ax = plt.subplot(111)
elif isinstance(ax, matplotlib.axes.Axes):
fig = ax.figure
else:
raise ValueError("ax must be a matplotlib.axes.Axes instance or None.")
if var_name_1 not in df.columns.values:
raise ValueError(f'The variable "{var_name_1}" is not part of the provided dataframe column names.')
elif var_name_2 not in df.columns.values:
raise ValueError(f'The variable "{var_name_2}" is not part of the provided dataframe column names.')
if statistic_name not in df.columns.values:
raise ValueError(f'The statistic "{statistic_name}" is not part of the provided dataframe column names.')
# Re-format pandas interval if read from CSV as string
for var in [var_name_1, var_name_2]:
if any(isinstance(x, pd.Interval) for x in df[var].values):
pass
# Check for any unformatted interval (saving and reading a pd.DataFrame without MultiIndexing transforms
# pd.Interval into strings)
elif any(isinstance(_pandas_str_to_interval(x), pd.Interval) for x in df[var].values):
intervalindex_vals = [_pandas_str_to_interval(x) for x in df[var].values]
df[var] = pd.IntervalIndex(intervalindex_vals)
else:
raise ValueError("The variable columns must be provided as string or pd.Interval values.")
# Hide axes for the main subplot (which will be subdivided)
ax.axis("off")
# Subsample to 2D and for the variables of interest
df_sub = df[
np.logical_and.reduce(
(
df.nd == 2,
np.isfinite(pd.IntervalIndex(df[var_name_1]).mid),
np.isfinite(pd.IntervalIndex(df[var_name_2]).mid),
)
)
].copy()
# Remove statistic calculated in bins with too low count
df_sub.loc[df_sub["count"] < min_count, statistic_name] = np.nan
# Let's do a 4 panel figure:
# two histograms for the binning variables
# + a colored grid to display the statistic calculated on the value of interest
# + a legend panel with statistic colormap and nodata color
# For some reason the scientific notation displays weirdly for default figure size
grid = plt.GridSpec(10, 10, wspace=0.5, hspace=0.5)
# First, a horizontal axis on top to plot the sample histogram of the first variable
ax0 = ax.inset_axes(grid[:3, :-3].get_position(fig).bounds)
ax0.set_xscale(scale_var_1)
ax0.set_xticklabels([])
# Plot the histogram manually with fill_between
interval_var_1 = pd.IntervalIndex(df_sub[var_name_1])
df_sub["var1_mid"] = interval_var_1.mid.values
unique_var_1 = np.unique(df_sub.var1_mid)
list_counts = []
for i in range(len(unique_var_1)):
df_var1 = df_sub[df_sub.var1_mid == unique_var_1[i]]
count = np.nansum(df_var1["count"].values)
list_counts.append(count)
ax0.fill_between(
[df_var1[var_name_1].values[0].left, df_var1[var_name_1].values[0].right],
[0] * 2,
[count] * 2,
facecolor=plt.cm.Greys(0.75),
alpha=1,
edgecolor="white",
linewidth=0.5,
)
ax0.set_ylabel("Sample count")
# In case the axis value does not agree with the scale (e.g., 0 for log scale)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ax0.set_ylim((0, 1.1 * np.max(list_counts)))
ax0.set_xlim((np.min(interval_var_1.left), np.max(interval_var_1.right)))
ax0.ticklabel_format(axis="y", style="sci", scilimits=(0, 0))
ax0.spines["top"].set_visible(False)
ax0.spines["right"].set_visible(False)
# Try to identify if the count is always the same
if np.sum(~(np.abs(list_counts[0] - np.array(list_counts)) < 5)) <= 2:
ax0.text(
0.5,
0.5,
"Fixed number of\nsamples: " + f"{int(list_counts[0]):,}",
ha="center",
va="center",
fontweight="bold",
transform=ax0.transAxes,
bbox=dict(facecolor="white", alpha=0.8),
)
# Second, a vertical axis on the right to plot the sample histogram of the second variable
ax1 = ax.inset_axes(grid[3:, -3:].get_position(fig).bounds)
ax1.set_yscale(scale_var_2)
ax1.set_yticklabels([])
# Plot the histogram manually with fill_between
interval_var_2 = pd.IntervalIndex(df_sub[var_name_2])
df_sub["var2_mid"] = interval_var_2.mid.values
unique_var_2 = np.unique(df_sub.var2_mid)
list_counts = []
for i in range(len(unique_var_2)):
df_var2 = df_sub[df_sub.var2_mid == unique_var_2[i]]
count = np.nansum(df_var2["count"].values)
list_counts.append(count)
ax1.fill_between(
[0, count],
[df_var2[var_name_2].values[0].left] * 2,
[df_var2[var_name_2].values[0].right] * 2,
facecolor=plt.cm.Greys(0.75),
alpha=1,
edgecolor="white",
linewidth=0.5,
)
ax1.set_xlabel("Sample count")
# In case the axis value does not agree with the scale (e.g., 0 for log scale)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ax1.set_xlim((0, 1.1 * np.max(list_counts)))
ax1.set_ylim((np.min(interval_var_2.left), np.max(interval_var_2.right)))
ax1.ticklabel_format(axis="x", style="sci", scilimits=(0, 0))
ax1.spines["top"].set_visible(False)
ax1.spines["right"].set_visible(False)
# Try to identify if the count is always the same
if np.sum(~(np.abs(list_counts[0] - np.array(list_counts)) < 5)) <= 2:
ax1.text(
0.5,
0.5,
"Fixed number of\nsamples: " + f"{int(list_counts[0]):,}",
ha="center",
va="center",
fontweight="bold",
transform=ax1.transAxes,
rotation=90,
bbox=dict(facecolor="white", alpha=0.8),
)
# Third, an axis to plot the data as a colored grid
ax2 = ax.inset_axes(grid[3:, :-3].get_position(fig).bounds)
# Define limits of colormap if none are provided, using robust min and max from percentiles
if vmax is None:
vmax = np.nanpercentile(df_sub[statistic_name].values, 99)
if vmin is None:
vmin = np.nanpercentile(df_sub[statistic_name].values, 1)
# Create custom colormap
col_bounds = np.array([vmin, np.mean(np.asarray([vmin, vmax])), vmax])
cb = []
cb_val = np.linspace(0, 1, len(col_bounds))
for j in range(len(cb_val)):
cb.append(cmap(cb_val[j]))
cmap_cus = colors.LinearSegmentedColormap.from_list(
"my_cb", list(zip((col_bounds - min(col_bounds)) / (max(col_bounds - min(col_bounds))), cb)), N=1000
)
# Plot a 2D colored grid using fill_between
for i in range(len(unique_var_1)):
for j in range(len(unique_var_2)):
df_both = df_sub[np.logical_and(df_sub.var1_mid == unique_var_1[i], df_sub.var2_mid == unique_var_2[j])]
stat = df_both[statistic_name].values[0]
if np.isfinite(stat):
stat_col = max(0.0001, min(0.9999, (stat - min(col_bounds)) / (max(col_bounds) - min(col_bounds))))
col = cmap_cus(stat_col)
else:
col = nodata_color
ax2.fill_between(
[df_both[var_name_1].values[0].left, df_both[var_name_1].values[0].right],
[df_both[var_name_2].values[0].left] * 2,
[df_both[var_name_2].values[0].right] * 2,
facecolor=col,
alpha=1,
edgecolor="white",
)
ax2.set_xlabel(label_var_name_1)
ax2.set_ylabel(label_var_name_2)
ax2.set_xscale(scale_var_1)
ax2.set_yscale(scale_var_2)
# In case the axis value does not agree with the scale (e.g., 0 for log scale)
with warnings.catch_warnings():
warnings.simplefilter("ignore")
ax2.set_xlim((np.min(interval_var_1.left), np.max(interval_var_1.right)))
ax2.set_ylim((np.min(interval_var_2.left), np.max(interval_var_2.right)))
# Fourth and finally, add a colormap and nodata color to the legend
axcmap = ax.inset_axes(grid[:3, -3:].get_position(fig).bounds)
# Remove ticks, labels, frame
axcmap.set_xticks([])
axcmap.set_yticks([])
axcmap.spines["top"].set_visible(False)
axcmap.spines["left"].set_visible(False)
axcmap.spines["right"].set_visible(False)
axcmap.spines["bottom"].set_visible(False)
# Create an inset axis to manage the scale of the colormap
cbaxes = axcmap.inset_axes([0, 0.75, 1, 0.2], label="cmap")
# Create colormap object and plot
norm = colors.Normalize(vmin=min(col_bounds), vmax=max(col_bounds))
sm = plt.cm.ScalarMappable(cmap=cmap_cus, norm=norm)
sm.set_array([])
cb = plt.colorbar(sm, cax=cbaxes, orientation="horizontal", extend="both", shrink=0.8)
cb.ax.tick_params(width=0.5, length=2)
cb.set_label(label_statistic)
# Create an inset axis to manage the scale of the nodata legend
nodata = axcmap.inset_axes([0.4, 0.1, 0.2, 0.2], label="nodata")
# Plot a nodata legend
nodata.fill_between([0, 1], [0, 0], [1, 1], facecolor=nodata_color)
nodata.set_xlim((0, 1))
nodata.set_ylim((0, 1))
nodata.set_xticks([])
nodata.set_yticks([])
nodata.text(0.5, -0.25, "No data", ha="center", va="top")
if out_fname is not None:
plt.savefig(out_fname)
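# A hypothetical usage sketch for plot_2d_binning (the synthetic slope and curvature values are made
# up, and the "nmad" column name assumes the default statistics of nd_binning): bin a value by two
# explanatory variables, then plot the NMAD on the resulting 2D grid.
def _sketch_plot_2d_binning_usage() -> None:
    """Illustrative sketch only: 2D binning of synthetic values by slope and curvature, then plotting."""
    rng = np.random.default_rng(42)
    slope = rng.uniform(0, 60, size=20_000)
    curvature = rng.uniform(-5, 5, size=20_000)
    # Larger errors on steeper slopes and higher absolute curvatures
    dh = rng.normal(0, 0.5 + 0.05 * slope + 0.2 * np.abs(curvature))
    df = nd_binning(values=dh, list_var=[slope, curvature], list_var_names=["slope", "curvature"])
    plot_2d_binning(
        df,
        var_name_1="slope",
        var_name_2="curvature",
        statistic_name="nmad",
        label_var_name_1="Slope (degrees)",
        label_var_name_2="Curvature",
        label_statistic="NMAD of dh",
    )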