TEEHR Evaluation Example 2 (Part 1)#

Daily Data, NWM 3.0 Retrospective and MARRMoT_37 HBV, CAMELS Subset (542)#


Example 2 walks through an anticipated common use case: evaluating experimental model output at a large sample of gauge locations using TEEHR. We are using MARRMoT model 37 (HBV) output as our ‘experimental’ model (thank you, Wouter; source: Knoben et al., 2019) and a subset of CAMELS basins (543 of the 671).

In this notebook we will perform the following steps:#

  1. Get the TEEHR datasets from S3
  2. Build a joined duckdb database
  3. Add attributes to the joined database
  4. Add calculated fields to the joined database
  5. Export the joined duckdb database to a parquet file

1. Get the data from S3#

For the sake of time, we prepared the individual datasets in advance and will simply copy them to your 2i2c home directory. Run the cell below to copy the example-2 data and view the resulting directory structure.

!rm -rf ~/teehr/example-2/*
!aws s3 cp --recursive --no-sign-request s3://ciroh-rti-public-data/teehr-workshop-devcon-2024/workshop-data/example-2 ~/teehr/example-2
# view the directory structure and contents
!tree ~/teehr/example-2/

2. Build the joined database#

An essential step in any model evaluation is joining the dataset being evaluated with a dataset that is considered ‘truth’ - i.e., the verifying data (typically some form of observations). By ‘joining’ we mean aligning the model output (by location and time) with the observations at the same (or most representative) location and time. If the evaluation includes a small number of locations and/or a short period of time (i.e., the total amount of data is relatively small), the joining could be done on the fly when calculating metrics. However, as the volume of data becomes very large (e.g., thousands of locations, 20+ years of hourly data), joining on the fly and/or holding all of the joined data in memory can become prohibitively slow or infeasible, depending on your available memory. Further, if metric uncertainty bounds are included (to be added in future versions of TEEHR), the joined dataset must be resampled, creating yet more data to handle in memory. To address this, TEEHR creates a joined dataset in advance and writes the results to disk in the highly efficient parquet format. For context, the 44-year NWM retrospective simulations at ~8000 gauge locations result in X billion rows of data. Joining this data on the fly would not be feasible on most servers.
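To make the ‘joining’ step concrete, here is a minimal, hypothetical sketch (toy data and column names, not the TEEHR implementation) that aligns an observed and a simulated time series on location and time with pandas. TEEHR performs the equivalent operation at scale in DuckDB, mapping primary to secondary location IDs through a crosswalk, and writes the result to parquet.

import pandas as pd

# Toy observed (primary) and simulated (secondary) time series.
# Column names here are illustrative only.
obs = pd.DataFrame({
    "location_id": ["gage-A", "gage-A"],
    "value_time": pd.to_datetime(["2020-01-01", "2020-01-02"]),
    "value": [10.2, 11.5],  # observed flow
})
sim = pd.DataFrame({
    "location_id": ["gage-A", "gage-A"],
    "value_time": pd.to_datetime(["2020-01-01", "2020-01-02"]),
    "value": [9.8, 12.1],  # simulated flow
})

# Align the model output with the observations at the same location and time.
joined = obs.merge(
    sim,
    on=["location_id", "value_time"],
    suffixes=("_primary", "_secondary"),
)
joined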

The TEEHR class DuckDBDatabase in the module teehr.classes.duckdb_database provides methods to create a DuckDB database and insert joined time series, attributes (Step 3), and other calculated fields (Step 4) useful for evaluation. We can then export the result to a parquet file so we can store it anywhere (e.g., S3) and efficiently test different metric, grouping, and filtering approaches for the evaluation.

The arguments to build a joined database using insert_joined_timeseries on the DuckDBDatabase class are:

  • primary_filepath -> filepath of parquet file(s) containing the primary (observed) time series
  • secondary_filepath -> filepath of parquet file(s) containing the secondary (1 or more model/baseline) time series
  • crosswalk_filepath -> filepath of parquet file(s) containing crosswalks between primary and secondary location IDs
  • order_by -> list of fields to sort the data in the database
%%time
# Note, `%%time` measures the time for a cell to run. This cell takes about 15 seconds to run.

from teehr.classes.duckdb_database import DuckDBDatabase
from pathlib import Path

TEEHR_BASE = Path(Path.home(), "teehr/example-2")
PRIMARY_FILEPATH = f"{TEEHR_BASE}/primary/**/*.parquet"
SECONDARY_FILEPATH = f"{TEEHR_BASE}/secondary/**/*.parquet"
CROSSWALK_FILEPATH = f"{TEEHR_BASE}/crosswalks/**/*.parquet"
ATTRIBUTE_FILEPATH = f"{TEEHR_BASE}/attributes/**/*.parquet"

# define the joined parquet filepath and create parent directory
JOINED_PARQUET_FILEPATH = Path(TEEHR_BASE, "joined", "teehr_joined.parquet")
JOINED_PARQUET_FILEPATH.parent.mkdir(exist_ok=True, parents=True)

# temporary DuckDB database that will be exported to parquet
DB_FILEPATH = Path(JOINED_PARQUET_FILEPATH.parent, "teehr.db")

# if the database already exists, remove it first
if DB_FILEPATH.is_file():
    DB_FILEPATH.unlink()

# create the database and insert timeseries
ddd = DuckDBDatabase(f"{DB_FILEPATH}")
ddd.insert_joined_timeseries(
    primary_filepath=PRIMARY_FILEPATH,
    secondary_filepath=SECONDARY_FILEPATH,
    crosswalk_filepath=CROSSWALK_FILEPATH,
    order_by=[
        "primary_location_id",
        "configuration",
        "value_time"
    ],
)    

# confirm fields in the DB
ddd.get_joined_timeseries_schema()
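As an optional sanity check (not part of the original workflow), you can preview a few joined rows directly with the duckdb Python package. This opens a second, read-only connection to the same file, so it may fail if TEEHR still holds a write lock; the joined_timeseries table name is the same one queried by the COPY statement later in this notebook.

import duckdb

# open the temporary database read-only and preview a few joined rows
con = duckdb.connect(str(DB_FILEPATH), read_only=True)
preview = con.sql("SELECT * FROM joined_timeseries LIMIT 5").df()
con.close()
preview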

3. Add attributes#

In this step, we will add the location attributes (copied from S3 in Step 1) to the joined database. Attributes add more “power” to the evaluation - i.e., more ways to group and filter the data for metric calculations, more complex metrics, and more insightful visualizations.

The argument to insert_attributes on the DuckDBDatabase class is:

  • attributes_filepath -> filepath of parquet file(s) containing attribute data keyed by the primary location ID
%%time
# This takes about 30 seconds to run

ddd.insert_attributes(attributes_filepath=ATTRIBUTE_FILEPATH)

# confirm the fields now in the DB
ddd.get_joined_timeseries_schema()

4. Add calculated fields to the pre-joined duckdb database#

Calculated fields open up an even wider range of options for evaluating your data. They allow you to write simple functions that calculate new fields from any existing fields (data or attributes), which can then be used to group or filter data for metric calculations.

In the examples below, two calculated fields are added to the database:

  • the month -> to enable calculating metrics for specific months or ranges of months (seasons)
  • a flow category relative to a threshold (above or below) -> to enable calculating metrics for varying definitions of high or low flow
# Add calculated fields
import datetime

# Function arguments should have the same data type as the fields they use.
# Note: in the data model, all attribute values are stored in the db as type 'str'.

def add_month_field(arg1: datetime.datetime) -> int:
    return arg1.month

def add_flow_category_relative_to_threshold_field(arg1: float, arg2: str) -> str:
    if arg1 >= float(arg2):
        return 'high'
    else:
        return 'low'

# month
ddd.insert_calculated_field(new_field_name="month",
                    new_field_type="INTEGER",
                    parameter_names=["value_time"],
                    user_defined_function=add_month_field)

# obs above mean
ddd.insert_calculated_field(new_field_name="obs_flow_category_q_mean",
                    new_field_type="VARCHAR",
                    parameter_names=["primary_value", "q_mean_cms"],
                    user_defined_function=add_flow_category_relative_to_threshold_field)

# view fields now in the DB
ddd.get_joined_timeseries_schema()

5. Export the pre-joined duckdb database to a parquet file#

import pandas as pd

# generate joined parquet file
ddd.query(f"""
    COPY (
        SELECT * 
        FROM joined_timeseries
        ORDER BY primary_location_id, value_time
    ) 
    TO '{JOINED_PARQUET_FILEPATH}' (FORMAT PARQUET);
""")
# read the resulting parquet file in pandas to check it out
df = pd.read_parquet(JOINED_PARQUET_FILEPATH)
df
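With the joined parquet in hand, the attributes and calculated fields created above can be used directly for grouping. The quick pandas check below (not a TEEHR metrics query) computes a simple mean bias per month and flow category; it assumes the month and obs_flow_category_q_mean columns created above and the primary_value/secondary_value columns produced by the join.

# quick sanity check: simple mean bias grouped by the calculated fields
summary = (
    df
    .assign(bias=df["secondary_value"] - df["primary_value"])
    .groupby(["month", "obs_flow_category_q_mean"])
    .agg(n_values=("bias", "size"), mean_bias=("bias", "mean"))
    .reset_index()
)
summary.head(12)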
# if you are confident the parquet file was successfully created, delete the temporary DuckDB file
Path(DB_FILEPATH).unlink()