# !/usr/bin/env python3
#
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# Author: Markus Ritschel
# eMail: git@markusritschel.de
# Date: 2024-03-03
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#
import logging
from collections import namedtuple
import numpy as np
import pandas as pd
import xarray as xr
import xarrayutils
log = logging.getLogger(__name__)
[docs]
def weighted_annual_mean(ds: xr.Dataset | xr.DataArray):
"""
Compute the weighted annual mean of an :class:`xarray.Dataset` or :class:`xarray.DataArray`.
Parameters
----------
ds :
The input dataset or data array.
Returns
-------
xarray.DataArray
The weighted annual mean of the input dataset or data array.
Raises
------
AssertionError
If the sum of the weights in each year is not equal to 1.0.
Notes
-----
The function computes the annual mean of the input dataset or data array, taking into account the different lengths
of the months. Each month is weighted by the number of days it comprises. If the frequency of the time dimension is
'1M', the function applies the weights. If the frequency is '1D' or higher, no weights are applied.
The function follows the approach described in the following source:
https://ncar.github.io/esds/posts/2021/yearly-averages-xarray/
The function assumes that the input dataset or data array has a 'time' dimension.
"""
def check_for_frequency(ds):
try:
estimated_frequency = xr.infer_freq(ds.time)
if not estimated_frequency.startswith("M"):
log.warning(
"Frequency seems to be not monthly. Consider another averaging method."
)
except:
log.warning("Cannot infer frequency")
return
check_for_frequency(ds)
# Determine the month length
month_length = ds.time.dt.days_in_month
# Calculate the weights
# In each 4th year, the total amount of days differs compared to other years
# Therefore, weights need to be calculated on an annual base
weights = (
month_length.groupby("time.year") / month_length.groupby("time.year").sum()
)
# Make sure the weights in each year add up to 1
assert np.allclose(weights.groupby("time.year").sum(xr.ALL_DIMS), 1.0), (
"The sum of the weights should be 1.0!"
)
# Setup our masking for nan values
ones = xr.where(ds.isnull(), 0.0, 1.0)
# Calculate the annual values, weighted by days in each month
ds_sum = (ds * weights).resample(time="YS").sum(dim="time", keep_attrs=True)
# Calculate the NaN weights
# This gives every NaN in the original data a weight of zero, resulting in a lower
# weight for affected years
ones_out = (ones * weights).resample(time="YS").sum(dim="time", keep_attrs=True)
# Return the weighted average
output = ds_sum / ones_out
output = output.assign_coords(
year=("time", output.time.dt.year.values), keep_attrs=True
)
return output.swap_dims({"time": "year"}).drop_vars("time")
[docs]
def xr_deseasonalize(da, freq=12, dim="time"):
"""Remove the seasonal cycle of an :class:`xr.Dataset` object.
Data get first detrended, then the long-term average of every season is subtracted for
each season. Finally, the trend is added again.
Parameters
----------
freq : int
The frequency of the data. Default is 12 for monthly resolution.
dim : str
The name of the time dimension.
"""
res = xarrayutils.linear_trend(da, dim=dim)
time_index = xr.DataArray(
np.arange(da[dim].size), dims={dim: da[dim]}, coords={dim: da[dim]}
)
trend = res.intercept + time_index * res.slope
detrended = da - trend
deseasonalized_detrended = (
detrended.groupby(f"{dim}.month") - detrended.groupby(f"{dim}.month").mean()
)
return deseasonalized_detrended + trend
[docs]
def xr_seasonal_decompose(da, dim="time"):
"""
Perform seasonal decomposition of a time series using the given dataset.
Parameters
----------
da: xarray.DataArray
The input data array containing the time series.
dim: str
The dimension along which the decomposition is performed. Default is 'time'.
Returns
-------
xarray.Dataset:
A new dataset containing the decomposed components: trend, detrended, seasonality, residuals, and deseasonalized.
"""
assert isinstance(da, xr.DataArray), "Input should be xarray.DataArray"
res = xarrayutils.linear_trend(da, dim=dim)
time_index = xr.DataArray(
np.arange(da[dim].size), dims={dim: da[dim]}, coords={dim: da[dim]}
)
trend = res.intercept + time_index * res.slope
detrended = da - trend
seasonality = detrended.groupby(f"{dim}.month").mean()
residuals = detrended.groupby(f"{dim}.month") - seasonality
deseasonalized = residuals + trend
# Create a new dataset to store the results
result = xr.Dataset()
result["trend"] = trend
result["detrended"] = detrended
result["seasonality"] = seasonality
result["residuals"] = residuals
result["deseasonalized"] = deseasonalized
return result
[docs]
def pd_seasonal_decompose(x, freq=12):
"""Decompose a time series into its trend, the seasonality, and the residuals.
Parameters
----------
x: pandas.Series
A :class:`pandas.Series` containing a time series of data
freq : int
The frequency of the data, e.g. 12 for monthly data
Returns
-------
A :class:`pandas.DataFrame` containing time series of the raw data, trend,
seasonality, the detrended time series, and the residuals.
"""
assert isinstance(x, pd.Series), "The input should be a pandas.Series"
df = x.to_frame("raw")
# calculate the trend component
# TODO: decide if this should be calculated based on a running mean or the linear trend
df["trend"] = df["raw"].rolling(window=freq + 1, center=True).mean()
# detrend the series
df["detrended"] = df["raw"] - df["trend"]
# calculate the seasonal component
df.index = pd.to_datetime(df.index)
df["seasonality"] = df.groupby(df.index.month)["detrended"].transform("mean")
# get the residuals
df["residuals"] = df["detrended"] - df["seasonality"]
return df[["raw", "trend", "seasonality", "detrended", "residuals"]]
[docs]
def extend_annual_series(ds):
"""
Fill a time series with only annual values (one such timeseries could be generated
via :func:`weighted_annual_mean`, for example) such that all months are represented again but the value
for all 12 months within a year is equal to the annual value.
Parameters
----------
ds : xarray.Dataset
The input dataset containing the time series data.
Returns
-------
xarray.Dataset
The extended time series dataset with monthly values.
Raises
------
AssertionError
If the dataset does not have 'year' as a dimension.
Example
-------
>>> ds = xr.Dataset({'time': pd.date_range('2000-01-01', '2001-12-31', freq='ME'),
... 'value': np.random.rand(24)})
>>> extended_ds = extend_annual_series(ds)
"""
if "year" not in ds.dims:
ds = (ds.assign_coords(year=("time", ds.time.dt.year.values))
.swap_dims({"time": "year"})).drop_vars(["time"])
assert "year" in ds.dims, "Dataset needs to have `year` as dimension"
ds_monthly = ds.expand_dims(month=np.arange(1, 13))
ds_stacked = ds_monthly.stack(year_month=("year", "month"))
# Convert `year` coordinate back to `time`
_datetime = pd.to_datetime(
[f"{y}-{m}" for y in ds_monthly.year.values for m in np.arange(1, 13)]
)
ds_stacked = ds_stacked.assign_coords(time=("year_month", _datetime))
ds_stacked = ds_stacked.swap_dims({"year_month": "time"})
ds_stacked = ds_stacked.drop_vars(["year", "month", "year_month"])
return ds_stacked
[docs]
def zero_crossings(x):
"""Find the zero crossings of a time series.
Example
-------
>>> x = np.array([1, 2, -1, -2, 1, 2]) # crossing at 2 -> -1 and -2 -> 1
>>> zero_crossings(x)
array([1, 3])
"""
return np.where(np.diff(np.sign(x)))[0]
def _mask_after_threshold_crossing(arr, threshold=0.0):
"""Mask values after the signal first drops below *threshold*."""
below = arr < threshold
if not np.any(below):
return arr
first_idx = np.argmax(below)
out = arr.copy()
out[first_idx:] = np.nan
return out
def _mask_after_first_zero_crossing(x):
return _mask_after_threshold_crossing(x, threshold=0.0)
[docs]
def xr_autocorr(x, dim="time", normalize=True, new_dim="lead"):
"""Calculate the autocorrelation of a time series.
Parameters
----------
x : xarray.DataArray
The input data array containing the time series.
dim : str, optional
The dimension along which to calculate the autocorrelation. Defaults to 'time'.
normalize : bool, optional
Whether to normalize the autocorrelation. Defaults to True.
new_dim : str, optional
The name of the new dimension. Defaults to 'lead'.
Returns
-------
xarray.DataArray
The autocorrelation of the time series.
"""
from scipy import signal
corr = xr.apply_ufunc(
signal.correlate,
x,
x,
kwargs=dict(mode="same"),
input_core_dims=[[dim], [dim]],
output_core_dims=[[dim]],
vectorize=True,
dask="parallelized",
output_dtypes=[x.dtype],
keep_attrs=False,
)
if normalize:
corr /= corr.max(dim=dim)
corr = corr.rename({dim: new_dim})
nlags = len(x[dim]) // 2
corr[new_dim] = np.arange(-nlags, nlags)
return corr
[docs]
def integral_timescale(data, dt=1):
"""
Calculate the integral timescale of decorrelation of a time series.
Parameters
----------
data : np.ndarray
The input data array containing the time series.
Must be NaN-free; pass pre-cleaned data to avoid
destroying temporal structure.
dt : float
The time step of the data.
Returns
-------
float
The integral timescale of the time series.
Raises
------
ValueError
If the input data contains NaN values.
Notes
-----
The integral timescale is calculated as the integral of the
autocorrelation function (ACF) of the time series up to the
first zero crossing.
"""
from scipy.signal import correlate
if np.any(np.isnan(data)):
raise ValueError(
"Input data contains NaN values. Remove NaNs before calling this function."
)
data = data - data.mean()
# Calculate autocorrelation, use only 2nd half (non-negative lags), and normalize
autocorr = correlate(data, data, mode="full")
autocorr = autocorr[autocorr.size // 2 :]
autocorr = autocorr / autocorr.max()
autocorr = _mask_after_first_zero_crossing(autocorr)
autocorr = np.nan_to_num(autocorr)
τ = np.trapezoid(autocorr, dx=dt)
return τ
# alias
decorrelation_timescale = integral_timescale
[docs]
def ndof_integral_timescale(data, dt=1):
"""
Calculate the number of degrees of freedom (dof) of the integral
timescale of decorrelation of a time series.
Parameters
----------
data : np.ndarray
The input data array containing the time series.
dt : float
The time step of the data.
Returns
-------
IntegralTimescaleResult
Named tuple with fields ``timescale`` (the integral timescale)
and ``dof`` (the degrees of freedom).
Notes
-----
The effective sample size is calculated as ``n * dt / τ`` following
Emery & Thomson (2004), eq. (3.15.17). It is clamped to ``[2, n]``
to ensure valid degrees of freedom.
"""
data = data[~np.isnan(data)]
data = data - data.mean()
τ = integral_timescale(data, dt)
m = len(data)
n_eff_raw = m * dt / τ if τ > 0 else float(m)
n_eff = np.clip(n_eff_raw, 2, m)
if n_eff != n_eff_raw:
log.warning(
"Effective sample size clamped to [2, %d] (raw n_eff=%.2f)", m, n_eff_raw
)
dof = n_eff - 2
return _IntegralTimescaleResult(τ, dof)
_IntegralTimescaleResult = namedtuple("IntegralTimescaleResult", ["timescale", "dof"])
[docs]
def lag1_autocorrelation(x):
"""Calculate the lag-1 autocorrelation of a time series."""
if len(x) < 3:
raise ValueError(
f"Need at least 3 data points for lag-1 autocorrelation, got {len(x)}."
)
return np.corrcoef(x[:-1], x[1:])[0, 1]
[docs]
def effective_sample_size(x, y):
"""
Calculate the effective sample size of two time series based on the lag-1 autocorrelation.
Parameters
----------
x : np.ndarray
The first time series data.
y : np.ndarray
The second time series data.
Returns
-------
float
The effective sample size of the two time series.
"""
r1 = lag1_autocorrelation(x)
r2 = lag1_autocorrelation(y)
n = x.size
denom = 1 + r1 * r2
if np.isclose(denom, 0):
log.warning(
"r1*r2 ≈ -1 (denom=%.4f); effective sample size "
"is ill-defined, clamping to n=%d",
denom,
n,
)
return float(n)
n_eff_raw = n * (1 - r1 * r2) / denom
# Following Bretherton et al. (1999), eq. 31
# DOI: 10.1175/1520-0442(1999)012<1990:TENOSD>2.0.CO;2
n_eff = np.clip(n_eff_raw, 2, n)
if n_eff != n_eff_raw:
log.warning(
"Effective sample size clamped to [2, %d] (raw n_eff=%.2f)", n, n_eff_raw
)
return n_eff
[docs]
def ndof_lag1_autocorrelation(x, y):
"""
Calculate the number of degrees of freedom (ndof) based on the lag-1 autocorrelation
of two time series.
Parameters
----------
x: 1-D array-like
The first time series data.
y: 1-D array-like
The second time series data.
Returns
-------
dof: float
The number of degrees of freedom.
"""
n_eff = effective_sample_size(x, y)
dof = n_eff - 2
return dof