Building a Simple TEEHR Dataset#

In this notebook you will learn how to build a simple TEEHR dataset, export it to a joined parquet file and run a few simple queries against it. This example is intentionally very simple and by no means shows all the functionality of the TEEHR toolsets or approach.

All of the input data is CSV and GeoJSON files. This is intended to be the simplest example of how TEEHR can be used.

# Import the required packages
import pandas as pd
import geopandas as gpd
import duckdb
import datetime as datetime

from pathlib import Path
from teehr.classes.duckdb_database import DuckDBDatabase
from teehr.classes.duckdb_joined_parquet import DuckDBJoinedParquet
import teehr.queries.duckdb as tqd

import holoviews as hv
import geoviews as gv
import hvplot.pandas
import cartopy.crs as ccrs
from holoviews import opts
# Download the example data from S3 that we will convert to TEEHR format.
!rm -rf ~/teehr/example-1
!aws s3 cp --recursive --no-sign-request s3://ciroh-rti-public-data/teehr-workshop-devcon-2024/workshop-data/example-1 ~/teehr/example-1
# Define the raw data and TEEHR 'dataset' directory locations.
# Path.home() is a classmethod, so it is called on the class in both places
# rather than on a throwaway Path() instance.
RAW_DATA_FILEPATH = Path.home() / "teehr/example-1/raw"
TEEHR_BASE = Path.home() / "teehr/example-1/teehr_base"

# While TEEHR is very flexible with regards to where data is stored and how
# it is named, we have a bit of a standard established. The following defines
# the standard folder structure: one folder for each type of TEEHR 'table'.
PRIMARY_FILEPATH = TEEHR_BASE / "primary"
SECONDARY_FILEPATH = TEEHR_BASE / "secondary"
CROSSWALK_FILEPATH = TEEHR_BASE / "crosswalk"
GEOMETRY_FILEPATH = TEEHR_BASE / "geometry"
ATTRIBUTE_FILEPATH = TEEHR_BASE / "attribute"
JOINED_FILEPATH = TEEHR_BASE / "joined"

# File used for the DuckDB database that the joined timeseries is built in.
DB_FILEPATH = TEEHR_BASE / "teehr.db"

# Make sure every table folder exists, creating missing parents as needed.
# Folders that already exist are left as they are.
for table_dir in (
    PRIMARY_FILEPATH,
    SECONDARY_FILEPATH,
    CROSSWALK_FILEPATH,
    GEOMETRY_FILEPATH,
    ATTRIBUTE_FILEPATH,
    JOINED_FILEPATH,
):
    table_dir.mkdir(parents=True, exist_ok=True)
# Look at folder/file structure.  Notice the raw data we downloaded as a starting point is 
# in 'raw', but the folders in 'teehr_base' are empty still.  We will populate them next.
!tree ~/teehr/example-1

Convert data to TEEHR format#

In this section we will convert the following data types from CSV or GeoJSON format to TEEHR format.

  • Geometry
  • Location attributes
  • Crosswalk tables
  • Primary timeseries
  • Secondary timeseries
# Load the gage locations from GeoJSON, rename the "station" column to "name"
# to conform to the TEEHR format, and write the result as Parquet into the
# geometry folder.
locations = gpd.read_file(RAW_DATA_FILEPATH / "gages.geojson")
locations = locations.rename(columns={"station": "name"})
locations.to_parquet(GEOMETRY_FILEPATH / "locations.parquet")
locations
# Convert the crosswalk CSV file for the BASELINE SIMULATION data to Parquet
# in TEEHR format.
baseline_xw = pd.read_csv(RAW_DATA_FILEPATH / "baseline-crosswalk.csv")
baseline_xw.to_parquet(CROSSWALK_FILEPATH / "baseline-crosswalk.parquet")
baseline_xw
# Convert the crosswalk CSV file for the INNOVATION SIMULATION data to Parquet
# in TEEHR format.
sim_xw = pd.read_csv(RAW_DATA_FILEPATH / "sim-crosswalk.csv")
sim_xw.to_parquet(CROSSWALK_FILEPATH / "sim-crosswalk.parquet")
sim_xw
# Convert the three attribute CSV files to Parquet in the attribute folder,
# displaying each table after it is written.

# 2-year discharge
attr1 = pd.read_csv(RAW_DATA_FILEPATH / "gage_attr_2yr_discharge.csv")
attr1.to_parquet(ATTRIBUTE_FILEPATH / "2yr_discharge.parquet")
display(attr1)

# Drainage area (km2)
attr2 = pd.read_csv(RAW_DATA_FILEPATH / "gage_attr_drainage_area_km2.csv")
attr2.to_parquet(ATTRIBUTE_FILEPATH / "drainage_area.parquet")
display(attr2)

# Ecoregion
attr3 = pd.read_csv(RAW_DATA_FILEPATH / "gage_attr_ecoregion.csv")
attr3.to_parquet(ATTRIBUTE_FILEPATH / "ecoregion.parquet")
display(attr3)
# Load the OBSERVED timeseries CSV file for review.
# Note, these are called the PRIMARY TIMESERIES in TEEHR.
obs_ts = pd.read_csv(RAW_DATA_FILEPATH / "obs.csv")
obs_ts
# Fill in the remaining static columns required for TEEHR, in this order.
for column, constant in {
    "configuration": "usgs",
    "variable_name": "streamflow_daily_mean",
    "measurement_unit": "cms",
    "reference_time": None,
}.items():
    obs_ts[column] = constant

# A reference_time column set to None must be cast to datetime64[ns]
# before the table is saved as Parquet.
obs_ts["reference_time"] = obs_ts["reference_time"].astype("datetime64[ns]")
obs_ts.to_parquet(PRIMARY_FILEPATH / "obs.parquet")
obs_ts
# Load the BASELINE SIMULATION timeseries CSV file for review.
# This could represent the "current standard" simulation.
baseline_ts = pd.read_csv(RAW_DATA_FILEPATH / "baseline.csv")
baseline_ts
# Fill in the remaining static columns required for TEEHR, in this order.
for column, constant in {
    "configuration": "baseline",
    "variable_name": "streamflow_daily_mean",
    "measurement_unit": "cms",
    "reference_time": None,
}.items():
    baseline_ts[column] = constant

# A reference_time column set to None must be cast to datetime64[ns]
# before the table is saved as Parquet.
reference_time = baseline_ts["reference_time"].astype("datetime64[ns]")
baseline_ts["reference_time"] = reference_time
baseline_ts.to_parquet(SECONDARY_FILEPATH / "baseline.parquet")
baseline_ts
# Open the INNOVATION SIMULATION timeseries CSV file and review.
# This could represent an innovation that you want to compare to the baseline.
sim_ts = pd.read_csv(Path(RAW_DATA_FILEPATH, "sim.csv"))
sim_ts
# Add the other static columns required for TEEHR and save as Parquet file.
sim_ts['configuration'] = 'sim'
sim_ts['variable_name'] = 'streamflow_daily_mean'
sim_ts['measurement_unit'] = 'cms'
sim_ts['reference_time'] = None

# Reference_time column must be cast as type datetime64[ns] if set to None.
# Cast sim_ts's own column (not baseline_ts's), so this cell does not depend
# on the baseline cell having run or on the two tables sharing an index.
sim_ts['reference_time'] = sim_ts['reference_time'].astype('datetime64[ns]')
sim_ts.to_parquet(Path(SECONDARY_FILEPATH, "sim.parquet"))
sim_ts
# List the contents of the example-1 directory.
# Notice now there are files in the 'teehr_base' directory, but the 'joined' directory is empty.
!tree ~/teehr/example-1

Create the joined timeseries table#

In this section we will join the primary and secondary timeseries, add attributes and add a user defined field, before exporting to a joined Parquet file.

# Build the glob patterns to provide to the TEEHR queries. Each pattern is a
# table folder followed by "/**/*.parquet", which is intended to allow reading
# Parquet files in the 'table' folder or any of its subfolders.
PRIMARY_FOLDER = f"{PRIMARY_FILEPATH}/**/*.parquet"
SECONDARY_FOLDER = f"{SECONDARY_FILEPATH}/**/*.parquet"
GEOMETRY_FOLDER = f"{GEOMETRY_FILEPATH}/**/*.parquet"
CROSSWALK_FOLDER = f"{CROSSWALK_FILEPATH}/**/*.parquet"
ATTRIBUTE_FOLDER = f"{ATTRIBUTE_FILEPATH}/**/*.parquet"
JOINED_FOLDER = f"{JOINED_FILEPATH}/**/*.parquet"
# Start from a clean slate: if a database file already exists at DB_FILEPATH
# (e.g. from a previous run), delete it so that a new one is used.
if DB_FILEPATH.is_file():
    DB_FILEPATH.unlink()

# Open the database at DB_FILEPATH.
# NOTE(review): presumably the file is created here since it was just
# removed -- confirm against the DuckDBDatabase class.
ddb = DuckDBDatabase(DB_FILEPATH)
# Join the primary and secondary timeseries (via the crosswalk) and insert the
# result into the temporary database. The arguments are the glob patterns for
# the corresponding TEEHR table folders.
ddb.insert_joined_timeseries(
    primary_filepath=PRIMARY_FOLDER,
    secondary_filepath=SECONDARY_FOLDER,
    crosswalk_filepath=CROSSWALK_FOLDER,
    drop_added_fields=True,
)
# Join and insert the attributes data to the temporary database.
ddb.insert_attributes(ATTRIBUTE_FOLDER)
# Add month as a calculated field

# Function arguments should have the same data type as the fields used. 
# Note: In the data model, all attribute values are added to the db as type 'str' 
def add_month_field(arg1: datetime.datetime) -> int:
    """Return the calendar month (1-12) of a timestamp.

    Used as the user-defined function for the "month" calculated field.

    Parameters
    ----------
    arg1 : datetime.datetime
        The timestamp to take the month from. Annotated with the
        ``datetime.datetime`` class; the bare name ``datetime`` refers to
        the module here because of ``import datetime as datetime``.

    Returns
    -------
    int
        The value of ``arg1.month``.
    """
    return arg1.month
    
# Add an INTEGER field named "month" to the joined timeseries, computed by
# add_month_field. parameter_names lists the existing field(s) supplied to
# the user-defined function (here, value_time).
ddb.insert_calculated_field(new_field_name="month",
                    new_field_type="INTEGER",
                    parameter_names=["value_time"],
                    user_defined_function=add_month_field)
# View the fields now in the DB.
ddb.get_joined_timeseries_schema()
# The database is temporary and disposable. Let's export the joined data to a
# Parquet file in the 'joined' folder, ordered by configuration, primary
# location and value time.
ddb.query(f"""
    COPY (
        SELECT *
        FROM joined_timeseries
        ORDER BY configuration, primary_location_id, value_time
    )
   TO '{JOINED_FILEPATH}/joined.parquet' (FORMAT PARQUET)
""")
# The temporary database is not needed any longer. Delete it.
if DB_FILEPATH.is_file():
    DB_FILEPATH.unlink()
# List the contents of the example-1 directory.
# Notice now there are files in the 'joined' directory.
!tree ~/teehr/example-1

Query the joined Parquet file(s) and create a few simple plots.#

# Create a DuckDB joined Parquet class instance to interact with the joined
# Parquet files. It is given the glob patterns for the joined and geometry
# folders.
jpdb = DuckDBJoinedParquet(JOINED_FOLDER, GEOMETRY_FOLDER)
# Get the joined timeseries for 'gage-A', ordered by location, configuration
# and value time.
# Note, the data being evaluated is simulation data, not forecast data. As
# such, it does not have a reference_time.
joined = jpdb.get_joined_timeseries(
    filters=[
        {
            "column": "primary_location_id",
            "operator": "=",
            "value": "gage-A"
        }
    ],
    order_by=["primary_location_id", "configuration", "value_time"],
)
joined.head()
# Plot the observed, baseline and simulated timeseries from the query above.
# Split the joined rows by configuration first.
is_baseline = joined["configuration"] == "baseline"
is_sim = joined["configuration"] == "sim"
baseline = joined[is_baseline].copy()
sim = joined[is_sim].copy()

# The observed curve is drawn from the primary values on the baseline rows.
observed_curve = baseline.hvplot(
    x="value_time", y=["primary_value"], label="Observed", legend=True
)
baseline_curve = baseline.hvplot(
    x="value_time", y=["secondary_value"], label="Baseline", legend=True
)
simulated_curve = sim.hvplot(
    x="value_time", y=["secondary_value"], label="Simulated", legend=True
)

# Overlay the three curves on one set of axes.
observed_curve * baseline_curve * simulated_curve
# Generate some simple metrics from the joined timeseries table, grouped and
# ordered by location and configuration, and include geometry in the response.
metrics = jpdb.get_metrics(
    group_by=["primary_location_id", "configuration"],
    order_by=["primary_location_id", "configuration"],
    include_metrics="all",
    include_geometry=True
)
metrics
# Make a simple plot and color stations by relative_bias.
# Reproject the metrics to EPSG:3857 to go with the GOOGLE_MERCATOR crs
# passed to the plot below.
metrics_prj = metrics.to_crs("EPSG:3857")
tiles = hv.element.tiles.CartoLight()  # alternative: gv.tile_sources.OSM

# Use a distinct name for the plot object: assigning it to `hvplot` would
# shadow the `hvplot` package that `import hvplot.pandas` binds.
stations_plot = metrics_prj.hvplot(
    c='relative_bias',
    crs=ccrs.GOOGLE_MERCATOR,
    size=75,
    cmap='RdYlGn'
)

(tiles * stations_plot).opts(width=500, height=500)

This concludes this simple introductory example. For more in-depth examples that start to show the power and performance of the TEEHR platform, please see example-2 and example-3.