Loading Local Data#

Overview#

In the Introduction to the Schema lesson we explored what data makes up an Evaluation dataset and how it is stored on disk. In this lesson we are going to explore how your data can be added to a new or existing Evaluation dataset.

Similar to the previous lesson, we will start by importing the required packages, creating an Evaluation class instance that references a directory where the data will be stored, and then cloning the Evaluation template as a starting point.

import teehr
import teehr.example_data.v0_3_test_study as test_study_data
from pathlib import Path
import shutil
import geopandas as gpd
import pandas as pd

# Tell Bokeh to output plots in the notebook
from bokeh.io import output_notebook
output_notebook()
# Define the directory where the Evaluation will be created
test_eval_dir = Path(Path().home(), "temp", "02_loading_data")
shutil.rmtree(test_eval_dir, ignore_errors=True)

# Create an Evaluation object and create the directory
ev = teehr.Evaluation(dir_path=test_eval_dir, create_dir=True)

# Clone the template
ev.clone_template()

For this example, we will use data from the example dataset stored in the tests/data folder of the teehr repository. We will copy the data to the Evaluation directory, examine the contents, and then load it into the TEEHR Evaluation dataset. Note that these data are for demonstration purposes only: the location IDs are made up and the timeseries values are fake. We will work with real data in future exercises.

Locations#

First let's download some location data. We will download gages.geojson, a GeoJSON file that contains 3 fictional gages.

location_data_path = Path(test_eval_dir, "gages.geojson")
test_study_data.fetch_file("gages.geojson", location_data_path)

Now let's open the gages.geojson file with GeoPandas and take a look at the contents.

location_data_path = Path(test_eval_dir, "gages.geojson")
gdf = gpd.read_file(location_data_path)
gdf
id name geometry
0 gage-A Site A POINT (-80.71918 35.17611)
1 gage-B Site B POINT (-80.71308 35.15839)
2 gage-C Bite C POINT (-80.73677 35.18693)

You can see that the file contains 3 gages, each represented by a point. Next we will load the location data into the TEEHR dataset using methods on the locations table sub-class. Note that this file has the id, name and geometry fields required by TEEHR. If it didn't, the load_spatial method (like all the table sub-class loading methods) accepts a field_mapping dictionary that specifies which source fields map to which required field names. We will look more closely at field_mapping when loading some other datasets.
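As a quick illustration only (this cell is not executed in the lesson, and the site_id and site_name column names are made up): if the source file used different column names, the load_spatial call might look something like this.

# Hypothetical illustration; not executed in this lesson.
# 'site_id' and 'site_name' are made-up source column names.
ev.locations.load_spatial(
    location_data_path,
    field_mapping={"site_id": "id", "site_name": "name"}
)

Since our file already has the required fields, we can load it directly: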

ev.locations.load_spatial(location_data_path)

Now that the location data has been loaded into the TEEHR dataset, it can be queried with the ev.locations class methods, but more importantly, we can now load timeseries that reference the id values in the locations data. The id in the locations table is the primary location ID and is referenced in the primary_timeseries table as location_id.

First let's query the location data to verify it has been loaded, then look at a map to verify the gages are in the correct locations, and then we will move on to loading some timeseries data.

locations_gdf = ev.locations.to_geopandas()
locations_gdf
id name geometry
0 gage-A Site A POINT (-80.71918 35.17611)
1 gage-B Site B POINT (-80.71308 35.15839)
2 gage-C Bite C POINT (-80.73677 35.18693)

This table of locations came from the TEEHR dataset and matches what we saw in the source file, so we know it was loaded successfully. Now let's look at a plot using the built-in teehr.location_map() method.

locations_gdf.teehr.location_map()

Primary Timeseries#

OK, now that we have loaded the location data, we can load some timeseries into the primary_timeseries table, where the location_id field references the id field in the locations table. The primary timeseries represents the timeseries that simulations will be compared against (i.e., the truth).

First, let's download a test timeseries from the TEEHR repository as a *.csv file, open and examine the contents with Pandas, and then load it into the TEEHR dataset.

primary_timeseries_path = Path(test_eval_dir, "test_short_obs.csv")
test_study_data.fetch_file("test_short_obs.csv", primary_timeseries_path)
primary_timeseries_df = pd.read_csv(primary_timeseries_path)
primary_timeseries_df.head()
reference_time value_time value variable_name measurement_unit configuration location_id
0 NaN 2022-01-01T00:00:00 0.1 streamflow m^3/s test_obs gage-A
1 NaN 2022-01-01T01:00:00 1.1 streamflow m^3/s test_obs gage-A
2 NaN 2022-01-01T02:00:00 0.1 streamflow m^3/s test_obs gage-A
3 NaN 2022-01-01T03:00:00 0.2 streamflow m^3/s test_obs gage-A
4 NaN 2022-01-01T04:00:00 0.3 streamflow m^3/s test_obs gage-A

There are a few things we need to look at and consider before loading the data into the TEEHR dataset. First, the field names. You can see above that while the schema of this dataset is close to the TEEHR schema, it is not exactly correct: specifically, configuration should be configuration_name and measurement_unit should be unit_name. To address this we can use the field_mapping argument to map the existing field names to the required field names. The next thing that needs to be dealt with is the field values for fields that reference values in another table, for example a domain table or the locations table. Let's start with unit_name. The unit_name values need to reference a value from the name field in the units table, so let's look at what values are in the units table.

# Query the units table
ev.units.to_pandas()
name long_name
0 m^3/s Cubic Meters Per Second
1 ft^3/s Cubic Feet Per Second
2 km^2 Square Kilometers
3 mm/s Millimeters Per Second

Here we can see that m^3/s is in the units table, so the values in the measurement_unit field are all set. Next let's check the variables table.

ev.variables.to_pandas()
name long_name
0 streamflow_hourly_inst Hourly Instantaneous Streamflow
1 streamflow_daily_mean Daily Mean Streamflow
2 rainfall_hourly_rate Hourly Rainfall Rate

Here we see that streamflow is not in the name field of the variables table. There are two ways to remedy this:

  1. we could add the value streamflow to the variables table (see the sketch just after this list), or

  2. we could use the constant_field_values argument to set the variable_name to an allowed value.
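As a hedged sketch of the first option only (not executed here), assuming the variables table supports the same .add() pattern used later in this lesson for configurations and attributes, and that a teehr.Variable model with name and long_name fields exists, it could look something like this:

# Hypothetical sketch: register "streamflow" as an allowed variable name.
# Assumes teehr.Variable mirrors the teehr.Configuration/teehr.Attribute models.
ev.variables.add(
    teehr.Variable(
        name="streamflow",
        long_name="Streamflow"
    )
)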

In this case we will go with the latter and set the variable_name to streamflow_hourly_inst. Next we need to address the configuration_name field. First let's see what values are in the configurations table. Because the configurations table is rather large, we will search it for test_obs to see if that value is in there (it isn't).

ev.configurations.filter(
    {
        "column": "name",
        "operator": "=",
        "value": "test_obs"
    }
).to_pandas()
name type description

We can see from the empty DataFrame that a configuration named test_obs is not in the table. Again, we have two options: we could use the constant_field_values argument to set the configuration_name to a value that is in the table (such as usgs_observations), or we could add a new row to the configurations table. Here we will use the constant_field_values argument to set the value to a constant. When we load the secondary_timeseries below, we will demonstrate how to add a new row to the configurations domain table.

# Load the timeseries data and map over the fields and set constants
ev.primary_timeseries.load_csv(
    in_path=primary_timeseries_path,
    field_mapping={
        "reference_time": "reference_time",
        "value_time": "value_time",
        "configuration": "configuration_name",
        "measurement_unit": "unit_name",
        "variable_name": "variable_name",
        "value": "value",
        "location_id": "location_id"
    },
    constant_field_values={
        "unit_name": "m^3/s",
        "variable_name": "streamflow_hourly_inst",
        "configuration_name": "usgs_observations"
    }
)

Now we can query for a single timeseries and make sure the data was loaded.

primary_timeseries_df = ev.primary_timeseries.filter(
    {
        "column": "location_id",
        "operator": "=",
        "value": "gage-A"
    }
).to_pandas()
primary_timeseries_df.head()
reference_time value_time value unit_name location_id configuration_name variable_name
0 NaT 2022-01-01 00:00:00 0.1 m^3/s gage-A usgs_observations streamflow_hourly_inst
1 NaT 2022-01-01 01:00:00 1.1 m^3/s gage-A usgs_observations streamflow_hourly_inst
2 NaT 2022-01-01 02:00:00 0.1 m^3/s gage-A usgs_observations streamflow_hourly_inst
3 NaT 2022-01-01 03:00:00 0.2 m^3/s gage-A usgs_observations streamflow_hourly_inst
4 NaT 2022-01-01 04:00:00 0.3 m^3/s gage-A usgs_observations streamflow_hourly_inst

And create a plot using the df.teehr.timeseries_plot() method.

primary_timeseries_df.teehr.timeseries_plot()

Secondary Timeseries#

Next we will load some simulated data that we want to compare to the observed timeseries (primary_timeseries). The secondary_timeseries table is where the “simulated” timeseries data are stored. First we will download some test secondary timeseries from the TEEHR repository.

secondary_timeseries_path = Path(test_eval_dir, "test_short_fcast.parquet")
test_study_data.fetch_file("test_short_fcast.parquet", secondary_timeseries_path)

Similar to what we did with the primary timeseries, we will open the timeseries with Pandas to see what the data looks like, then we will load it into the TEEHR dataset.

secondary_timeseries_df = pd.read_parquet(secondary_timeseries_path)
secondary_timeseries_df.head()
reference_time value_time value variable_name measurement_unit configuration location_id
0 2022-01-01 2022-01-01 00:00:00 0.0 streamflow m^3/s test_short fcst-1
1 2022-01-01 2022-01-01 01:00:00 0.1 streamflow m^3/s test_short fcst-1
2 2022-01-01 2022-01-01 02:00:00 0.2 streamflow m^3/s test_short fcst-1
3 2022-01-01 2022-01-01 03:00:00 0.3 streamflow m^3/s test_short fcst-1
4 2022-01-01 2022-01-01 04:00:00 0.4 streamflow m^3/s test_short fcst-1

When loading secondary_timeseries, the same considerations regarding field names and field values apply as for primary_timeseries. Unlike when we loaded the primary_timeseries, this time we will add a new configuration to the configurations table for the test_short configuration. It is more common for the configuration_name of the secondary timeseries you are loading to be missing from the configurations table, because this is where new research results will be loaded for evaluation.

ev.configurations.add(
    teehr.Configuration(
        name="test_short",
        type="secondary",
        description="Test Forecast Configuration",
    )
)

Location Crosswalks#

There is one other consideration before we can load the secondary_timeseries. There is no expectation in the TEEHR schema that the location_id used in the primary_timeseries matches the location_id in the secondary_timeseries. Therefore, it is necessary to load location crosswalk data into the location_crosswalks table so that TEEHR knows which primary_location_id matches which secondary_location_id. For example, in our test dataset, the observation data at location_id “gage-A” relates to forecasts with a location_id “fcst-1”.

We already have a file with this information, so as we did with the other test datasets, we will download the data from the TEEHR repository, open it with Pandas to examine the contents, and then load it into the TEEHR dataset.

crosswalk_path = Path(test_eval_dir, "crosswalk.csv")
test_study_data.fetch_file("crosswalk.csv", crosswalk_path)
crosswalk_df = pd.read_csv(crosswalk_path)
crosswalk_df
primary_location_id secondary_location_id
0 gage-A fcst-1
1 gage-B fcst-2
2 gage-C fcst-3

In this case we can see that the crosswalk table already has the field names we need (primary_location_id and secondary_location_id), so we don't need the field_mapping argument and can load it as-is.

# Load the crosswalk data to the crosswalks table
ev.location_crosswalks.load_csv(crosswalk_path)

Now that we have loaded the location_crosswalks data, we can load the secondary_timeseries. Note that, unlike when we loaded the primary_timeseries, in this case we do not need to set a constant field value for configuration_name because we added test_short to the configurations table.

# Load the secondary timeseries data and map over the fields and set constants
ev.secondary_timeseries.load_parquet(
    in_path=secondary_timeseries_path,
    field_mapping={
        "reference_time": "reference_time",
        "value_time": "value_time",
        "configuration": "configuration_name",
        "measurement_unit": "unit_name",
        "variable_name": "variable_name",
        "value": "value",
        "location_id": "location_id"
    },
    constant_field_values={
        "unit_name": "m^3/s",
        "variable_name": "streamflow_hourly_inst"
    }
)

And to verify that it loaded, we can run a query to see the data.

# Query the secondary timeseries data for a single forecast.
secondary_timeseries_df = (
    ev.secondary_timeseries
    .filter(
        {
            "column": "reference_time",
            "operator": "=",
            "value": "2022-01-01"
        }
    )
).to_pandas()
secondary_timeseries_df.head()
reference_time value_time value unit_name location_id configuration_name variable_name
0 2022-01-01 2022-01-01 00:00:00 0.0 m^3/s fcst-1 test_short streamflow_hourly_inst
1 2022-01-01 2022-01-01 01:00:00 0.1 m^3/s fcst-1 test_short streamflow_hourly_inst
2 2022-01-01 2022-01-01 02:00:00 0.2 m^3/s fcst-1 test_short streamflow_hourly_inst
3 2022-01-01 2022-01-01 03:00:00 0.3 m^3/s fcst-1 test_short streamflow_hourly_inst
4 2022-01-01 2022-01-01 04:00:00 0.4 m^3/s fcst-1 test_short streamflow_hourly_inst
secondary_timeseries_df.teehr.timeseries_plot()

At this point we have added data to the following TEEHR data tables:

  • locations

  • configurations

  • location_crosswalks

  • primary_timeseries

  • secondary_timeseries

This is the bare minimum needed to conduct an evaluation, but we are going to add one more type of data: location_attributes. Location attributes provide extra information about the primary locations. These data are analogous to the columns of a geospatial data file's "attribute table" but are stored in a separate table to keep the schema consistent across evaluations (i.e., we don't add new columns for each new attribute).

attr_2yr_discharge_path = Path(test_eval_dir, "test_attr_2yr_discharge.csv")
test_study_data.fetch_file("test_attr_2yr_discharge.csv", attr_2yr_discharge_path)

attr_drainage_area_km2_path = Path(test_eval_dir, "test_attr_drainage_area_km2.csv")
test_study_data.fetch_file("test_attr_drainage_area_km2.csv", attr_drainage_area_km2_path)

attr_ecoregion_path = Path(test_eval_dir, "test_attr_ecoregion.csv")
test_study_data.fetch_file("test_attr_ecoregion.csv", attr_ecoregion_path)

As before, let's take a look at the raw data we downloaded from the repository.

# Read and display the 3 attribute table contents
attr_2yr_discharge_df = pd.read_csv(Path(test_eval_dir, "test_attr_2yr_discharge.csv"))
display(attr_2yr_discharge_df)

attr_drainage_area_km2_df = pd.read_csv(Path(test_eval_dir, "test_attr_drainage_area_km2.csv"))
display(attr_drainage_area_km2_df)

attr_ecoregion_df = pd.read_csv(Path(test_eval_dir, "test_attr_ecoregion.csv"))
display(attr_ecoregion_df)
location_id attribute_name attribute_value attribute_unit
0 gage-A year_2_discharge 500 ft^3/s
1 gage-B year_2_discharge 200 ft^3/s
2 gage-C year_2_discharge 300 ft^3/s

location_id attribute_name attribute_value attribute_unit
0 gage-A drainage_area 50 sq_km
1 gage-B drainage_area 20 sq_km
2 gage-C drainage_area 30 sq_km

location_id attribute_name attribute_value attribute_unit
0 gage-A ecoregion coastal_plain NaN
1 gage-B ecoregion piedmont NaN
2 gage-C ecoregion blue_ridge NaN

We can see we have data for three different attributes here: year_2_discharge, drainage_area, and ecoregion. Before we can load these data we need to add the attribute names to the attributes table, and then we can add the location attribute values to the location_attributes table. As we did when adding configurations, we can use the .add() method on the table sub-class to add new values.

# Add some attributes
ev.attributes.add(
    [
        teehr.Attribute(
            name="drainage_area",
            type="continuous",
            description="Drainage area in square kilometers"
        ),
        teehr.Attribute(
            name="ecoregion",
            type="categorical",
            description="Ecoregion"
        ),
        teehr.Attribute(
            name="year_2_discharge",
            type="continuous",
            description="2-yr discharge in cubic meters per second"
        ),
    ]
)

Now that the attributes have been added to the attributes table, we can load the location_attributes data.

ev.location_attributes.load_csv(
    in_path=test_eval_dir,
    field_mapping={"attribute_value": "value"},
    pattern="test_attr_*.csv",
)

And query to make sure it was added.

ev.location_attributes.to_pandas()
location_id attribute_name value
0 gage-A year_2_discharge 500
1 gage-B year_2_discharge 200
2 gage-C year_2_discharge 300
3 gage-A ecoregion coastal_plain
4 gage-B ecoregion piedmont
5 gage-C ecoregion blue_ridge
6 gage-A drainage_area 50
7 gage-B drainage_area 20
8 gage-C drainage_area 30

Now we have data in all the tables required to conduct an evaluation. There is one more step before we can start “slicing and dicing” the data and looking at evaluation metrics: we need to generate the joined_timeseries data table. The joined_timeseries table is essentially a materialized view that joins the primary_timeseries and secondary_timeseries based on location_id, value_time, variable_name and unit_name, adds the location_attributes, and adds any user defined fields.
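As a conceptual illustration only (TEEHR performs this join internally in Spark), the pandas sketch below shows the join keys using the small DataFrames we queried earlier in this lesson; it is not how TEEHR builds the table.

# Conceptual sketch of the join keys only, not TEEHR's internal implementation.
# Step 1: map secondary location IDs to primary location IDs via the crosswalk.
sec = secondary_timeseries_df.merge(
    crosswalk_df, left_on="location_id", right_on="secondary_location_id"
)
# Step 2: join primary and secondary values on location, time, variable and unit.
conceptual_join = sec.merge(
    primary_timeseries_df.rename(
        columns={"location_id": "primary_location_id", "value": "primary_value"}
    ),
    on=["primary_location_id", "value_time", "variable_name", "unit_name"],
    suffixes=("_secondary", "_primary"),
).rename(columns={"value": "secondary_value"})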

Here we are going to execute the create() method on the joined_timeseries table class to generate the joined_timeseries "materialized view" from the data we just loaded into the domain, location and timeseries tables. The user defined fields to be added are defined as part of the evaluation in the [evaluation_dir]/scripts/user_defined_fields.py Python script. Note that it is possible to skip adding any user defined fields by setting execute_udf=False when calling the create() method. User defined fields can be a powerful tool for adding per-location, per-timestep fields to the materialized joined_timeseries table; this will be covered in more detail in future lessons. For now, let's set execute_udf=True and execute the user_defined_fields.py script.
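As a hedged illustration only (this is not the actual contents of user_defined_fields.py), the kind of PySpark logic such a script might use to derive the month, year and water_year fields from value_time could look roughly like this:

# Illustrative sketch only; not the shipped user_defined_fields.py script.
from pyspark.sql import functions as F

def add_calendar_fields(sdf):
    # Derive example calendar fields from the value_time column.
    sdf = sdf.withColumn("month", F.month("value_time"))
    sdf = sdf.withColumn("year", F.year("value_time"))
    # Water year (Oct 1 - Sep 30): Oct-Dec timestamps belong to the next water year.
    sdf = sdf.withColumn(
        "water_year",
        F.when(F.month("value_time") >= 10, F.year("value_time") + 1)
         .otherwise(F.year("value_time"))
    )
    return sdf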

ev.joined_timeseries.create(execute_udf=True)

Now let's query the joined_timeseries table that we just created to see what it contains.

ev.joined_timeseries.to_pandas().head()
reference_time value_time primary_location_id secondary_location_id primary_value secondary_value unit_name location_id year_2_discharge ecoregion drainage_area month year water_year configuration_name variable_name
0 2022-01-01 2022-01-01 00:00:00 gage-A fcst-1 0.1 0.0 m^3/s gage-A 500 coastal_plain 50 1 2022 2022 test_short streamflow_hourly_inst
1 2022-01-01 2022-01-01 01:00:00 gage-A fcst-1 1.1 0.1 m^3/s gage-A 500 coastal_plain 50 1 2022 2022 test_short streamflow_hourly_inst
2 2022-01-01 2022-01-01 02:00:00 gage-A fcst-1 0.1 0.2 m^3/s gage-A 500 coastal_plain 50 1 2022 2022 test_short streamflow_hourly_inst
3 2022-01-01 2022-01-01 03:00:00 gage-A fcst-1 0.2 0.3 m^3/s gage-A 500 coastal_plain 50 1 2022 2022 test_short streamflow_hourly_inst
4 2022-01-01 2022-01-01 04:00:00 gage-A fcst-1 0.3 0.4 m^3/s gage-A 500 coastal_plain 50 1 2022 2022 test_short streamflow_hourly_inst

In this case, the joined_timeseries table has 16 fields. These fields are the result of joining the primary_timeseries to the secondary_timeseries, joining the location_attributes to that, and adding user defined fields. The bullet points below show which fields fall into which category:

Timeseries#

  • location_id

  • primary_location_id

  • secondary_location_id

  • configuration_name

  • variable_name

  • unit_name

  • reference_time

  • value_time

  • primary_value

  • secondary_value

Location Attributes#

  • year_2_discharge

  • ecoregion

  • drainage_area

User Defined Fields#

  • month

  • year

  • water_year

# Stop the Spark session now that we are done loading and querying data
ev.spark.stop()

This concludes the Loading Local Data lesson. In the next lesson we will clone an entire evaluation from the TEEHR AWS S3 bucket and start to explore the querying and metrics capabilities.