Source code for my_code_base.stats.xarray_utils

# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# Author: Markus Ritschel
# eMail:  git@markusritschel.de
# Date:   2024-06-19
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#
import logging

import numpy as np
import pandas as pd
import xarray as xr
from multipledispatch import dispatch
from scipy import stats

from .timeseries import extend_annual_series, weighted_annual_mean, xr_deseasonalize

log = logging.getLogger(__name__)


def _has_seasonal_frequency(obj, dim="time"):
    """Check if data has sub-annual temporal resolution."""
    time = obj[dim]
    if not np.issubdtype(time.dtype, np.datetime64):
        return False
    try:
        freq = pd.infer_freq(time.values)
    except ValueError:
        freq = None
    if freq is None:
        # Fallback: check median spacing
        dt = np.diff(time.values).astype("timedelta64[D]").astype(float)
        median_days = np.median(dt)
        return median_days < 360
    return not freq.startswith(("A", "Y", "AS", "YS"))


[docs] @xr.register_dataset_accessor("stats") @xr.register_dataarray_accessor("stats") class StatsAccessor: """ This class provides statistical operations on xarray data. Parameters ---------- obj : xarray.Dataset or xarray.DataArray The xarray object on which statistical operations will be performed. Methods ------- weighted_mean(dim) Calculate the weighted mean based on the given dimension. """ def __init__(self, obj): self._obj = obj @dispatch(str) def weighted_mean(self, dim): """ Calculate the weighted annual mean (taking days of months into account). Parameters ---------- dim : str The dimension along which to calculate the weighted mean. Returns ------- weighted_mean : xarray.Dataset or xarray.DataArray The weighted annual mean. """ log.debug(f"Identified single dimension {dim}. Building weighted annual mean.") return weighted_annual_mean(self._obj) def _is_annual(self): """ Check if the xarray object has a 'year' dimension. Returns ------- bool True if the xarray object has a 'year' dimension, False otherwise. """ return 'year' in self._obj.dims
[docs] def fill_months_with_annual_value(self): """ Fill the xarray object with annual values for each month. Returns ------- xarray.Dataset or xarray.DataArray The xarray object with extended annual series. Raises ------ ValueError If the time series is not of yearly frequency. """ ds = self._obj if not self._is_annual(): raise ValueError("Time series is not of yearly frequency.") return extend_annual_series(ds)
[docs] def xr_linregress(x, y, dim='time', dof=None, deseasonalize: bool = True): """ Calculate linear regression statistics between two :class:`xarray.DataArray` along a specified dimension. Parameters ---------- x : xarray.DataArray The independent variable. y : xarray.DataArray The dependent variable. dim : str, optional The dimension along which to perform the regression. Defaults to 'time'. dof : int, str, or tuple, optional The degrees of freedom for the t-distribution. If None, it is calculated as n - 2, where n is the sample size. If 'integral_timescale', the integral timescale is calculated and used to determine the degrees of freedom. If 'effective_sample_size', the effective sample size is calculated and used to determine the degrees of freedom. Can also be a tuple ``('integral_timescale', '1/e')`` to use the 1/e decay threshold instead of the first zero-crossing when computing the integral timescale. Defaults to None. Returns ------- xarray.Dataset: A dataset containing the following regression statistics: - 'sample_size': The number of non-null values along the specified dimension. - 'slope': The slope of the regression line. - 'intercept': The intercept of the regression line. - 'r_value': The correlation coefficient between x and y. - 'p_value': The two-tailed p-value for the null hypothesis that the slope is zero. - 'std_err': The standard error of the slope. Notes ----- - NaN values are automatically excluded from the calculations. - The correlation coefficient, p-value, and standard error are calculated using the t-distribution. - If dof is 'integral_timescale', the integral timescale is calculated as the sum of autocorrelation values until the first zero-crossing. The effective sample size is then calculated as the total sample size divided by the integral timescale. - If dof is 'effective_sample_size', the effective sample size is calculated as n * (1 - r1*r2) / (1 + r1*r2), where r1 and r2 are the lag-1 autocorrelation coefficients of x and y, respectively. """ from ..stats.timeseries import _mask_after_threshold_crossing, xr_autocorr TINY = 1.0e-20 # n = x[dim].size x = x.where(y.notnull()) y = y.where(x.notnull()) # COMMENT: If there are NaNs, the `n = x[dim].size` overstates the sample size, which deflates covariance/variance estimates (dividing by too-large N) and inflates DOF. # → Using notnull().sum(dim) for correctness. See Issue #16 n = y.notnull().sum(dim) nanmask = np.isnan(y).all(dim) xmean = x.mean(dim) ymean = y.mean(dim) xstd = x.std(dim) ystd = y.std(dim) cov = ((x - xmean) * (y - ymean)).sum(dim) / n cor = cov / (xstd * ystd) slope = cov / (xstd**2) intercept = ymean - xmean * slope out_dict = {'sample_size': n} # Parse tuple-based dof parameter, e.g. dof=('integral_timescale', '1/e') integral_cutoff = "zero_crossing" if isinstance(dof, tuple): dof, integral_cutoff = dof elif isinstance(dof, int): dof = dof - 2 if dof is None: dof = n - 2 elif dof == "integral_timescale": # COMMENT: Seasonal autocorrelation inflates τ, making n_eff artificially small (overly conservative). # → Deseasonalize, but only if the data has sub-annual frequency. See Issue #17 if deseasonalize and _has_seasonal_frequency(x, dim=dim): x = xr_deseasonalize(x) x_autocorr = xr_autocorr(x, dim=dim, normalize=True) positive_r = x_autocorr.sel(lead=slice(0, None)) threshold = 1 / np.e if integral_cutoff == "1/e" else 0.0 positive_r_masked = xr.apply_ufunc( _mask_after_threshold_crossing, positive_r, kwargs={"threshold": threshold}, input_core_dims=[['lead']], output_core_dims=[['lead']], dask="parallelized", vectorize=True, output_dtypes=['float'], ) τ = positive_r_masked.fillna(0).integrate(coord="lead") log.debug(f"Integral timescale: {τ}") n_eff = n / τ # Following eq. (3.15.17) in Emery & Thomson (2004) n_eff = n_eff.where(n_eff < n, n) # allow a maximum of n for n_eff dof = n_eff - 2 out_dict.update( { "autocorr": x_autocorr, "integral_timescale": τ, "effective_sample_size": n_eff, } ) elif dof == "effective_sample_size": # See Issue #16 if deseasonalize and _has_seasonal_frequency(x, dim=dim): x = xr_deseasonalize(x) y = xr_deseasonalize(y) r1 = xr_autocorr(x, dim=dim, normalize=True).sel(lead=1) r2 = xr_autocorr(y, dim=dim, normalize=True).sel(lead=1) n_eff = n * (1 - r1 * r2) / (1 + r1 * r2) dof = n_eff - 2 out_dict.update({'effective_sample_size': n_eff}) out_dict.update({'dof': dof}) log.info(f"Degrees of freedom: {dof}") tstats = cor * np.sqrt(dof / ((1.0 - cor + TINY) * (1.0 + cor + TINY))) stderr = slope / tstats # Issue #15: Should we scale with σ? pval = ( xr.apply_ufunc( stats.distributions.t.sf, # sf is the survival function (1 - cdf) abs(tstats), dof, dask="parallelized", output_dtypes=[y.dtype], ) * 2 # Convert pval to a two-tailed test ) out_dict.update( { "slope": slope, "intercept": intercept, "r_value": cor.fillna(0).where(~nanmask), "p_value": pval, "std_err": stderr.where(~np.isinf(stderr), 0), } ) ds = xr.Dataset(out_dict) _metadata = { "sample_size": {"long_name": "Number of valid observations"}, "dof": {"long_name": "Degrees of freedom"}, "slope": {"long_name": "Regression slope", "units": "[y] / [x]"}, "intercept": {"long_name": "Regression intercept", "units": "[y]"}, "r_value": {"long_name": "Pearson correlation coefficient"}, "p_value": {"long_name": "Two-tailed p-value"}, "std_err": {"long_name": "Standard error of the slope", "units": "[y] / [x]"}, "autocorr": {"long_name": "Autocorrelation function"}, "integral_timescale": {"long_name": "Integral timescale", "units": "[x]"}, "effective_sample_size": {"long_name": "Effective sample size"}, } for var, attrs in _metadata.items(): if var in ds: ds[var].attrs.update(attrs) return ds