Source code for teehr.metrics.signature_funcs

"""Signature functions."""
import pandas as pd
import numpy as np

from teehr.metrics.models.base import MetricsBasemodel
from teehr.metrics.models.base import TransformEnum
from typing import Callable, Optional
import logging
logger = logging.getLogger(__name__)

EPSILON = 1e-6  # Small constant to avoid division by zero


def _transform(
        p: pd.Series,
        model: MetricsBasemodel,
        t: Optional[pd.Series] = None
) -> tuple:
    """Apply timeseries transform for metrics calculations."""
    # Apply transform
    if model.transform is not None:
        match model.transform:
            case TransformEnum.log:
                if model.add_epsilon:
                    logger.debug("Adding epsilon to avoid log(0)")
                    p = p + EPSILON
                logger.debug("Applying log transform")
                p = np.log(p)
            case TransformEnum.sqrt:
                logger.debug("Applying square root transform")
                p = np.sqrt(p)
            case TransformEnum.square:
                logger.debug("Applying square transform")
                p = np.square(p)
            case TransformEnum.cube:
                logger.debug("Applying cube transform")
                p = np.power(p, 3)
            case TransformEnum.exp:
                logger.debug("Applying exponential transform")
                p = np.exp(p)
            case TransformEnum.inv:
                if model.add_epsilon:
                    logger.debug("Adding epsilon to avoid division by zero")
                    p = p + EPSILON
                logger.debug("Applying inverse transform")
                p = 1.0 / p
            case TransformEnum.abs:
                logger.debug("Applying absolute value transform")
                p = np.abs(p)
            case _:
                raise ValueError(
                    f"Unsupported transform: {model.transform}"
                )
    else:
        logger.debug("No transform specified, using original values")

    # Remove invalid values and align series
    if t is not None:
        if isinstance(t, pd.Series):
            valid_mask = np.isfinite(p)
            p = p[valid_mask]
            t = t[valid_mask]
        else:
            raise TypeError(
                f"Invalid type for t: {type(t)}. Expected pd.Series."
            )
    else:
        valid_mask = np.isfinite(p)
        p = p[valid_mask]

    if t is not None:
        return p, t
    else:
        return p


[docs] def max_value_time(model: MetricsBasemodel) -> Callable: """Create max_value_time metric function.""" logger.debug("Building the max_value_time metric function") def max_value_time_inner( p: pd.Series, value_time: pd.Series ) -> pd.Timestamp: """Max value time.""" p, value_time = _transform(p, model, value_time) return value_time[p.idxmax()] return max_value_time_inner
[docs] def variance(model: MetricsBasemodel) -> Callable: """Create variance metric function.""" logger.debug("Building the variance metric function") def variance_inner(p: pd.Series) -> float: """Variance.""" p = _transform(p, model) return np.var(p) return variance_inner
[docs] def count(model: MetricsBasemodel) -> Callable: """Create count metric function.""" logger.debug("Building the count metric function") def count_inner(p: pd.Series) -> float: """Count.""" p = _transform(p, model) return len(p) return count_inner
[docs] def minimum(model: MetricsBasemodel) -> Callable: """Create minimum metric function.""" logger.debug("Building the minimum metric function") def minimum_inner(p: pd.Series) -> float: """Minimum.""" p = _transform(p, model) return np.min(p) return minimum_inner
[docs] def maximum(model: MetricsBasemodel) -> Callable: """Create maximum metric function.""" logger.debug("Building the maximum metric function") def maximum_inner(p: pd.Series) -> float: """Maximum.""" p = _transform(p, model) return np.max(p) return maximum_inner
[docs] def average(model: MetricsBasemodel) -> Callable: """Create average metric function.""" logger.debug("Building the average metric function") def average_inner(p: pd.Series) -> float: """Average.""" p = _transform(p, model) return np.mean(p) return average_inner
[docs] def sum(model: MetricsBasemodel) -> Callable: """Create sum metric function.""" logger.debug("Building the sum metric function") def sum_inner(p: pd.Series) -> float: """Sum.""" p = _transform(p, model) return np.sum(p) return sum_inner
[docs] def flow_duration_curve_slope(model: MetricsBasemodel) -> Callable: """Create flow duration curve slope metric function.""" logger.debug("Building the flow duration curve slope metric function") def flow_duration_curve_slope_inner( p: pd.Series, ) -> float: """Flow duration curve slope.""" # ensure percentiles are within valid range if not (0 <= model.lower_quantile <= 1): raise ValueError( "Lower quantile must be between 0 and 1" ) if not (0 <= model.upper_quantile <= 1): raise ValueError( "Upper quantile must be between 0 and 1" ) # ensure lower quantile is less than upper quantile if model.lower_quantile >= model.upper_quantile: raise ValueError( "Lower quantile must be less than upper quantile" ) # apply any specified transform p = _transform(p, model) # sort the streamflow values in descending order p_sorted = p.sort_values(ascending=False).reset_index(drop=True) # calculate exceedance probabilities n = len(p_sorted) fdc_probs = (p_sorted.index/(n+1)) # determine indices for the specified quantiles lower_idx = np.argmin(np.abs(fdc_probs - model.lower_quantile)) upper_idx = np.argmin(np.abs(fdc_probs - model.upper_quantile)) # check for as_percentile flag if model.as_percentile: fdc_probs = fdc_probs * 100 # calculate slope between the two quantiles if model.add_epsilon: slope = (p_sorted.iloc[upper_idx] - p_sorted.iloc[lower_idx]) / (( fdc_probs[upper_idx] - fdc_probs[lower_idx] ) + EPSILON) else: slope = (p_sorted.iloc[upper_idx] - p_sorted.iloc[lower_idx]) / ( fdc_probs[upper_idx] - fdc_probs[lower_idx] ) return slope return flow_duration_curve_slope_inner
def _sort_inputs( p: pd.Series, value_time: pd.Series ) -> pd.Series: """Sorts value_time for center_of_timing_inner().""" # ensure ordinates are ordered by value_time temp_df = pd.DataFrame({'primary_value': p, 'value_time': value_time}) sorted_df = temp_df.sort_values(by='value_time').reset_index(drop=True) return sorted_df['primary_value'], sorted_df['value_time'] def _ensure_daily_timestep( p: pd.Series, value_time: pd.Series ) -> pd.Series: """Ensure inputs are in daily timestep for center_of_timing_inner().""" # determine unique, non-NaT timestep sizes unique_timesteps = value_time.diff().unique() unique_timesteps = [ts for ts in unique_timesteps if pd.notna(ts)] # return original series' if already in daily timestep if (len(unique_timesteps) == 1) & (unique_timesteps[0] == pd.Timedelta(days=1)): return p.reset_index(drop=True), value_time.reset_index(drop=True) else: # assemble dataframe and resample to daily df = pd.DataFrame({'primary_value': p.values}, index=value_time.values) # calculate mean for day if at least one sub-day value is available resampled_df = df.resample('D').apply(lambda x: x.mean() if x.notna().any() else np.nan) # remove NaNs to preserve original index resampled_df = resampled_df.dropna() # normalize indices, return values p_d = resampled_df['primary_value'].reset_index(drop=True) value_time_d = resampled_df.index.to_series().reset_index(drop=True) return p_d, value_time_d def _get_water_year(dates: pd.Series) -> pd.Series: """Get water year from dates (year + 1 if month >= October).""" return dates.apply(lambda x: x.year + 1 if x.month >= 10 else x.year) def _get_water_year_length(wy: int) -> int: """Get length of water year (365 or 366 based on leap year). Water year ends in September, so Feb is in year (wy). """ import calendar return 366 if calendar.isleap(wy) else 365 def _get_day_of_wy( dates: pd.Series ) -> pd.Series: """Obtain the numeric day-of-water-year for center_of_timing_inner().""" import datetime # obtain the water year for each date water_years = _get_water_year(dates) # get the start date of the corresponding water year for each date water_year_starts = pd.to_datetime([datetime.date(wy - 1, 10, 1) for wy in water_years]) # get the difference in days and add 1 (since day 1 is the start date) days_of_WY = (dates - water_year_starts).dt.days + 1 return days_of_WY
[docs] def center_of_timing(model: MetricsBasemodel) -> Callable: """Create center_of_timing metric function.""" logger.debug("Building the center_of_timing metric function") def center_of_timing_inner( p: pd.Series, value_time: pd.Series ) -> float: """Perform center_of_timing calculation. Computes CT for each water year separately (with leap-year handling), then returns the mean across all water years. Returns ------- float Mean CT across all water years in the input data. """ # apply transform if specified p, value_time = _transform(p, model, value_time) # ensure inputs are sorted by value_time p, value_time = _sort_inputs(p, value_time) # ensure input data is a daily timestep p_d, value_time_d = _ensure_daily_timestep(p, value_time) # Determine water years present water_years = _get_water_year(value_time_d) unique_wys = water_years.unique() # Group by water year and compute CT for each ct_values = [] for wy in unique_wys: wy_mask = water_years == wy p_wy = p_d[wy_mask] value_time_wy = value_time_d[wy_mask] # Get water year length for leap year handling year_len = _get_water_year_length(wy) # Check missing data threshold if len(value_time_wy) < (1 - model.missing_day_threshold) * year_len: continue # Obtain day of water year WY_days = _get_day_of_wy(value_time_wy) # Filter to non-zero flow steps non_zero = p_wy > 0 p_nz = p_wy[non_zero] WY_days_nz = WY_days[non_zero] # Calculate CT if len(p_nz) == 0: continue sum_p_nz = np.sum(p_nz) if sum_p_nz > 0: CT = np.sum(p_nz * WY_days_nz) / sum_p_nz ct_values.append(CT) # Return mean across all valid water years if len(ct_values) == 0: return None return np.mean(ct_values) return center_of_timing_inner
[docs] def standard_deviation_of_timing(model: MetricsBasemodel) -> Callable: """Create standard_deviation_of_timing metric function.""" logger.debug("Building the standard_deviation_of_timing metric function") def standard_deviation_of_timing_inner( p: pd.Series, value_time: pd.Series ) -> float: """Perform standard_deviation_of_timing calculation. Computes SDoT for each water year separately (with leap-year handling), then returns the mean across all water years. Returns ------- float Mean SDoT across all water years in the input data. """ # apply transform if specified p, value_time = _transform(p, model, value_time) # ensure inputs are sorted by value_time p, value_time = _sort_inputs(p, value_time) # ensure input data is a daily timestep p_d, value_time_d = _ensure_daily_timestep(p, value_time) # Determine water years present water_years = _get_water_year(value_time_d) unique_wys = water_years.unique() # Group by water year and compute SDoT for each sdot_values = [] for wy in unique_wys: wy_mask = water_years == wy p_wy = p_d[wy_mask] value_time_wy = value_time_d[wy_mask] # Get water year length for leap year handling year_len = _get_water_year_length(wy) # Check missing data threshold if len(value_time_wy) < (1 - model.missing_day_threshold) * year_len: continue # Obtain day of water year WY_days = _get_day_of_wy(value_time_wy) # Filter to non-zero flow steps non_zero = p_wy > 0 p_nz = p_wy[non_zero] WY_days_nz = WY_days[non_zero] # Calculate CT first (needed for SDoT) if len(p_nz) <= 1: continue sum_p_nz = np.sum(p_nz) if sum_p_nz == 0: continue CT = np.sum(p_nz * WY_days_nz) / sum_p_nz # Normalize CT to [1, year_len] range if not np.isnan(CT): CT = ((CT - 1) % year_len) + 1 # Get number of non-zero flow steps n_prime = len(p_nz) # Calculate SDoT SDoT_numerator = np.sum(p_nz * (WY_days_nz - CT)**2) SDoT_denominator = sum_p_nz * (n_prime - 1) if model.add_epsilon: SDoT = np.sqrt(SDoT_numerator / (SDoT_denominator + EPSILON)) else: if SDoT_denominator == 0: continue SDoT = np.sqrt(SDoT_numerator / SDoT_denominator) sdot_values.append(SDoT) # Return mean across all valid water years if len(sdot_values) == 0: return None return np.mean(sdot_values) return standard_deviation_of_timing_inner