{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Building a Simple TEEHR Dataset" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this notebook you will learn how to build a simple TEEHR dataset, export it to a joined parquet file and run a few simple queries against it. This example is intentionally very simple and by no means shows all the functionality of the TEEHR toolsets or approach.\n", "\n", "All of the input data is CSV and GeoJSON files. This is intended to be\n", "the simplest example of how TEEHR can be used." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import the required packages\n", "import pandas as pd\n", "import geopandas as gpd\n", "import duckdb\n", "import datetime as datetime\n", "\n", "from pathlib import Path\n", "from teehr.classes.duckdb_database import DuckDBDatabase\n", "from teehr.classes.duckdb_joined_parquet import DuckDBJoinedParquet\n", "import teehr.queries.duckdb as tqd\n", "\n", "import holoviews as hv\n", "import geoviews as gv\n", "import hvplot.pandas\n", "import cartopy.crs as ccrs\n", "from holoviews import opts" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "vscode": { "languageId": "shellscript" } }, "outputs": [], "source": [ "# Download example data that we will converted to TEEHR from S3.\n", "!rm -rf ~/teehr/example-1\n", "!aws s3 cp --recursive --no-sign-request s3://ciroh-rti-public-data/teehr-workshop-devcon-2024/workshop-data/example-1 ~/teehr/example-1" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Define the raw data and TEEHR 'dataset' directory locations\n", "RAW_DATA_FILEPATH = Path(Path().home(), \"teehr/example-1/raw\")\n", "TEEHR_BASE = Path(Path.home(), \"teehr/example-1/teehr_base\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# While TEEHR is very flexible with regards to where data is stored and how it is named, \n", "# we have a bit a a standard established. The following sets up the standard folder structure.\n", "# Create folders for each type of TEEHR 'table'\n", "PRIMARY_FILEPATH = Path(TEEHR_BASE, 'primary')\n", "SECONDARY_FILEPATH = Path(TEEHR_BASE, 'secondary')\n", "CROSSWALK_FILEPATH = Path(TEEHR_BASE, 'crosswalk')\n", "GEOMETRY_FILEPATH = Path(TEEHR_BASE, 'geometry')\n", "ATTRIBUTE_FILEPATH = Path(TEEHR_BASE, 'attribute')\n", "JOINED_FILEPATH = Path(TEEHR_BASE, 'joined')\n", "DB_FILEPATH = Path(TEEHR_BASE, 'teehr.db')\n", "\n", "PRIMARY_FILEPATH.mkdir(exist_ok=True, parents=True)\n", "SECONDARY_FILEPATH.mkdir(exist_ok=True, parents=True)\n", "CROSSWALK_FILEPATH.mkdir(exist_ok=True, parents=True)\n", "GEOMETRY_FILEPATH.mkdir(exist_ok=True, parents=True)\n", "ATTRIBUTE_FILEPATH.mkdir(exist_ok=True, parents=True)\n", "JOINED_FILEPATH.mkdir(exist_ok=True, parents=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Look at folder/file structure. Notice the raw data we downloaded as a starting point is \n", "# in 'raw', but the folders in 'teehr_base' are empty still. We will populate them next.\n", "!tree ~/teehr/example-1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Convert data to TEEHR format" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section we will convert the following data types from CSV or GeoJSON format to TEEHR format.\n", "\n", "" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Read a GeoJSON file, rename a column to conform to the TEEHR format\n", "# and save as Parquet in TEEHR format.\n", "locations = gpd.read_file(Path(RAW_DATA_FILEPATH, \"gages.geojson\"))\n", "locations.rename(columns={\"station\": \"name\"}, inplace=True)\n", "locations.to_parquet(Path(GEOMETRY_FILEPATH, \"locations.parquet\"))\n", "locations" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Convert crosswalks CSV files for the BASELINE SIMULATION data to Parquet in \n", "# TEEHR format.\n", "baseline_xw = pd.read_csv(Path(RAW_DATA_FILEPATH, \"baseline-crosswalk.csv\"))\n", "baseline_xw.to_parquet(Path(CROSSWALK_FILEPATH, \"baseline-crosswalk.parquet\"))\n", "baseline_xw" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Convert crosswalks CSV files for the INNOVATION SIMULAION data to Parquet in \n", "# TEEHR format.\n", "sim_xw = pd.read_csv(Path(RAW_DATA_FILEPATH, \"sim-crosswalk.csv\"))\n", "sim_xw.to_parquet(Path(CROSSWALK_FILEPATH, \"sim-crosswalk.parquet\"))\n", "sim_xw" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Convert 3 attribute CSV files to Parquet TEEHR format.\n", "attr1 = pd.read_csv(Path(RAW_DATA_FILEPATH, \"gage_attr_2yr_discharge.csv\"))\n", "attr1.to_parquet(Path(ATTRIBUTE_FILEPATH, \"2yr_discharge.parquet\"))\n", "display(attr1)\n", "\n", "attr2 = pd.read_csv(Path(RAW_DATA_FILEPATH, \"gage_attr_drainage_area_km2.csv\"))\n", "attr2.to_parquet(Path(ATTRIBUTE_FILEPATH, \"drainage_area.parquet\"))\n", "display(attr2)\n", "\n", "attr3 = pd.read_csv(Path(RAW_DATA_FILEPATH, \"gage_attr_ecoregion.csv\"))\n", "attr3.to_parquet(Path(ATTRIBUTE_FILEPATH, \"ecoregion.parquet\"))\n", "display(attr3)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Open the OBSERVED timeseries CSV files and review\n", "# Note, these are called the PRIMARY TIMESERIES in TEEHR.\n", "obs_ts = pd.read_csv(Path(RAW_DATA_FILEPATH, \"obs.csv\"))\n", "obs_ts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add the other static columns required for TEEHR and save as Parquet file.\n", "obs_ts['configuration'] = 'usgs'\n", "obs_ts['variable_name'] = 'streamflow_daily_mean'\n", "obs_ts['measurement_unit'] = 'cms'\n", "obs_ts['reference_time'] = None\n", "\n", "# Reference_time column must be cast as type datetime64[ns] if set to None\n", "obs_ts['reference_time'] = obs_ts['reference_time'].astype('datetime64[ns]')\n", "obs_ts.to_parquet(Path(PRIMARY_FILEPATH, \"obs.parquet\"))\n", "obs_ts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Open the BASELINE SIMULATION timeseries CSV files and review.\n", "# This could represent the \"current standard\" simulation.\n", "baseline_ts = pd.read_csv(Path(RAW_DATA_FILEPATH, \"baseline.csv\"))\n", "baseline_ts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add the other static columns required for TEEHR and save as Parquet file.\n", "baseline_ts['configuration'] = 'baseline'\n", "baseline_ts['variable_name'] = 'streamflow_daily_mean'\n", "baseline_ts['measurement_unit'] = 'cms'\n", "baseline_ts['reference_time'] = None\n", "\n", "# Reference_time column must be cast as type datetime64[ns] if set to None\n", "baseline_ts['reference_time'] = (\n", " baseline_ts['reference_time'].astype('datetime64[ns]')\n", ")\n", "baseline_ts.to_parquet(Path(SECONDARY_FILEPATH, \"baseline.parquet\"))\n", "baseline_ts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Open the INNOVATION SIMULATION timeseries CSV files and review.\n", "# This could represent an innovation that you want to compare to the baseline.\n", "sim_ts = pd.read_csv(Path(RAW_DATA_FILEPATH, \"sim.csv\"))\n", "sim_ts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add the other columns required for TEEHR\n", "sim_ts['configuration'] = 'sim'\n", "sim_ts['variable_name'] = 'streamflow_daily_mean'\n", "sim_ts['measurement_unit'] = 'cms'\n", "sim_ts['reference_time'] = None\n", "\n", "# Reference_time column must be cast as type datetime64[ns] if set to None\n", "sim_ts['reference_time'] = (\n", " baseline_ts['reference_time'].astype('datetime64[ns]')\n", ")\n", "sim_ts.to_parquet(Path(SECONDARY_FILEPATH, \"sim.parquet\"))\n", "sim_ts" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# List the contents of the example-1 directory.\n", "# Notice now there are files in the 'teehr_base' directory, but the 'joined' directory is empty.\n", "!tree ~/teehr/example-1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create the joined timeseries table" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this section we will join the primary and secondary timeseries, add attributes and add a user defined field, before exporting to a joined Parquet file. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Setup paths to provide to the TEEHR queries that will allow reading \n", "# Parquet files in the 'table' folder or any subfolders.\n", "PRIMARY_FOLDER = f\"{PRIMARY_FILEPATH}/**/*.parquet\"\n", "SECONDARY_FOLDER = f\"{SECONDARY_FILEPATH}/**/*.parquet\"\n", "GEOMETRY_FOLDER = f\"{GEOMETRY_FILEPATH}/**/*.parquet\"\n", "CROSSWALK_FOLDER = f\"{CROSSWALK_FILEPATH}/**/*.parquet\"\n", "ATTRIBUTE_FOLDER = f\"{ATTRIBUTE_FILEPATH}/**/*.parquet\"\n", "JOINED_FOLDER = f\"{JOINED_FILEPATH}/**/*.parquet\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# If there is an existing database, delete it and create a new one.\n", "if DB_FILEPATH.is_file():\n", " DB_FILEPATH.unlink()\n", "\n", "ddb = DuckDBDatabase(DB_FILEPATH)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Join and insert the timeseries data to the temporary database.\n", "ddb.insert_joined_timeseries(\n", " primary_filepath=PRIMARY_FOLDER,\n", " secondary_filepath=SECONDARY_FOLDER,\n", " crosswalk_filepath=CROSSWALK_FOLDER,\n", " drop_added_fields=True,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Join and insert the attributes data to the temporary database.\n", "ddb.insert_attributes(ATTRIBUTE_FOLDER)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add month as a calculated field\n", "\n", "# Function arguments should have the same data type as the fields used. \n", "# Note: In the data model, all attribute values are added to the db as type 'str' \n", "def add_month_field(arg1: datetime) -> int:\n", " return arg1.month\n", " \n", "# month\n", "ddb.insert_calculated_field(new_field_name=\"month\",\n", " new_field_type=\"INTEGER\",\n", " parameter_names=[\"value_time\"],\n", " user_defined_function=add_month_field)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# View fields now in the DB\n", "ddb.get_joined_timeseries_schema()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The database is temporary and disposable. Lets export the joined data to Parquet files.\n", "ddb.query(f\"\"\"\n", " COPY (\n", " SELECT *\n", " FROM joined_timeseries\n", " ORDER BY configuration, primary_location_id, value_time\n", " )\n", " TO '{JOINED_FILEPATH}/joined.parquet' (FORMAT PARQUET)\n", "\"\"\")\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The temprary database is not needed any longer. Delete it.\n", "if DB_FILEPATH.is_file():\n", " DB_FILEPATH.unlink()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# List the contents of the example-1 directory.\n", "# Notice now there are files in the 'joined' directory.\n", "!tree ~/teehr/example-1" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Query the joined Parquet file(s) and create a few simple plots." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create a DuckDB joined Parquet class instance to interact with the joined Parquet files.\n", "jpdb = DuckDBJoinedParquet(JOINED_FOLDER, GEOMETRY_FOLDER)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Get the joined timeseries for 'gage-A'\n", "# Note, the data being evaluated is simulated but not forecasts data. As such,\n", "# it does not have a reference_time.\n", "joined = jpdb.get_joined_timeseries(\n", " filters=[\n", " {\n", " \"column\": \"primary_location_id\",\n", " \"operator\": \"=\",\n", " \"value\": \"gage-A\"\n", " }\n", " ],\n", " order_by=[\"primary_location_id\", \"configuration\", \"value_time\"],\n", ")\n", "joined.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Create a basic plot that shows observed, baseline and simulated timeseries based on the query above. \n", "baseline = joined[joined[\"configuration\"] == \"baseline\"].copy()\n", "sim = joined[joined[\"configuration\"] == \"sim\"].copy()\n", "\n", "(\n", " baseline.hvplot(x=\"value_time\", y=[\"primary_value\"], label=\"Observed\", legend=True) \n", " * baseline.hvplot(x=\"value_time\", y=[\"secondary_value\"], label=\"Baseline\", legend=True) \n", " * sim.hvplot(x=\"value_time\", y=[\"secondary_value\"], label=\"Simulated\", legend=True)\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Generate some simple metrics from the joined timseries table and include geometry in response.\n", "metrics = jpdb.get_metrics(\n", " group_by=[\"primary_location_id\", \"configuration\"],\n", " order_by=[\"primary_location_id\", \"configuration\"],\n", " include_metrics=\"all\",\n", " include_geometry=True\n", ")\n", "metrics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Make a simple plot and color stations by relative_bias.\n", "metrics_prj = metrics.to_crs(\"EPSG:3857\")\n", "tiles = hv.element.tiles.CartoLight() #gv.tile_sources.OSM\n", "hvplot = metrics_prj.hvplot(\n", " c='relative_bias', \n", " crs=ccrs.GOOGLE_MERCATOR,\n", " size=75,\n", " cmap='RdYlGn'\n", ")\n", "\n", "(tiles*hvplot).opts(width=500, height=500)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This concludes this simple indroductory example. For more in-depth examples that start to show the power and performance of the TEEHR platform please see `example-2` and `example-3`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.11.8" } }, "nbformat": 4, "nbformat_minor": 4 }