RowLevelCalculatedFields#

class RowLevelCalculatedFields[source]#

Row level Calculated Fields.

Notes

Row level CFs are applied to each row in the table based on data in one or more existing fields. They are applied per row and are not aware of data in any other row (e.g., they cannot see other values in the same timeseries). This is useful for adding fields derived from the date/time (e.g., month, year, season) or from the value field (e.g., normalized flow, log flow), among many other uses.

Available Calculated Fields:

  • Month

  • Year

  • WaterYear

  • NormalizedFlow

  • Seasons

  • ForecastLeadTime

  • ForecastLeadTimeBins

  • ThresholdValueExceeded

  • ThresholdValueNotExceeded

  • DayOfYear

  • HourOfYear

  • GenericSQL

Examples

Add row level calculated fields to the joined timeseries table and write to the warehouse.

>>> import teehr
>>> from teehr import RowLevelCalculatedFields as rcf
>>> ev.joined_timeseries.add_calculated_fields([
...    rcf.Month(),
...    rcf.Year(),
...    rcf.WaterYear(),
...    rcf.Seasons()
... ]).write()

We can also use these calculated fields in metrics calculations.

>>> ev.metrics(table_name="joined_timeseries").add_calculated_fields([
...     rcf.Month()
... ]).aggregate(
...     metrics=[fdc],
...     group_by=[flds.primary_location_id, "month"],
... ).order_by([flds.primary_location_id, "month"]).to_pandas()

Methods

class DayOfYear(*, input_field_name: str = 'value_time', output_field_name: str = 'day_of_year')#

Adds the day of the year from a timestamp column.

Properties#

  • input_field_name:

    The name of the column containing the timestamp. Default: “value_time”

  • output_field_name:

    The name of the column to store the day of the year. Default: “day_of_year”

Notes

  • February 29th in leap years is set to None.

  • All days after February 29th are adjusted to correspond to the same day of the year as in a non-leap year.

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class ForecastLeadTime(*, value_time_field_name: str = 'value_time', reference_time_field_name: str = 'reference_time', output_field_name: str = 'forecast_lead_time')#

Adds the forecast lead time from a timestamp column.

Properties#

  • value_time_field_name:

    The name of the column containing the timestamp. Default: “value_time”

  • reference_time_field_name:

    The name of the column containing the forecast time. Default: “reference_time”

  • output_field_name:

    The name of the column to store the forecast lead time. Default: “forecast_lead_time”
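
The lead time is the elapsed time between the forecast issue (reference) time and the valid time. A minimal per-row sketch (the class itself operates on a Spark DataFrame; `forecast_lead_time` is a hypothetical helper name):

```python
from datetime import datetime, timedelta

def forecast_lead_time(value_time: datetime, reference_time: datetime) -> timedelta:
    """Lead time: how far ahead of the forecast issue time the value is valid."""
    return value_time - reference_time

# A forecast issued at midnight, valid at 18:00 the same day, has an 18-hour lead time.
lead = forecast_lead_time(datetime(2024, 5, 1, 18), datetime(2024, 5, 1, 0))
```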

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class ForecastLeadTimeBins(*, value_time_field_name: str = 'value_time', reference_time_field_name: str = 'reference_time', lead_time_field_name: str = 'forecast_lead_time', output_field_name: str = 'forecast_lead_time_bin', bin_size: Timedelta | timedelta | str | list | dict = Timedelta('5 days 00:00:00'))#

Adds ID for grouped forecast lead time bins.

Properties#

  • value_time_field_name:

    The name of the column containing the timestamp. Default: “value_time”

  • reference_time_field_name:

    The name of the column containing the forecast time. Default: “reference_time”

  • lead_time_field_name:

    The name of the column containing the forecast lead time. Default: “forecast_lead_time”

  • output_field_name:

    The name of the column to store the lead time bin ID. Default: “forecast_lead_time_bin”

  • bin_size:

    Defines how forecast lead times are binned. Accepts pd.Timedelta, datetime.timedelta, or timedelta strings (e.g., ‘6 hours’, ‘1 day’). Three input formats are supported:

    1. Single timedelta (uniform binning): Creates equal-width bins of the specified duration.

      Examples:

      pd.Timedelta(hours=6)
      timedelta(hours=6)
      '6 hours'
      '6h'

      Output bin IDs:

      "PT0H_PT6H", "PT6H_PT12H", "PT12H_PT18H", ...

    2. List of dicts (variable binning with auto-generated IDs): Creates bins with custom ranges. Bin IDs are auto-generated as ISO 8601 duration ranges. Values can be pd.Timedelta, datetime.timedelta, or timedelta strings.

      Examples:

      [
          {'start_inclusive': pd.Timedelta(hours=0), 'end_exclusive': pd.Timedelta(hours=6)},
          {'start_inclusive': '6 hours', 'end_exclusive': '12 hours'},
          {'start_inclusive': timedelta(hours=12), 'end_exclusive': '1 day'},
          {'start_inclusive': '1 day', 'end_exclusive': '2 days'},
      ]

      Output bin IDs:

      "PT0H_PT6H", "PT6H_PT12H", "PT12H_P1D", "P1D_P2D"

    3. Dict of dicts (variable binning with custom IDs): Creates bins with custom ranges and user-defined bin identifiers. Values can be pd.Timedelta, datetime.timedelta, or timedelta strings.

      Examples:

      {
          'short_range': {'start_inclusive': '0 hours', 'end_exclusive': '6 hours'},
          'medium_range': {'start_inclusive': pd.Timedelta(hours=6), 'end_exclusive': timedelta(days=1)},
          'long_range': {'start_inclusive': '1 day', 'end_exclusive': '3 days'},
      }

      Output bin IDs:

      "short_range", "medium_range", "long_range"

    Default: pd.Timedelta(days=5)

Notes

  • Timedelta values can be specified as:

    - pd.Timedelta objects (e.g., pd.Timedelta(hours=6))
    - datetime.timedelta objects (e.g., timedelta(hours=6))
    - Strings (e.g., '6 hours', '1 day', '1d 12h', 'PT6H')

  • All timedelta inputs are internally converted to pd.Timedelta for processing.

  • Bin ranges are [start_inclusive, end_exclusive), except for the final bin which is inclusive of all remaining lead times.

  • If the maximum lead time in the data exceeds the last user-defined bin, an overflow bin is automatically created:

    - For auto-generated IDs: uses ISO 8601 duration format
    - For custom IDs: appends "overflow" as the bin ID

  • Bin IDs use ISO 8601 duration format (e.g., “PT6H” for 6 hours, “P1DT12H” for 1 day and 12 hours) for auto-generated bins.

  • Custom bin IDs can use any string format.

Examples

Uniform 6-hour bins using different input types:

# Using pd.Timedelta
fcst_bins = ForecastLeadTimeBins(bin_size=pd.Timedelta(hours=6))

# Using datetime.timedelta
from datetime import timedelta
fcst_bins = ForecastLeadTimeBins(bin_size=timedelta(hours=6))

# Using string
fcst_bins = ForecastLeadTimeBins(bin_size='6 hours')

# All create bins: PT0H_PT6H, PT6H_PT12H, PT12H_PT18H, ...

Variable bins with auto-generated IDs using mixed types:

fcst_bins = ForecastLeadTimeBins(
    bin_size=[
        {'start_inclusive': '0 hours',
        'end_exclusive': '6 hours'},
        {'start_inclusive': pd.Timedelta(hours=6),
        'end_exclusive': '1 day'},
        {'start_inclusive': timedelta(days=1),
        'end_exclusive': '3 days'},
    ]
)
# Creates bins: PT0H_PT6H, PT6H_P1D, P1D_P3D

Variable bins with custom IDs using strings:

fcst_bins = ForecastLeadTimeBins(
    bin_size={
        'nowcast': {'start_inclusive': '0 hours',
        'end_exclusive': '6 hours'},
        'short_term': {'start_inclusive': '6 hours',
        'end_exclusive': '1 day'},
        'medium_term': {'start_inclusive': '1 day',
        'end_exclusive': '5 days'},
    }
)
# Creates bins: nowcast, short_term, medium_term

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class GenericSQL(*, output_field_name: str, sql_statement: str)#

Adds a column computed from a SQL expression.

Properties#

  • output_field_name:

    The name of the column to add.

  • sql_statement:

    A SQL expression string that will be evaluated using Spark’s expr() function. The expression may reference any column that exists in the current DataFrame.

Examples

from teehr import RowLevelCalculatedFields as rcf

ev.joined_timeseries.add_calculated_fields([
    rcf.GenericSQL(
        output_field_name="log_primary_value",
        sql_statement="log(primary_value)"
    )
]).write()

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class HourOfYear(*, input_field_name: str = 'value_time', output_field_name: str = 'hour_of_year')#

Adds the hour of the year from a timestamp column.

Properties#

  • input_field_name:

    The name of the column containing the timestamp. Default: “value_time”

  • output_field_name:

    The name of the column to store the hour of the year. Default: “hour_of_year”

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class Month(*, input_field_name: str = 'value_time', output_field_name: str = 'month')#

Adds the month from a timestamp column.

Properties#

  • input_field_name:

    The name of the column containing the timestamp. Default: “value_time”

  • output_field_name:

    The name of the column to store the month. Default: “month”

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class NormalizedFlow(*, primary_value_field_name: str = 'primary_value', drainage_area_field_name: str = 'drainage_area', output_field_name: str = 'normalized_flow')#

Normalize flow values by drainage area.

Properties#

  • primary_value_field_name:

    The name of the column containing the flow values. Default: “primary_value”

  • drainage_area_field_name:

    The name of the column containing the drainage area. Default: “drainage_area”

  • output_field_name:

    The name of the column to store the normalized flow values. Default: “normalized_flow”
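
Normalization here is a per-row division of flow by drainage area. A minimal sketch of the computation (the class itself operates on a Spark DataFrame; `normalized_flow` is a hypothetical helper name, and output units follow the inputs, e.g., flow per unit area):

```python
def normalized_flow(primary_value: float, drainage_area: float) -> float:
    """Flow per unit drainage area (units follow the inputs)."""
    return primary_value / drainage_area
```

This makes flows at locations with very different basin sizes more directly comparable.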

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class Seasons(*, value_time_field_name: str = 'value_time', season_months: dict = {'fall': [9, 10, 11], 'spring': [3, 4, 5], 'summer': [6, 7, 8], 'winter': [12, 1, 2]}, output_field_name: str = 'season')#

Adds the season from a timestamp column.

Properties#

  • value_time_field_name:

    The name of the column containing the timestamp. Default: “value_time”

  • season_months:

    A dictionary mapping season names to the months that define them.

    Default: {
        "winter": [12, 1, 2],
        "spring": [3, 4, 5],
        "summer": [6, 7, 8],
        "fall": [9, 10, 11]
    }
    
  • output_field_name:

    The name of the column to store the season. Default: “season”
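
The month-to-season lookup implied by the default mapping can be sketched as follows. This is an illustration only (the class itself operates on a Spark DataFrame), and `season_for_month` is a hypothetical helper name:

```python
DEFAULT_SEASON_MONTHS = {
    "winter": [12, 1, 2],
    "spring": [3, 4, 5],
    "summer": [6, 7, 8],
    "fall": [9, 10, 11],
}

def season_for_month(month: int, season_months: dict = DEFAULT_SEASON_MONTHS) -> str:
    """Return the season whose month list contains the given month number."""
    for name, months in season_months.items():
        if month in months:
            return name
    raise ValueError(f"Month {month} is not assigned to a season")
```

Passing a custom season_months dict follows the same shape as the default above.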

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class ThresholdValueExceeded(*, input_field_name: str = 'primary_value', threshold_field_name: str = 'secondary_value', output_field_name: str = 'threshold_value_exceeded')#

Adds a boolean column indicating whether the input value exceeds a threshold.

Properties#

  • input_field_name:

    The name of the column containing the primary value. Default: “primary_value”

  • threshold_field_name:

    The name of the column containing the threshold value. Default: “secondary_value”

  • output_field_name:

    The name of the column to store the boolean value. Default: “threshold_value_exceeded”

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class ThresholdValueNotExceeded(*, input_field_name: str = 'primary_value', threshold_field_name: str = 'secondary_value', output_field_name: str = 'threshold_value_not_exceeded')#

Adds a boolean column indicating whether the input value is less than or equal to a threshold.

Properties#

  • input_field_name:

    The name of the column containing the primary value. Default: “primary_value”

  • threshold_field_name:

    The name of the column containing the threshold value. Default: “secondary_value”

  • output_field_name:

    The name of the column to store the boolean value. Default: “threshold_value_not_exceeded”
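
Reading “exceeds” as a strict comparison, the two threshold fields are complements of each other. A per-row sketch of that assumption (the classes themselves operate on a Spark DataFrame; the helper names are hypothetical):

```python
def threshold_value_exceeded(value: float, threshold: float) -> bool:
    """True when the value strictly exceeds the threshold (assumed strict comparison)."""
    return value > threshold

def threshold_value_not_exceeded(value: float, threshold: float) -> bool:
    """Complement of the above: True when the value is less than or equal to the threshold."""
    return value <= threshold
```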

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class WaterYear(*, input_field_name: str = 'value_time', output_field_name: str = 'water_year')#

Adds the water year from a timestamp column.

Properties#

  • input_field_name:

    The name of the column containing the timestamp. Default: “value_time”

  • output_field_name:

    The name of the column to store the water year. Default: “water_year”

Notes

Water year is defined as the year of the date plus one if the month is October or later.
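
The stated rule can be sketched in plain Python (an illustration only; the class itself operates on a Spark DataFrame, and `water_year` here is a hypothetical helper name):

```python
from datetime import date

def water_year(d: date) -> int:
    """Water year: calendar year + 1 for dates in October through December."""
    return d.year + 1 if d.month >= 10 else d.year
```

Under this rule, the water year runs from October 1 through September 30 of the labeled year.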

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.

class Year(*, input_field_name: str = 'value_time', output_field_name: str = 'year')#

Adds the year from a timestamp column.

Properties#

  • input_field_name:

    The name of the column containing the timestamp. Default: “value_time”

  • output_field_name:

    The name of the column to store the year. Default: “year”

apply_to(sdf: DataFrame) → DataFrame#

Apply the calculated field to the Spark DataFrame.