Source code for my_code_base.stats.timeseries

# !/usr/bin/env python3
#
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
# Author: Markus Ritschel
# eMail:  git@markusritschel.de
# Date:   2024-03-03
# %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
#
import logging
import numpy as np
import pandas as pd
import xarray as xr
import xarrayutils

log = logging.getLogger(__name__)


[docs] def weighted_annual_mean(ds: xr.Dataset | xr.DataArray): """ Compute the weighted annual mean of an :class:`xarray.Dataset` or :class:`xarray.DataArray`. Parameters --------- ds : The input dataset or data array. Returns ------- xarray.DataArray The weighted annual mean of the input dataset or data array. Raises ------ AssertionError If the sum of the weights in each year is not equal to 1.0. Notes ----- The function computes the annual mean of the input dataset or data array, taking into account the different lengths of the months. Each month is weighted by the number of days it comprises. If the frequency of the time dimension is '1M', the function applies the weights. If the frequency is '1D' or higher, no weights are applied. The function follows the approach described in the following source: https://ncar.github.io/esds/posts/2021/yearly-averages-xarray/ The function assumes that the input dataset or data array has a 'time' dimension. """ def check_for_frequency(ds): try: estimated_frequency = xr.infer_freq(ds.time) if not estimated_frequency.startswith('M'): log.warning("Frequency seems to be not monthly. Consider another averaging method.") except: log.warning("Cannot infer frequency") return check_for_frequency(ds) # Determine the month length month_length = ds.time.dt.days_in_month # Calculate the weights # In each 4th year, the total amount of days differs compared to other years # Therefore, weights need to be calculated on an annual base weights = month_length.groupby("time.year") / month_length.groupby("time.year").sum() # Make sure the weights in each year add up to 1 assert np.allclose(weights.groupby("time.year").sum(xr.ALL_DIMS), 1.0), \ "The sum of the weights should be 1.0!" # Setup our masking for nan values ones = xr.where(ds.isnull(), 0.0, 1.0) # Calculate the annual values, weighted by days in each month ds_sum = (ds * weights).resample(time="YS").sum(dim="time", keep_attrs=True) # Calculate the NaN weights # This gives every NaN in the original data a weight of zero, resulting in a lower # weight for affected years ones_out = (ones * weights).resample(time="YS").sum(dim="time", keep_attrs=True) # Return the weighted average output = ds_sum / ones_out output = output.assign_coords(year=('time', output.time.dt.year.values), keep_attrs=True) return output.swap_dims({'time': 'year'}).drop_vars('time')
[docs] def xr_deseasonalize(da, freq=12, dim='time'): """Remove the seasonal cycle of an :class:`xr.Dataset` object. Data get first detrended, then the long-term average of every season is subtracted for each season. Finally, the trend is added again. Parameters ---------- freq : int The frequency of the data. Default is 12 for monthly resolution. dim : str The name of the time dimension. """ res = xarrayutils.linear_trend(da, dim=dim) time_index = xr.DataArray(np.arange(da[dim].size), dims={dim:da[dim]}, coords={dim:da[dim]}) trend = res.intercept + time_index*res.slope detrended = da - trend deseasonalized_detrended = ( detrended.groupby(f'{dim}.month') - detrended.groupby(f'{dim}.month').mean() ) return deseasonalized_detrended + trend
[docs] def xr_seasonal_decompose(da, dim='time'): """ Perform seasonal decomposition of a time series using the given dataset. Parameters ---------- da: xarray.DataArray The input data array containing the time series. dim: str The dimension along which the decomposition is performed. Default is 'time'. Returns ------- xarray.Dataset: A new dataset containing the decomposed components: trend, detrended, seasonality, residuals, and deseasonalized. """ assert isinstance(da, xr.DataArray), "Input should be xarray.DataArray" res = xarrayutils.linear_trend(da, dim=dim) time_index = xr.DataArray(np.arange(da[dim].size), dims={dim:da[dim]}, coords={dim:da[dim]}) trend = res.intercept + time_index*res.slope detrended = da - trend seasonality = detrended.groupby(f'{dim}.month').mean() residuals = detrended.groupby(f'{dim}.month') - seasonality deseasonalized = residuals + trend # Create a new dataset to store the results result = xr.Dataset() result['trend'] = trend result['detrended'] = detrended result['seasonality'] = seasonality result['residuals'] = residuals result['deseasonalized'] = deseasonalized return result
[docs] def pd_seasonal_decompose(x, freq=12): """Decompose a time series into its trend, the seasonality, and the residuals. Parameters ---------- x: pandas.Series A :class:`pandas.Series` containing a time series of data freq : int The frequency of the data, e.g. 12 for monthly data Returns ------- A :class:`pandas.DataFrame` containing time series of the raw data, trend, seasonality, the detrended time series, and the residuals. """ assert isinstance(x, pd.Series), "The input should be a pandas.Series" df = x.to_frame("raw") # calculate the trend component # TODO: decide if this should be calculated based on a running mean or the linear trend df["trend"] = df["raw"].rolling(window=freq+1, center=True).mean() # detrend the series df["detrended"] = df["raw"] - df["trend"] # calculate the seasonal component df.index = pd.to_datetime(df.index) df["seasonality"] = df.groupby(df.index.month)["detrended"].transform("mean") # get the residuals df["residuals"] = df["detrended"] - df["seasonality"] return df[['raw', 'trend', 'seasonality', 'detrended', 'residuals']]
[docs] def extend_annual_series(ds): """ Fill a time series with only annual values (one such timeseries could be generated via :func:`weighted_annual_mean`, for example) such that all months are represented again but the value for all 12 months within a year is equal to the annual value. Parameters ---------- ds : xarray.Dataset The input dataset containing the time series data. Returns ------- xarray.Dataset The extended time series dataset with monthly values. Raises ------ AssertionError If the dataset does not have 'year' as a dimension. Example ------- >>> ds = xr.Dataset({'time': pd.date_range('2000-01-01', '2001-12-31', freq='M'), ... 'value': np.random.rand(24)}) >>> extended_ds = extend_annual_series(ds) """ if 'year' not in ds.dims: ds = (ds.assign_coords(year=('time', ds.time.dt.year.values)) .swap_dims({'time': 'year'})).drop_vars(['time']) assert 'year' in ds.dims, 'Dataset needs to have `year` as dimension' ds_monthly = ds.expand_dims(month=np.arange(1, 13)) ds_stacked = ds_monthly.stack(year_month=('year', 'month')) _datetime = pd.to_datetime([f"{y}-{m}" for y in ds_monthly.year.values for m in np.arange(1, 13)]) ds_stacked = ds_stacked.assign_coords(time=('year_month', _datetime)) ds_stacked = ds_stacked.swap_dims({'year_month': 'time'}) ds_stacked = ds_stacked.drop_vars(['year', 'month', 'year_month']) return ds_stacked