Source code for teehr.models.calculated_fields.timeseries_aware

"""Classes representing UDFs."""
from typing import List, Union
from pydantic import Field
import pandas as pd
import numpy as np
import pyspark.sql.types as T
import pyspark.sql as ps
from teehr.models.calculated_fields.base import CalculatedFieldABC, CalculatedFieldBaseModel

UNIQUENESS_FIELDS = [
            'reference_time',
            'primary_location_id',
            'configuration_name',
            'variable_name',
            'unit_name'
        ]


class PercentileEventDetection(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Adds an "event" and "event_id" column to the DataFrame based on a percentile threshold.

    The "event" column (bool) indicates whether the value is above the XXth percentile.
    The "event_id" column (string) groups continuous segments of events and assigns a
    unique ID to each segment in the format "startdate-enddate".

    Properties
    ----------
    - quantile:
        The percentile threshold to use for event detection.
        Default: 0.85
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to detect events on.
        Default: "primary_value"
    - output_event_field_name:
        The name of the column to store the event detection.
        Default: "event"
    - output_event_id_field_name:
        The name of the column to store the event ID.
        Default: "event_id"
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """
    quantile: float = Field(
        default=0.85
    )
    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_event_field_name: str = Field(
        default="event"
    )
    output_event_id_field_name: str = Field(
        default="event_id"
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_is_event(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        quantile,
        group_by,
        return_type=T.BooleanType()

    ):
        # Get the schema of the input DataFrame
        input_schema = sdf.schema

        # Create a copy of the schema and add the new column
        output_schema = T.StructType(input_schema.fields + [T.StructField(output_field, return_type, True)])

        def is_event(pdf, input_field, quantile, output_field) -> pd.DataFrame:
            pvs = pdf[input_field]

            # Calculate the XXth percentile
            percentile = pvs.quantile(quantile)

            # Create a new column indicating whether each value is above the XXth percentile
            pdf[output_field] = pvs > percentile

            return pdf

        def wrapper(pdf, input_field, quantile, output_field):
            return is_event(pdf, input_field, quantile, output_field)

        # Group the data and apply the UDF
        # lambda pdf: wrapper_function(pdf, threshold_value)
        sdf = sdf.groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf, input_field, quantile, output_field),
            schema=output_schema
        )

        return sdf

    @staticmethod
    def add_event_ids(
        sdf,
        output_field,
        input_field,
        time_field,
        group_by,
        return_type=T.StringType(),

    ):
        # Get the schema of the input DataFrame
        input_schema = sdf.schema

        # Create a copy of the schema and add the new column
        output_schema = T.StructType(input_schema.fields + [T.StructField(output_field, return_type, True)])

        def event_ids(pdf: pd.DataFrame, input_field, time_field, output_field) -> pd.DataFrame:
            # Create a new column for continuous segments
            pdf['segment'] = (pdf[input_field] != pdf[input_field].shift()).cumsum()

            # Filter only the segments where values are over the 90th percentile
            segments = pdf[pdf[input_field]]

            # Group by segment and create startdate-enddate string
            segment_ranges = segments.groupby('segment').agg(
                startdate=(time_field, 'min'),
                enddate=(time_field, 'max')
            ).reset_index()

            # Merge the segment ranges back to the original DataFrame
            pdf = pdf.merge(segment_ranges[['segment', 'startdate', 'enddate']], on='segment', how='left')

            # Create the startdate-enddate string column
            pdf[output_field] = pdf.apply(
                lambda row: f"{row['startdate']}-{row['enddate']}" if pd.notnull(row['startdate']) else None,
                axis=1
            )

            # Drop the 'segment', 'startdate', and 'enddate' columns before returning
            pdf.drop(columns=['segment', 'startdate', 'enddate'], inplace=True)

            return pdf

        def wrapper(pdf, input_field, time_field, output_field):
            return event_ids(pdf, input_field, time_field, output_field)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(*group_by, time_field).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf, input_field, time_field, output_field),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_is_event(
            sdf=sdf,
            input_field=self.value_field_name,
            quantile=self.quantile,
            output_field=self.output_event_field_name,
            group_by=self.uniqueness_fields
        )
        sdf = self.add_event_ids(
            sdf=sdf,
            input_field=self.output_event_field_name,
            time_field=self.value_time_field_name,
            output_field=self.output_event_id_field_name,
            group_by=self.uniqueness_fields
        )

        return sdf


class BaseflowPeriodDetection(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Determines baseflow dominated periods.

    This class identifies periods where baseflow is dominant in the streamflow
    timeseries by adding two columns. The 'baseflow_period' column (bool)
    indicates whether the baseflow portion of the streamflow timeseries exceeds
    the quickflow portion, and the 'baseflow_period_id' column (string) groups
    continuous segments of baseflow dominated periods and assigns a unique ID
    to each segment in the format "startdate-enddate". Users can define a
    custom 'event_threshold' value to adjust the sensitivity of baseflow
    detection by applying a multiplier to the quickflow portion of the
    streamflow timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to compare with baseflow.
        Default: "primary_value"
    - baseflow_field_name:
        The name of the column containing the baseflow values.
        Default: None
    - event_threshold:
        The threshold multiplier value to determine event periods. The
        multiplier is applied to the quickflow portion of the streamflow
        timeseries when determining if the streamflow timeseries is
        dominated by baseflow.
        Default: 1.0
    - output_baseflow_period_field_name:
        The name of the column to store the baseflow period information.
        Default: "baseflow_period"
    - output_baseflow_period_id_field_name:
        The name of the column to store the baseflow period ID information.
        Default: "baseflow_period_id"
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    baseflow_field_name: str = Field(
        default=None
    )
    event_threshold: float = Field(
        default=1.0
    )
    output_baseflow_period_field_name: str = Field(
        default="baseflow_period"
    )
    output_baseflow_period_id_field_name: str = Field(
        default="baseflow_period_id"
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_is_baseflow_period(
        sdf: ps.DataFrame,
        input_field,
        baseflow_field,
        event_threshold,
        output_field,
        group_by,
        return_type=T.BooleanType()
    ):
        # Check for baseflow_field_name
        if baseflow_field is None:
            raise ValueError("baseflow_field_name must be specified.")

        # Get the schema of the input DataFrame
        input_schema = sdf.schema

        # Create a copy of the schema and add the new column
        output_schema = T.StructType(input_schema.fields + [T.StructField(output_field, return_type, True)])

        def is_baseflow_period(
                 pdf: pd.DataFrame,
                 input_field,
                 baseflow_field,
                 event_threshold,
                 output_field
        ) -> pd.DataFrame:
            # isolate timeseries
            streamflows = pdf[input_field]
            baseflows = pdf[baseflow_field]
            quickflows = streamflows - baseflows

            # apply event_threshold
            quickflows_adj = quickflows * event_threshold

            # create boolean and add to dataframe
            pdf[output_field] = baseflows > quickflows_adj

            return pdf

        def wrapper(pdf,
                    input_field,
                    baseflow_field,
                    event_threshold,
                    output_field):
            return is_baseflow_period(
                pdf, input_field, baseflow_field, event_threshold, output_field
            )

        sdf = sdf.groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                baseflow_field,
                                event_threshold,
                                output_field),
            schema=output_schema
        )

        return sdf

    @staticmethod
    def add_baseflow_period_ids(
        sdf: ps.DataFrame,
        input_field,
        time_field,
        output_field,
        group_by,
        return_type=T.StringType()
    ):
        # Get the schema of the input DataFrame
        input_schema = sdf.schema

        # Create a copy of the schema and add the new column
        output_schema = T.StructType(input_schema.fields + [T.StructField(output_field, return_type, True)])

        def baseflow_period_id(pdf: pd.DataFrame,
                               input_field,
                               time_field,
                               output_field
                               ) -> pd.DataFrame:
            # Create a new column for continuous segments
            pdf['segment'] = (pdf[input_field] != pdf[input_field].shift()).cumsum()

            # Filter only the segments where baseflow exceeds streamflow
            segments = pdf[pdf[input_field]]

            # Group by segment and create startdate-enddate string
            segment_ranges = segments.groupby('segment').agg(
                startdate=(time_field, 'min'),
                enddate=(time_field, 'max')
            ).reset_index()

            # Merge the segment ranges back to the original DataFrame
            pdf = pdf.merge(segment_ranges[['segment', 'startdate', 'enddate']], on='segment', how='left')

            # Create the startdate-enddate string column
            pdf[output_field] = pdf.apply(
                lambda row: f"{row['startdate']}-{row['enddate']}" if pd.notnull(row['startdate']) else None,
                axis=1
            )

            # Drop the 'segment', 'startdate', and 'enddate' columns
            pdf.drop(columns=['segment', 'startdate', 'enddate'], inplace=True)

            return pdf

        def wrapper(pdf, input_field, time_field, output_field):
            return baseflow_period_id(pdf,
                                      input_field,
                                      time_field,
                                      output_field)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field
            ).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_is_baseflow_period(
            sdf=sdf,
            input_field=self.value_field_name,
            baseflow_field=self.baseflow_field_name,
            event_threshold=self.event_threshold,
            output_field=self.output_baseflow_period_field_name,
            group_by=self.uniqueness_fields
        )
        sdf = self.add_baseflow_period_ids(
            sdf=sdf,
            input_field=self.output_baseflow_period_field_name,
            time_field=self.value_time_field_name,
            output_field=self.output_baseflow_period_id_field_name,
            group_by=self.uniqueness_fields
        )

        return sdf


class LyneHollickBaseflow(CalculatedFieldABC,
                          CalculatedFieldBaseModel):
    """Baseflow separation using the Lyne-Hollick method.

    This class implements the Lyne-Hollick digital filter method, which
    separates baseflow from quickflow using a timeseries of streamflow data.
    Adds a column to the joined timeseries table with the baseflow timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "lyne_hollick_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="lyne_hollick_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_lyne_hollick_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        group_by,
        return_type=T.DoubleType()
    ):
        # Get the schema of the input DataFrame
        input_schema = sdf.schema

        # Create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def lyne_hollick_baseflow(pdf: pd.DataFrame,
                                  input_field,
                                  time_field,
                                  output_field,
                                  beta) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.methods import LH

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the Lyne-Hollick method
            date, flow = clean_streamflow(input_streamflow)
            result_df = pd.DataFrame(np.nan, index=date, columns=['LH'])
            result_df['LH'] = LH(Q=flow,
                                 beta=beta,
                                 return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['LH'].values

            return pdf

        def wrapper(pdf, input_field, time_field, output_field, beta):
            return lyne_hollick_baseflow(pdf,
                                         input_field,
                                         time_field,
                                         output_field,
                                         beta)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field
            ).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_lyne_hollick_baseflow(
            sdf=sdf,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            output_field=self.output_field_name,
            beta=self.beta,
            group_by=self.uniqueness_fields
        )

        return sdf


class ChapmanBaseflow(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Baseflow separation using the Chapman method.

    This class implements the Chapman filter method, which separates baseflow
    from quickflow using a timeseries of streamflow data. Adds a column to
    the joined timeseries table with the baseflow timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "chapman_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - a: float
        The recession coefficient for the Chapman filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="chapman_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    a: float = Field(
        default=None
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_chapman_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        a,
        group_by,
        return_type=T.DoubleType()
    ):
        # get the schema of the input DataFrame
        input_schema = sdf.schema

        # create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def chapman_baseflow(pdf: pd.DataFrame,
                             input_field,
                             time_field,
                             output_field,
                             beta,
                             a) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.comparision import strict_baseflow
            from baseflow.param_estimate import recession_coefficient
            from baseflow.methods import LH, Chapman

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the Chapman method
            date, flow = clean_streamflow(input_streamflow)
            strict_filter = strict_baseflow(Q=flow,
                                            ice=None)
            if not a:
                a = recession_coefficient(Q=flow,
                                          strict=strict_filter)
            b_LH = LH(Q=flow,
                      beta=beta,
                      return_exceed=False)
            result_df = pd.DataFrame(np.nan, index=date, columns=['Chapman'])
            result_df['Chapman'] = Chapman(Q=flow,
                                           b_LH=b_LH,
                                           a=a,
                                           return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['Chapman'].values

            return pdf

        def wrapper(pdf, input_field, time_field, output_field, beta, a):
            return chapman_baseflow(pdf,
                                    input_field,
                                    time_field,
                                    output_field,
                                    beta,
                                    a)

        # group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field
            ).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta,
                                a),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_chapman_baseflow(
            sdf=sdf,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            output_field=self.output_field_name,
            beta=self.beta,
            a=self.a,
            group_by=self.uniqueness_fields
        )

        return sdf


class ChapmanMaxwellBaseflow(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Baseflow separation using the Chapman-Maxwell method.

    This class implements the Chapman-Maxwell filter method, which separates
    baseflow from quickflow using a timeseries of streamflow data. Adds a
    column to the joined timeseries table with the baseflow timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "chapman_maxwell_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - a: float
        The recession coefficient for the Chapman-Maxwell filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="chapman_maxwell_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    a: float = Field(
        default=None
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_chapman_maxwell_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        a,
        group_by,
        return_type=T.DoubleType()
    ):
        # get the schema of the input DataFrame
        input_schema = sdf.schema

        # create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def chapman_maxwell_baseflow(pdf: pd.DataFrame,
                                     input_field,
                                     time_field,
                                     output_field,
                                     beta,
                                     a) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.comparision import strict_baseflow
            from baseflow.param_estimate import recession_coefficient
            from baseflow.methods import LH, CM

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the Chapman-Maxwell method
            date, flow = clean_streamflow(input_streamflow)
            strict_filter = strict_baseflow(Q=flow,
                                            ice=None)
            if not a:
                a = recession_coefficient(Q=flow,
                                          strict=strict_filter)
            b_LH = LH(Q=flow,
                      beta=beta,
                      return_exceed=False)
            result_df = pd.DataFrame(np.nan, index=date, columns=['CM'])
            result_df['CM'] = CM(Q=flow,
                                 b_LH=b_LH,
                                 a=a,
                                 return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['CM'].values

            return pdf

        # Define the UDF for baseflow separation
        def wrapper(pdf, input_field, time_field, output_field, beta, a):
            return chapman_maxwell_baseflow(pdf,
                                            input_field,
                                            time_field,
                                            output_field,
                                            beta,
                                            a)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field
            ).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta,
                                a),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_chapman_maxwell_baseflow(
            sdf,
            output_field=self.output_field_name,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            beta=self.beta,
            a=self.a,
            group_by=self.uniqueness_fields
        )

        return sdf


class BoughtonBaseflow(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Baseflow separation using the Boughton method.

    This class implements the Boughton double-parameter filter method,
    which separates baseflow from quickflow using a timeseries of streamflow
    data. Adds a column to the joined timeseries table with the baseflow
    timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "boughton_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - a: float
        The recession coefficient for the Boughton filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - c: float
        The shape parameter for the Boughton filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="boughton_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    a: float = Field(
        default=None
    )
    c: float = Field(
        default=None
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_boughton_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        a,
        c,
        group_by,
        return_type=T.DoubleType()
    ):
        # get the schema of the input DataFrame
        input_schema = sdf.schema

        # create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def boughton_baseflow(pdf: pd.DataFrame,
                              input_field,
                              time_field,
                              output_field,
                              beta,
                              a,
                              c) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.comparision import strict_baseflow
            from baseflow.param_estimate import recession_coefficient
            from baseflow.param_estimate import param_calibrate
            from baseflow.methods import LH, Boughton

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the Boughton method
            date, flow = clean_streamflow(input_streamflow)
            strict_filter = strict_baseflow(Q=flow,
                                            ice=None)
            if not a:
                a = recession_coefficient(Q=flow,
                                          strict=strict_filter)
            b_LH = LH(Q=flow,
                      beta=beta,
                      return_exceed=False)
            if not c:
                param_range = np.arange(0.0001, 0.1, 0.0001)
                c = param_calibrate(param_range=param_range,
                                    method=Boughton,
                                    Q=flow,
                                    b_LH=b_LH,
                                    a=a)
            result_df = pd.DataFrame(np.nan, index=date, columns=['Boughton'])
            result_df['Boughton'] = Boughton(Q=flow,
                                             b_LH=b_LH,
                                             a=a,
                                             C=c,
                                             return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['Boughton'].values

            return pdf

        # Define the UDF for baseflow separation
        def wrapper(pdf, input_field, time_field, output_field, beta, a, c):
            return boughton_baseflow(pdf,
                                     input_field,
                                     time_field,
                                     output_field,
                                     beta,
                                     a,
                                     c)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field
            ).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta,
                                a,
                                c),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_boughton_baseflow(
            sdf,
            output_field=self.output_field_name,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            beta=self.beta,
            a=self.a,
            c=self.c,
            group_by=self.uniqueness_fields
        )

        return sdf


class FureyBaseflow(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Baseflow separation using the Furey method.

    This class implements the Furey digital filter method, which separates
    baseflow from quickflow using a timeseries of streamflow data. Adds a
    column to the joined timeseries table with the baseflow timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "furey_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - a: float
        The recession coefficient for the Furey filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - c: float
        The shape parameter for the Furey filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="furey_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    a: float = Field(
        default=None
    )
    c: float = Field(
        default=None
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_furey_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        a,
        c,
        group_by,
        return_type=T.DoubleType()
    ):
        # get the schema of the input DataFrame
        input_schema = sdf.schema

        # create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def furey_baseflow(pdf: pd.DataFrame,
                           input_field,
                           time_field,
                           output_field,
                           beta,
                           a,
                           c) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.comparision import strict_baseflow
            from baseflow.param_estimate import recession_coefficient
            from baseflow.param_estimate import param_calibrate
            from baseflow.methods import LH, Furey

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the Furey method
            date, flow = clean_streamflow(input_streamflow)
            strict_filter = strict_baseflow(Q=flow,
                                            ice=None)
            if not a:
                a = recession_coefficient(Q=flow,
                                          strict=strict_filter)
            b_LH = LH(Q=flow,
                      beta=beta,
                      return_exceed=False)
            if not c:
                param_range = np.arange(0.01, 10, 0.01)
                c = param_calibrate(param_range=param_range,
                                    method=Furey,
                                    Q=flow,
                                    b_LH=b_LH,
                                    a=a)
            result_df = pd.DataFrame(np.nan, index=date, columns=['Furey'])
            result_df['Furey'] = Furey(Q=flow,
                                       b_LH=b_LH,
                                       a=a,
                                       A=c,
                                       return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['Furey'].values

            return pdf

        # Define the UDF for baseflow separation
        def wrapper(pdf, input_field, time_field, output_field, beta, a, c):
            return furey_baseflow(pdf,
                                  input_field,
                                  time_field,
                                  output_field,
                                  beta,
                                  a,
                                  c)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field
            ).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta,
                                a,
                                c),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_furey_baseflow(
            sdf,
            output_field=self.output_field_name,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            beta=self.beta,
            a=self.a,
            c=self.c,
            group_by=self.uniqueness_fields
        )

        return sdf


class EckhardtBaseflow(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Baseflow separation using the Eckhardt method.

    This class implements the Eckhardt filter method, which separates baseflow
    from quickflow using a timeseries of streamflow data. Adds a column to
    the joined timeseries table with the baseflow timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "eckhardt_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - a: float
        The recession coefficient for the Eckhardt filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - BFImax: float
        The maximum baseflow index for the Eckhardt filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="eckhardt_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    a: float = Field(
        default=None
    )
    BFImax: float = Field(
        default=None
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_eckhardt_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        a,
        BFImax,
        group_by,
        return_type=T.DoubleType()
    ):
        # get the schema of the input DataFrame
        input_schema = sdf.schema

        # create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def eckhardt_baseflow(pdf: pd.DataFrame,
                              input_field,
                              time_field,
                              output_field,
                              beta,
                              a,
                              BFImax) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.comparision import strict_baseflow
            from baseflow.param_estimate import recession_coefficient
            from baseflow.param_estimate import param_calibrate
            from baseflow.methods import LH, Eckhardt

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the Eckhardt method
            date, flow = clean_streamflow(input_streamflow)
            strict_filter = strict_baseflow(Q=flow,
                                            ice=None)
            if not a:
                a = recession_coefficient(Q=flow,
                                          strict=strict_filter)
            b_LH = LH(Q=flow,
                      beta=beta,
                      return_exceed=False)
            if not BFImax:
                param_range = np.arange(0.001, 1, 0.001)
                BFImax = param_calibrate(param_range=param_range,
                                         method=Eckhardt,
                                         Q=flow,
                                         b_LH=b_LH,
                                         a=a)
            result_df = pd.DataFrame(np.nan, index=date, columns=['Eckhardt'])
            result_df['Eckhardt'] = Eckhardt(Q=flow,
                                             b_LH=b_LH,
                                             a=a,
                                             BFImax=BFImax,
                                             return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['Eckhardt'].values

            return pdf

        # Define the UDF for baseflow separation
        def wrapper(pdf,
                    input_field,
                    time_field,
                    output_field,
                    beta,
                    a,
                    BFImax):
            return eckhardt_baseflow(pdf,
                                     input_field,
                                     time_field,
                                     output_field,
                                     beta,
                                     a,
                                     BFImax)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field
            ).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta,
                                a,
                                BFImax),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_eckhardt_baseflow(
            sdf,
            output_field=self.output_field_name,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            beta=self.beta,
            a=self.a,
            BFImax=self.BFImax,
            group_by=self.uniqueness_fields
        )

        return sdf


class EWMABaseflow(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Baseflow separation using the EWMA method.

    This class implements the exponential moving average (EWMA) filter method,
    which separates baseflow from quickflow using a timeseries of streamflow
    data. Adds a column to the joined timeseries table with the baseflow
    timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "ewma_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - e: float
        The smoothing parameter for the EWMA filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="ewma_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    e: float = Field(
        default=None
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_ewma_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        e,
        group_by,
        return_type=T.DoubleType()
    ):
        # get the schema of the input DataFrame
        input_schema = sdf.schema

        # create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def ewma_baseflow(pdf: pd.DataFrame,
                          input_field,
                          time_field,
                          output_field,
                          beta,
                          e) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.param_estimate import param_calibrate
            from baseflow.methods import LH, EWMA

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the EWMA method
            date, flow = clean_streamflow(input_streamflow)
            b_LH = LH(Q=flow,
                      beta=beta,
                      return_exceed=False)
            if not e:
                param_range = np.arange(0.0001, 0.1, 0.0001)
                e = param_calibrate(param_range=param_range,
                                    method=EWMA,
                                    Q=flow,
                                    b_LH=b_LH,
                                    a=None)
            result_df = pd.DataFrame(np.nan, index=date, columns=['EWMA'])
            result_df['EWMA'] = EWMA(Q=flow,
                                     b_LH=b_LH,
                                     a=None,
                                     e=e,
                                     return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['EWMA'].values

            return pdf

        # Define the UDF for baseflow separation
        def wrapper(pdf, input_field, time_field, output_field, beta, e):
            return ewma_baseflow(pdf,
                                 input_field,
                                 time_field,
                                 output_field,
                                 beta,
                                 e)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta,
                                e),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_ewma_baseflow(
            sdf,
            output_field=self.output_field_name,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            beta=self.beta,
            e=self.e,
            group_by=self.uniqueness_fields
        )

        return sdf


class WillemsBaseflow(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Baseflow separation using the Willems method.

    This class implements the Willems digital filter method, which separates
    baseflow from quickflow using a timeseries of streamflow data. Adds a
    column to the joined timeseries table with the baseflow timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "willems_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - a: float
        The recession coefficient for the Willems filter method. If not
        provided, it will be estimated using the input timeseries data.
        Default: None
    - w: float
        The case-specific average quickflow proportion of the streamflow
        used in the Willems filter method. If not provided, it will be
        estimated using the input timeseries data.
        Default: None
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="willems_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    a: float = Field(
        default=None
    )
    w: float = Field(
        default=None
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_willems_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        a,
        w,
        group_by,
        return_type=T.DoubleType()
    ):
        # get the schema of the input DataFrame
        input_schema = sdf.schema

        # create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def willems_baseflow(pdf: pd.DataFrame,
                             input_field,
                             time_field,
                             output_field,
                             beta,
                             a,
                             w) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.comparision import strict_baseflow
            from baseflow.param_estimate import recession_coefficient
            from baseflow.param_estimate import param_calibrate
            from baseflow.methods import LH, Willems

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the Willems method
            date, flow = clean_streamflow(input_streamflow)
            strict_filter = strict_baseflow(Q=flow,
                                            ice=None)
            if not a:
                a = recession_coefficient(Q=flow,
                                          strict=strict_filter)
            b_LH = LH(Q=flow,
                      beta=beta,
                      return_exceed=False)
            if not w:
                param_range = np.arange(0.001, 1, 0.001)
                w = param_calibrate(param_range=param_range,
                                    method=Willems,
                                    Q=flow,
                                    b_LH=b_LH,
                                    a=a)
            result_df = pd.DataFrame(np.nan, index=date, columns=['Willems'])
            result_df['Willems'] = Willems(Q=flow,
                                           b_LH=b_LH,
                                           a=a,
                                           w=w,
                                           return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['Willems'].values

            return pdf

        # Define the UDF for baseflow separation
        def wrapper(pdf, input_field, time_field, output_field, beta, a, w):
            return willems_baseflow(pdf,
                                    input_field,
                                    time_field,
                                    output_field,
                                    beta,
                                    a,
                                    w)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field
            ).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta,
                                a,
                                w),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_willems_baseflow(
            sdf,
            output_field=self.output_field_name,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            beta=self.beta,
            a=self.a,
            w=self.w,
            group_by=self.uniqueness_fields
        )

        return sdf


class UKIHBaseflow(CalculatedFieldABC, CalculatedFieldBaseModel):
    """Baseflow separation using the UKIH method.

    This class implements the United Kingdom Institute of Hydrology (UKIH)
    filter method (also referred to as the smoothed minima method), which
    separates baseflow from quickflow using a timeseries of streamflow data.
    Adds a column to the joined timeseries table with the baseflow timeseries.

    Properties
    ----------
    - value_time_field_name:
        The name of the column containing the timestamp.
        Default: "value_time"
    - value_field_name:
        The name of the column containing the value to separate baseflow from.
        Default: "primary_value"
    - output_field_name:
        The name of the column to store the baseflow separation result.
        Default: "ukih_baseflow"
    - beta:
        The filter parameter for the Lyne-Hollick filter method.
        Default: 0.925
    - uniqueness_fields:
        The columns to use to uniquely identify each timeseries.

        .. code-block:: python

            Default: [
                'reference_time',
                'primary_location_id',
                'configuration_name',
                'variable_name',
                'unit_name'
            ]
    """

    value_time_field_name: str = Field(
        default="value_time"
    )
    value_field_name: str = Field(
        default="primary_value"
    )
    output_field_name: str = Field(
        default="ukih_baseflow"
    )
    beta: float = Field(
        default=0.925
    )
    uniqueness_fields: Union[str, List[str]] = Field(
        default=None
    )

    @staticmethod
    def add_ukih_baseflow(
        sdf: ps.DataFrame,
        output_field,
        input_field,
        time_field,
        beta,
        group_by,
        return_type=T.DoubleType()
    ):
        # get the schema of the input DataFrame
        input_schema = sdf.schema

        # create a copy of the schema and add the new column
        output_schema = T.StructType(
            input_schema.fields + [T.StructField(output_field,
                                                 return_type,
                                                 True)]
        )

        def ukih_baseflow(pdf: pd.DataFrame,
                          input_field,
                          time_field,
                          output_field,
                          beta) -> pd.DataFrame:
            # lazy load the BYU-baseflow library
            from baseflow.utils import clean_streamflow
            from baseflow.methods import LH, UKIH

            # create a new column for baseflow
            pdf[output_field] = None

            # obtain the input streamflow series
            input_streamflow = pd.Series(
                pdf[input_field].values,
                index=pd.to_datetime(pdf[time_field])
            )

            # ensure data has >= 120 timesteps
            if len(input_streamflow) < 120:
                raise ValueError(
                    "Input streamflow series must have at least 120 timesteps."
                    )

            # obtain the baseflow separation using the UKIH method
            date, flow = clean_streamflow(input_streamflow)
            b_LH = LH(Q=flow,
                      beta=beta,
                      return_exceed=False)
            result_df = pd.DataFrame(np.nan, index=date, columns=['UKIH'])
            result_df['UKIH'] = UKIH(Q=flow,
                                     b_LH=b_LH,
                                     return_exceed=False)

            # assign the baseflow values to the new column
            pdf[output_field] = result_df['UKIH'].values

            return pdf

        # Define the UDF for baseflow separation
        def wrapper(pdf, input_field, time_field, output_field, beta):
            return ukih_baseflow(pdf,
                                 input_field,
                                 time_field,
                                 output_field,
                                 beta)

        # Group the data and apply the UDF
        sdf = sdf.orderBy(
            *group_by,
            time_field).groupby(group_by).applyInPandas(
            lambda pdf: wrapper(pdf,
                                input_field,
                                time_field,
                                output_field,
                                beta),
            schema=output_schema
        )

        return sdf

    def apply_to(self, sdf: ps.DataFrame) -> ps.DataFrame:
        if self.uniqueness_fields is None:
            self.uniqueness_fields = UNIQUENESS_FIELDS
        sdf = self.add_ukih_baseflow(
            sdf,
            output_field=self.output_field_name,
            input_field=self.value_field_name,
            time_field=self.value_time_field_name,
            beta=self.beta,
            group_by=self.uniqueness_fields
        )

        return sdf


[docs] class TimeseriesAwareCalculatedFields(): """Timeseries aware calculated fields. Notes ----- Timeseries aware CFs are aware of ordered groups of data (e.g., a timeseries). This is useful for things such as event detection, base flow separation, and other fields that need to be calculated based on a entire timeseries. The definition of what creates a unique set of timeseries (i.e., a timeseries) can be specified. Available Calculated Fields: - PercentileEventDetection - BaseflowDominatedPeriods - LyneHollickBaseflow - ChapmanBaseflow - ChapmanMaxwellBaseflow - BoughtonBaseflow - FureyBaseflow - EckhardtBaseflow - EWMABaseflow - WillemsBaseflow - UKIHBaseflow """ PercentileEventDetection = PercentileEventDetection BaseflowPeriodDetection = BaseflowPeriodDetection LyneHollickBaseflow = LyneHollickBaseflow ChapmanBaseflow = ChapmanBaseflow ChapmanMaxwellBaseflow = ChapmanMaxwellBaseflow BoughtonBaseflow = BoughtonBaseflow FureyBaseflow = FureyBaseflow EckhardtBaseflow = EckhardtBaseflow EWMABaseflow = EWMABaseflow WillemsBaseflow = WillemsBaseflow UKIHBaseflow = UKIHBaseflow