{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Loading Local Data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Overview\n", "In the `Introduction to the Schema` lesson we explored what data makes up an Evaluation dataset and how it is stored on disk. In this lesson we are going to explore how your data can be added to a new or existing Evaluation dataset.\n", "\n", "Similar to the previous lesson, we will start by importing the required packages and creating an Evaluation class instance that references a directory where the data will be stored, and then clone the Evaluation template as a starting point." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import teehr\n", "import teehr.example_data.v0_3_test_study as test_study_data\n", "from pathlib import Path\n", "import shutil\n", "import geopandas as gpd\n", "import pandas as pd\n", "\n", "# Tell Bokeh to output plots in the notebook\n", "from bokeh.io import output_notebook\n", "output_notebook()" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-output" ] }, "outputs": [], "source": [ "# Define the directory where the Evaluation will be created\n", "test_eval_dir = Path(Path().home(), \"temp\", \"02_loading_data\")\n", "shutil.rmtree(test_eval_dir, ignore_errors=True)\n", "\n", "# Create an Evaluation object and create the directory\n", "ev = teehr.Evaluation(dir_path=test_eval_dir, create_dir=True)\n", "\n", "# Clone the template\n", "ev.clone_template()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For this example, we will use data from the example dataset stored in the `tests/data` folder of the teehr repository. We will copy the data to the Evaluation directory, examine the contents, and then load it into the TEEHR Evaluation dataset. Note that these data are for demonstration purposes only: the location IDs and timeseries values are made up. We will work with real data in future exercises." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Locations\n", "First let's download some location data. We will download the `gages.geojson` file, a GeoJSON file that contains 3 fictional gages." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-output" ] }, "outputs": [], "source": [ "location_data_path = Path(test_eval_dir, \"gages.geojson\")\n", "test_study_data.fetch_file(\"gages.geojson\", location_data_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's open the `gages.geojson` file with GeoPandas and take a look at the contents." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "location_data_path = Path(test_eval_dir, \"gages.geojson\")\n", "gdf = gpd.read_file(location_data_path)\n", "gdf" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can see that the file contains 3 gages, each represented by a point. Next we will load the location data into the TEEHR dataset using methods on the `locations` table sub-class. Note that this file already has the `id`, `name` and `geometry` fields required by TEEHR. If it didn't, the `load_spatial` method, like all the table sub-class loading methods, can take a `field_mapping` dictionary that specifies which source fields map to which required field names (a hypothetical example is sketched below). We will look more closely at `field_mapping` when loading some other datasets."
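] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For example, a call for a file whose columns were named differently might look like the sketch below. The `GAGE_ID` and `GAGE_NAME` column names are hypothetical and are not in the `gages.geojson` file we just downloaded, so this snippet is for illustration only and is not meant to be run:\n", "\n", "```python\n", "# Hypothetical sketch: map differently named source columns to the required TEEHR fields\n", "ev.locations.load_spatial(\n", "    location_data_path,\n", "    field_mapping={\n", "        \"GAGE_ID\": \"id\",        # hypothetical source column -> required 'id' field\n", "        \"GAGE_NAME\": \"name\",    # hypothetical source column -> required 'name' field\n", "        \"geometry\": \"geometry\"\n", "    }\n", ")\n", "```\n", "\n", "Since our file already has the required field names, we can simply load it as-is."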
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-output" ] }, "outputs": [], "source": [ "ev.locations.load_spatial(location_data_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that the location data has been loaded into the TEEHR dataset, it can be queried with the `ev.locations` class methods, but more importantly, we can now load timeseries that reference the `id` values in the locations data. The `id` in the `locations` table is the primary location ID and is referenced in the `primary_timeseries` table as `location_id`.\n", "\n", "First let's query the location data to verify it has been loaded, then look at a map to verify the gages are in the correct locations, and then we will move on to loading some timeseries data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "locations_gdf = ev.locations.to_geopandas()\n", "locations_gdf" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This table of locations came from the TEEHR dataset and matches what we saw in the source file, so we know it was loaded successfully. Now let's look at a map using the built-in `teehr.location_map()` method." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "locations_gdf.teehr.location_map()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Primary Timeseries\n", "OK, now that we have loaded the location data, we can load some timeseries into the `primary_timeseries` table, where the `location_id` field references the `id` field in the `locations` table. The primary timeseries represents the timeseries that simulations will be compared to (i.e., the truth).\n", "\n", "First, let's download a test timeseries from the TEEHR repository as a `*.csv` file, open and examine the contents with Pandas, and then load it into the TEEHR dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-output" ] }, "outputs": [], "source": [ "primary_timeseries_path = Path(test_eval_dir, \"test_short_obs.csv\")\n", "test_study_data.fetch_file(\"test_short_obs.csv\", primary_timeseries_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "primary_timeseries_df = pd.read_csv(primary_timeseries_path)\n", "primary_timeseries_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "There are a few things we need to look at and consider before loading the data into the TEEHR dataset. First, the field names. You can see above that while the schema of this dataset is close to the TEEHR schema, it is not exactly correct: specifically, `configuration` should be `configuration_name` and `measurement_unit` should be `unit_name`. To address this we can use the `field_mapping` argument to map the existing field names to the required field names. The next thing that needs to be dealt with is the field values for fields that reference values in another table, for example a domain table or the `locations` table. Let's start with `unit_name`. The `unit_name` values need to reference a value from the `name` field in the `units` table. Next let's look at what values are in the `units` table."
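] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before we do, a quick optional check on the raw file itself can be helpful: listing the distinct values in the fields that reference domain tables shows exactly which values will need to be mapped or overridden (the column names below are taken from the preview above)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# List the distinct domain-related values present in the raw CSV\n", "primary_timeseries_df[[\"configuration\", \"measurement_unit\", \"variable_name\"]].drop_duplicates()"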
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Query the units table\n", "ev.units.to_pandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we can see that `m^3/s` is in the `units` table, so we are all set with the values in the `measurement_unit` field. Next let's check the `variables` table." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ev.variables.to_pandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Here we see that `streamflow` is not in the `name` field of the `variables` table. There are two ways to remedy this:\n", "1) we could add the value `streamflow` to the `variables` table, or \n", "2) we can use the `constant_field_values` argument to set the `variable_name` to an allowed value. \n", "\n", "In this case we will go with the latter and set the `variable_name` to `streamflow_hourly_inst`. Next we need to address the `configuration_name` field. First let's see what values are in the `configurations` table. Because the `configurations` table is rather large, we will search it for `test_obs` to see if that value is in there (it isn't)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ev.configurations.filter(\n", " {\n", " \"column\": \"name\",\n", " \"operator\": \"=\",\n", " \"value\": \"test_obs\"\n", " }\n", ").to_pandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see by the empty DataFrame that a configuration named `test_obs` is not in the table. Again, we have two options: we could use the `constant_field_values` argument to set the `configuration_name` to a value that is in the table (like `usgs_observations`), or we could add a new row to the `configurations` table. Here we will use the `constant_field_values` argument to set the value to a constant. When we load the `secondary_timeseries` below, we will demonstrate how to add a new row to the `configurations` domain table." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Load the timeseries data and map over the fields and set constants\n", "ev.primary_timeseries.load_csv(\n", " in_path=primary_timeseries_path,\n", " field_mapping={\n", " \"reference_time\": \"reference_time\",\n", " \"value_time\": \"value_time\",\n", " \"configuration\": \"configuration_name\",\n", " \"measurement_unit\": \"unit_name\",\n", " \"variable_name\": \"variable_name\",\n", " \"value\": \"value\",\n", " \"location_id\": \"location_id\"\n", " },\n", " constant_field_values={\n", " \"unit_name\": \"m^3/s\",\n", " \"variable_name\": \"streamflow_hourly_inst\",\n", " \"configuration_name\": \"usgs_observations\"\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we can query for a single timeseries and make sure the data was loaded." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "primary_timeseries_df = ev.primary_timeseries.filter(\n", " {\n", " \"column\": \"location_id\",\n", " \"operator\": \"=\",\n", " \"value\": \"gage-A\"\n", " }\n", ").to_pandas()\n", "primary_timeseries_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And create a plot using the `df.teehr.timeseries_plot()` method."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "primary_timeseries_df.teehr.timeseries_plot()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Secondary Timeseries\n", "Next we will load some simulated data that we want to compare to the observed timeseries (`primary_timeseries`). The `secondary_timeseries` table is where the \"simulated\" timeseries data are stored. First we will download some test secondary timeseries from the TEEHR repository." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-output" ] }, "outputs": [], "source": [ "secondary_timeseries_path = Path(test_eval_dir, \"test_short_fcast.parquet\")\n", "test_study_data.fetch_file(\"test_short_fcast.parquet\", secondary_timeseries_path)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similar to what we did with the primary timeseries, we will open the timeseries with Pandas to see what the data looks like, then we will load it into the TEEHR dataset." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "secondary_timeseries_df = pd.read_parquet(secondary_timeseries_path)\n", "secondary_timeseries_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "When loading `secondary_timeseries`, the same considerations regarding field names and field values apply as for `primary_timeseries`. Unlike when we loaded the `primary_timeseries`, this time we will add a new configuration to the `configurations` table for the `test_short` configuration. It is common for the `configuration_name` of the secondary timeseries you are loading to not already be in the `configurations` table, because this is where new research results are loaded for evaluation." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ev.configurations.add(\n", " teehr.Configuration(\n", " name=\"test_short\",\n", " type=\"secondary\",\n", " description=\"Test Forecast Configuration\",\n", " )\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Location Crosswalks\n", "There is one other consideration before we can load the `secondary_timeseries`. There is no expectation in the TEEHR schema that the `location_id` used in the `primary_timeseries` matches the `location_id` in the `secondary_timeseries`. Therefore, it is necessary to load location crosswalk data into the `location_crosswalks` table so that TEEHR knows which `primary_location_id` matches which `secondary_location_id`. For example, in our test dataset, the observation data at `location_id` \"gage-A\" relates to forecasts with a `location_id` of \"fcst-1\".\n", "\n", "We already have a file with this information, so as we did with the other test datasets, we will download the data from the TEEHR repository, open it with Pandas to examine the contents, and load it into the TEEHR dataset."
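] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before fetching the crosswalk file, we can optionally confirm that the `test_short` configuration we just added is now in the `configurations` table, reusing the same filter pattern from earlier." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Confirm the newly added configuration exists in the configurations table\n", "ev.configurations.filter(\n", "    {\n", "        \"column\": \"name\",\n", "        \"operator\": \"=\",\n", "        \"value\": \"test_short\"\n", "    }\n", ").to_pandas()"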
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-output" ] }, "outputs": [], "source": [ "crosswalk_path = Path(test_eval_dir, \"crosswalk.csv\")\n", "test_study_data.fetch_file(\"crosswalk.csv\", crosswalk_path)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "crosswalk_df = pd.read_csv(crosswalk_path)\n", "crosswalk_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this case we can see that the crosswalk table already has the field names we need (`primary_location_id` and `secondary_location_id`), so we don't need to pass a `field_mapping` argument to the loading function and can load it as-is." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Load the crosswalk data to the crosswalks table\n", "ev.location_crosswalks.load_csv(crosswalk_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have loaded the `location_crosswalks` data, we can load the `secondary_timeseries`. Note that, unlike when we loaded the `primary_timeseries`, in this case we do not have to set a constant field value for `configuration_name` because we added the `test_short` configuration to the `configurations` table." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Load the secondary timeseries data and map over the fields and set constants\n", "ev.secondary_timeseries.load_parquet(\n", " in_path=secondary_timeseries_path,\n", " field_mapping={\n", " \"reference_time\": \"reference_time\",\n", " \"value_time\": \"value_time\",\n", " \"configuration\": \"configuration_name\",\n", " \"measurement_unit\": \"unit_name\",\n", " \"variable_name\": \"variable_name\",\n", " \"value\": \"value\",\n", " \"location_id\": \"location_id\"\n", " },\n", " constant_field_values={\n", " \"unit_name\": \"m^3/s\",\n", " \"variable_name\": \"streamflow_hourly_inst\"\n", " }\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And to verify that it loaded, we can run a query to see the data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Query the secondary timeseries data for a single forecast.\n", "secondary_timeseries_df = (\n", " ev.secondary_timeseries\n", " .filter(\n", " {\n", " \"column\": \"reference_time\",\n", " \"operator\": \"=\",\n", " \"value\": \"2022-01-01\"\n", " }\n", " )\n", ").to_pandas()\n", "secondary_timeseries_df.head()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "secondary_timeseries_df.teehr.timeseries_plot()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "At this point we have added data to the following TEEHR data tables:\n", "- locations\n", "- configurations\n", "- location_crosswalks\n", "- primary_timeseries\n", "- secondary_timeseries\n", "\n", "This is the bare minimum needed to conduct an evaluation, but we are going to add one more type of data: `location_attributes`. Location attributes provide extra information about the primary locations.\n",
"These data are analogous to the columns in a geospatial data file's \"attribute table\", but they are stored in a separate table to keep the schema consistent across evaluations (i.e., we don't add new columns for each new attribute)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "hide-output" ] }, "outputs": [], "source": [ "attr_2yr_discharge_path = Path(test_eval_dir, \"test_attr_2yr_discharge.csv\")\n", "test_study_data.fetch_file(\"test_attr_2yr_discharge.csv\", attr_2yr_discharge_path)\n", "\n", "attr_drainage_area_km2_path = Path(test_eval_dir, \"test_attr_drainage_area_km2.csv\")\n", "test_study_data.fetch_file(\"test_attr_drainage_area_km2.csv\", attr_drainage_area_km2_path)\n", "\n", "attr_ecoregion_path = Path(test_eval_dir, \"test_attr_ecoregion.csv\")\n", "test_study_data.fetch_file(\"test_attr_ecoregion.csv\", attr_ecoregion_path)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Like before, let's take a look at the raw data we downloaded from the repository." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Read and display the 3 attribute table contents\n", "attr_2yr_discharge_df = pd.read_csv(Path(test_eval_dir, \"test_attr_2yr_discharge.csv\"))\n", "display(attr_2yr_discharge_df)\n", "\n", "attr_drainage_area_km2_df = pd.read_csv(Path(test_eval_dir, \"test_attr_drainage_area_km2.csv\"))\n", "display(attr_drainage_area_km2_df)\n", "\n", "attr_ecoregion_df = pd.read_csv(Path(test_eval_dir, \"test_attr_ecoregion.csv\"))\n", "display(attr_ecoregion_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "We can see we have data for three different attributes here: `year_2_discharge`, `drainage_area`, and `ecoregion`. Before we can load these data, we need to add the attribute names to the `attributes` table; then we can add the location attribute values to the `location_attributes` table. As we did for adding configurations, we can use the `.add()` method on the table sub-class to add new values." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Add some attributes\n", "ev.attributes.add(\n", " [\n", " teehr.Attribute(\n", " name=\"drainage_area\",\n", " type=\"continuous\",\n", " description=\"Drainage area in square kilometers\"\n", " ),\n", " teehr.Attribute(\n", " name=\"ecoregion\",\n", " type=\"categorical\",\n", " description=\"Ecoregion\"\n", " ),\n", " teehr.Attribute(\n", " name=\"year_2_discharge\",\n", " type=\"continuous\",\n", " description=\"2-yr discharge in cubic meters per second\"\n", " ),\n", " ]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that the attributes have been added to the `attributes` table, we can load the `location_attributes` data." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ev.location_attributes.load_csv(\n", " in_path=test_eval_dir,\n", " field_mapping={\"attribute_value\": \"value\"},\n", " pattern=\"test_attr_*.csv\",\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "And query to make sure it was added." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ev.location_attributes.to_pandas()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now we have data in all the tables required to conduct an evaluation.\n",
"There is one more step to do before we can start \"slicing and dicing\" the data and looking at evaluation metrics: we need to generate the `joined_timeseries` data table. The `joined_timeseries` table is essentially a materialized view that joins the `primary_timeseries` and `secondary_timeseries` based on `location_id`, `value_time`, `variable_name` and `unit_name`, adds the `location_attributes`, and adds any user defined fields.\n", "\n", "Here we are going to execute the `create()` method on the `joined_timeseries` table class to generate the `joined_timeseries` \"materialized view\" from the data we just loaded into the domain, location and timeseries tables. The user defined fields to be added are defined as part of the evaluation in the `[evaluation_dir]/scripts/user_defined_fields.py` Python script. Note that it is possible to skip adding any user defined fields by setting `execute_udf=False` when calling the `create()` method. User defined fields are a powerful tool that allows per-location, per-timestep fields to be added to the materialized `joined_timeseries` table. This will be covered in more detail in future lessons. For now, let's just set `execute_udf=True` and execute the `user_defined_fields.py` script." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ev.joined_timeseries.create(execute_udf=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's query the `joined_timeseries` table that we just created to see what it contains." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ev.joined_timeseries.to_pandas().head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In this case, the `joined_timeseries` table has 16 fields. These fields are the result of joining the `primary_timeseries` to the `secondary_timeseries`, joining the `location_attributes` to that, and adding the user defined fields. The bullet points below show which fields fall into which category:\n", "\n", "### Timeseries\n", "- location_id\n", "- primary_location_id\n", "- secondary_location_id\n", "- configuration_name\n", "- variable_name\n", "- unit_name\n", "- reference_time\n", "- value_time\n", "- primary_value\n", "- secondary_value\n", "\n", "### Location Attributes\n", "- year_2_discharge\n", "- ecoregion\n", "- drainage_area\n", "\n", "### User Defined Fields\n", "- month\n", "- year\n", "- water_year" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "ev.spark.stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "This concludes the `Loading Local Data` lesson. In the next lesson we will clone an entire evaluation from the TEEHR AWS S3 bucket and start to explore the querying and metrics capabilities." ] }, { "cell_type": "markdown", "metadata": {}, "source": [] } ], "metadata": { "kernelspec": { "display_name": ".venv", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.15" } }, "nbformat": 4, "nbformat_minor": 2 }