{ "cells": [ { "cell_type": "markdown", "id": "4b7fba4a-eed8-4d54-a077-a567602fce29", "metadata": {}, "source": [ "# TEEHR Evaluation Example 2 (Part 1)\n", "## Daily Data, NWM 3.0 Retrospective and MARRMoT_37 HBV, CAMELS Subset (542)\n", "#####\n", "\n", "Example 2 walks through an anticipated common use case: evaluating experimental model output at a large sample of gauge locations using TEEHR. We are using MARRMoT model 37 (HBV) output as our ‘experimental’ model (thank you, Wouter; source: Knoben et al. 2019) and a subset of CAMELS basins (543 of the 671).\n", "\n", "#### In this notebook we will perform the following steps:\n", "
\n", "1. Get the TEEHR datasets from S3\n", "2. Build a joined duckdb database\n", "3. Add attributes to the joined database\n", "4. Add calculated fields to the joined database\n", "5. Export the joined duckdb database to a parquet file\n", "
\n", "\n", "## 1. Get the data from S3\n", "For the sake of time, we prepared the individual datasets in advance, so you only need to copy them to your 2i2c home directory. Run the cell below to copy the example-2 data, then use the cell after it to view the directory contents." ] }, { "cell_type": "code", "execution_count": null, "id": "c0e83f08-95ad-49d1-8895-c41eff6a45dd", "metadata": {}, "outputs": [], "source": [ "!rm -rf ~/teehr/example-2/*\n", "!aws s3 cp --recursive --no-sign-request s3://ciroh-rti-public-data/teehr-workshop-devcon-2024/workshop-data/example-2 ~/teehr/example-2" ] }, { "cell_type": "code", "execution_count": null, "id": "7ad9e4ea-ebff-4824-89d5-9f76052903e3", "metadata": {}, "outputs": [], "source": [ "# view the directory structure and contents\n", "!tree ~/teehr/example-2/" ] }, { "cell_type": "markdown", "id": "1ba85820-c0c4-49ae-863b-9e9462adeef8", "metadata": {}, "source": [ "## 2. Build the joined database\n", "An essential step for all model evaluations is joining the dataset being evaluated with a dataset that is considered 'truth' - i.e., the verifying data (typically some form of observations). By 'joining' we mean aligning model output data (location and time) with the observations at the same (or most representative possible) location and time. If the evaluation includes a small number of locations and/or a short period of time (so the total amount of data is relatively small), the joining process could be done on the fly when calculating metrics. However, as the volume of data becomes very large (e.g., 1000s of locations, 20+ years of hourly data), joining data on the fly and/or storing all of the joined data in memory can become prohibitively slow or infeasible, depending on your available memory. Further, if metric uncertainty bounds are included (to be added in future versions of TEEHR), the joined dataset must be resampled, creating yet more data to handle in memory. 
To address this, TEEHR creates a joined dataset in advance and writes the results to disk in the highly efficient parquet format. For context, the 44-year NWM retrospective simulations at ~8000 gauge locations result in X billion rows of data. Joining this data on the fly would not be feasible on most servers.\n", "\n", "The TEEHR class ```DuckDBDatabase``` of module ```teehr.classes.duckdb_database``` provides methods to create a DuckDB database and insert joined time series, attributes (Step 3), and other calculated fields (Step 4) useful for evaluation. We can then export to a parquet file so we can store it anywhere (e.g., S3) and efficiently test different metrics, grouping and filtering approaches for evaluation.\n", "\n", "The arguments passed to ```insert_joined_timeseries``` on the ```DuckDBDatabase``` class in the cell below are ```primary_filepath```, ```secondary_filepath```, ```crosswalk_filepath``` and ```order_by```.\n", "\n", "" ] }, { "cell_type": "code", "execution_count": null, "id": "2db85486-11d2-4b40-883d-e9ccdf921ebc", "metadata": {}, "outputs": [], "source": [ "%%time\n", "# Note, `%%time` measures the time for a cell to run. 
This cell takes about 15 seconds to run.\n", "\n", "from teehr.classes.duckdb_database import DuckDBDatabase\n", "from pathlib import Path\n", "\n", "TEEHR_BASE = Path(Path.home(), \"teehr/example-2\")\n", "PRIMARY_FILEPATH = f\"{TEEHR_BASE}/primary/**/*.parquet\"\n", "SECONDARY_FILEPATH = f\"{TEEHR_BASE}/secondary/**/*.parquet\"\n", "CROSSWALK_FILEPATH = f\"{TEEHR_BASE}/crosswalks/**/*.parquet\"\n", "ATTRIBUTE_FILEPATH = f\"{TEEHR_BASE}/attributes/**/*.parquet\"\n", "\n", "# define the joined parquet filepath and create parent directory\n", "JOINED_PARQUET_FILEPATH = Path(TEEHR_BASE, \"joined\", \"teehr_joined.parquet\")\n", "JOINED_PARQUET_FILEPATH.parent.mkdir(exist_ok=True, parents=True)\n", "\n", "# temporary DuckDB database that will be exported to parquet\n", "DB_FILEPATH = Path(JOINED_PARQUET_FILEPATH.parent, \"teehr.db\")\n", "\n", "# if the database already exists, remove it first\n", "if Path(DB_FILEPATH).is_file():\n", "    Path(DB_FILEPATH).unlink()\n", "\n", "# create the database and insert timeseries\n", "ddd = DuckDBDatabase(f\"{DB_FILEPATH}\")\n", "ddd.insert_joined_timeseries(\n", "    primary_filepath=PRIMARY_FILEPATH,\n", "    secondary_filepath=SECONDARY_FILEPATH,\n", "    crosswalk_filepath=CROSSWALK_FILEPATH,\n", "    order_by=[\n", "        \"primary_location_id\",\n", "        \"configuration\",\n", "        \"value_time\"\n", "    ],\n", ")\n", "\n", "# confirm fields in the DB\n", "ddd.get_joined_timeseries_schema()" ] }, { "cell_type": "markdown", "id": "7c4a9ec9-3f0b-48e1-ab0b-df6e84a2ff6c", "metadata": {}, "source": [ "## 3. Add attributes\n", "\n", "In this step, we will add the attributes from the attributes parquet files to the joined database. 
Attributes add more “power” to the evaluation - i.e., more ways to group and filter the data for metric calculations, more complex metrics, and more insightful visualizations.\n", "\n", "The cell below calls ```insert_attributes``` on the ```DuckDBDatabase``` class with a single argument, ```attributes_filepath```.\n", "\n", "" ] }, { "cell_type": "code", "execution_count": null, "id": "70355fd4-3d46-40b4-8bfa-8c4fc5c994d2", "metadata": {}, "outputs": [], "source": [ "%%time\n", "# This takes about 30 seconds to run\n", "\n", "ddd.insert_attributes(attributes_filepath=ATTRIBUTE_FILEPATH)\n", "\n", "# confirm fields now in the DB\n", "ddd.get_joined_timeseries_schema()" ] }, { "cell_type": "markdown", "id": "90f4a4db-705e-40e9-a93a-395b3b285cc9", "metadata": {}, "source": [ "## 4. Add calculated fields to the joined duckdb database\n", "\n", "Calculated fields open up an even wider range of options for evaluating your data. They allow you to write simple functions that calculate new fields from any existing fields (data or attributes); the new fields can then be used to group or filter data for metric calculations.\n", "\n", "In the examples below, two calculated fields are added to the database: ```month``` (the month of ```value_time```) and ```obs_flow_category_q_mean``` ('high' when ```primary_value``` is at or above ```q_mean_cms```, otherwise 'low').\n", "\n", "\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5c990763-6115-47e8-b532-3f16fa8e04bb", "metadata": {}, "outputs": [], "source": [ "# Add calculated fields\n", "from datetime import datetime\n", "\n", "# Function arguments should have the same data type as the fields used. 
\n", "# Note: In the data model, all attribute values are added to the db as type 'str'\n", "\n", "def add_month_field(arg1: datetime) -> int:\n", "    # month number (1-12) of the timestamp\n", "    return arg1.month\n", "\n", "def add_flow_category_relative_to_threshold_field(arg1: float, arg2: str) -> str:\n", "    # the threshold attribute arrives as 'str', so cast it before comparing\n", "    if arg1 >= float(arg2):\n", "        return 'high'\n", "    else:\n", "        return 'low'\n", "\n", "# month of each value_time\n", "ddd.insert_calculated_field(\n", "    new_field_name=\"month\",\n", "    new_field_type=\"INTEGER\",\n", "    parameter_names=[\"value_time\"],\n", "    user_defined_function=add_month_field,\n", ")\n", "\n", "# observed flow at or above the mean flow ('high') or below it ('low')\n", "ddd.insert_calculated_field(\n", "    new_field_name=\"obs_flow_category_q_mean\",\n", "    new_field_type=\"VARCHAR\",\n", "    parameter_names=[\"primary_value\", \"q_mean_cms\"],\n", "    user_defined_function=add_flow_category_relative_to_threshold_field,\n", ")\n", "\n", "# view fields now in the DB\n", "ddd.get_joined_timeseries_schema()" ] }, { "cell_type": "markdown", "id": "a338f678-7242-45ae-91f3-cee039ceac41", "metadata": {}, "source": [ "## 5. 
Export the joined duckdb database to a parquet file" ] }, { "cell_type": "code", "execution_count": null, "id": "cecd418b-41be-431c-abb8-7af7a58fba0c", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "\n", "# generate joined parquet file\n", "ddd.query(f\"\"\"\n", "    COPY (\n", "        SELECT *\n", "        FROM joined_timeseries\n", "        ORDER BY primary_location_id, value_time\n", "    )\n", "    TO '{JOINED_PARQUET_FILEPATH}' (FORMAT PARQUET);\n", "\"\"\")" ] }, { "cell_type": "code", "execution_count": null, "id": "2a12e877-3328-4ce8-9d7c-6033db6bddf9", "metadata": {}, "outputs": [], "source": [ "# read the resulting parquet file in pandas to check it out\n", "df = pd.read_parquet(JOINED_PARQUET_FILEPATH)\n", "df" ] }, { "cell_type": "code", "execution_count": null, "id": "917cd2b1-76fe-46cf-9d0b-50f3df9626ee", "metadata": {}, "outputs": [], "source": [ "# if you are confident the parquet file was successfully created, delete the temporary DuckDB file\n", "Path(DB_FILEPATH).unlink()" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.8" } }, "nbformat": 4, "nbformat_minor": 5 }