Loading Local Data#
Overview#
In the Introduction to the Schema lesson we explored what data makes up an Evaluation dataset and how it is stored on disk. In this lesson we are going to explore how your data can be added to a new or existing Evaluation dataset.
Similar to the previous lesson, we will start by importing the required packages, creating an Evaluation class instance that references a directory where the data will be stored, and then cloning the Evaluation template as a starting point.
import teehr
import teehr.example_data.v0_3_test_study as test_study_data
from pathlib import Path
import shutil
import geopandas as gpd
import pandas as pd
# Tell Bokeh to output plots in the notebook
from bokeh.io import output_notebook
output_notebook()
# Define the directory where the Evaluation will be created
test_eval_dir = Path(Path().home(), "temp", "02_loading_data")
shutil.rmtree(test_eval_dir, ignore_errors=True)
# Create an Evaluation object and create the directory
ev = teehr.Evaluation(dir_path=test_eval_dir, create_dir=True)
# Clone the template
ev.clone_template()
For this example, we will use data from the example dataset stored in the tests/data folder of the teehr repository. We will copy the data to the Evaluation directory, examine the contents, and then load it into the TEEHR Evaluation dataset. Note that these data are for demonstration purposes only: the location IDs are made up and the timeseries values are fake. We will work with real data in future exercises.
Locations#
First let's download some location data. We will download the gages.geojson file, which is a GeoJSON file that contains 3 fictional gages.
location_data_path = Path(test_eval_dir, "gages.geojson")
test_study_data.fetch_file("gages.geojson", location_data_path)
Now let's open the gages.geojson file with GeoPandas and take a look at the contents.
location_data_path = Path(test_eval_dir, "gages.geojson")
gdf = gpd.read_file(location_data_path)
gdf
|   | id | name | geometry |
|---|----|------|----------|
| 0 | gage-A | Site A | POINT (-80.71918 35.17611) |
| 1 | gage-B | Site B | POINT (-80.71308 35.15839) |
| 2 | gage-C | Bite C | POINT (-80.73677 35.18693) |
You can see that the file contains 3 gages, each represented by a point. Next we will load the location data into the TEEHR dataset using methods on the locations table sub-class. Note that this file has the id, name and geometry fields required by TEEHR. If it didn't, the load_spatial method, like all the table sub-class loading methods, accepts a field_mapping dictionary that specifies which source fields map to which required field names. We will look more closely at field_mapping when loading some other datasets.
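As an illustration only, here is a hedged sketch of what that call could look like if the source file used different field names. The names station_id and station_name below are hypothetical and are not in our file, so this sketch is not executed here.
# Hypothetical sketch (not executed): map made-up source field names
# to the field names required by TEEHR.
ev.locations.load_spatial(
    location_data_path,
    field_mapping={
        "station_id": "id",
        "station_name": "name",
        "geometry": "geometry"
    }
)
Since our gages.geojson file already has the required field names, we can load it as-is: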
ev.locations.load_spatial(location_data_path)
Now that the location data has been loaded into the TEEHR dataset, it can be queried with the ev.locations class methods. More importantly, we can now load timeseries that reference the id values in the locations data. The id in the locations table is the primary location ID and is referenced in the primary_timeseries table as location_id.
First let's query the location data to verify it has been loaded, then look at a map to verify the gages are in the correct locations, and then we will move on to loading some timeseries data.
locations_gdf = ev.locations.to_geopandas()
locations_gdf
|   | id | name | geometry |
|---|----|------|----------|
| 0 | gage-A | Site A | POINT (-80.71918 35.17611) |
| 1 | gage-B | Site B | POINT (-80.71308 35.15839) |
| 2 | gage-C | Bite C | POINT (-80.73677 35.18693) |
This table of locations came from the TEEHR dataset and matches what we saw in the source file, so we know it was loaded successfully. Now let's look at a map using the built-in teehr.location_map() method.
locations_gdf.teehr.location_map()
Primary Timeseries#
OK, now that we have loaded the location data, we can load some timeseries into the primary_timeseries table, where the location_id field references the id field in the locations table. The primary timeseries represents the timeseries that simulations will be compared to (i.e., the truth).
First, let's download a test timeseries from the TEEHR repository as a *.csv file, open and examine the contents with Pandas, and then load it into the TEEHR dataset.
primary_timeseries_path = Path(test_eval_dir, "test_short_obs.csv")
test_study_data.fetch_file("test_short_obs.csv", primary_timeseries_path)
primary_timeseries_df = pd.read_csv(primary_timeseries_path)
primary_timeseries_df.head()
|   | reference_time | value_time | value | variable_name | measurement_unit | configuration | location_id |
|---|---|---|---|---|---|---|---|
| 0 | NaN | 2022-01-01T00:00:00 | 0.1 | streamflow | m^3/s | test_obs | gage-A |
| 1 | NaN | 2022-01-01T01:00:00 | 1.1 | streamflow | m^3/s | test_obs | gage-A |
| 2 | NaN | 2022-01-01T02:00:00 | 0.1 | streamflow | m^3/s | test_obs | gage-A |
| 3 | NaN | 2022-01-01T03:00:00 | 0.2 | streamflow | m^3/s | test_obs | gage-A |
| 4 | NaN | 2022-01-01T04:00:00 | 0.3 | streamflow | m^3/s | test_obs | gage-A |
There are a few things we need to consider before loading the data into the TEEHR dataset. First, the field names. You can see above that while the schema of this dataset is close to the TEEHR schema, it is not exactly correct: specifically, configuration should be configuration_name and measurement_unit should be unit_name. To address this we can use the field_mapping argument to map the existing field names to the required field names. The next thing that needs to be dealt with are the values in fields that reference values in another table, for example a domain table or the locations table. Let's start with unit_name. The unit_name values need to reference a value from the name field in the units table, so next let's look at what values are in the units table.
# Query the units table
ev.units.to_pandas()
|   | name | long_name |
|---|------|-----------|
| 0 | m^3/s | Cubic Meters Per Second |
| 1 | ft^3/s | Cubic Feet Per Second |
| 2 | km^2 | Square Kilometers |
| 3 | mm/s | Millimeters Per Second |
Here we can see that m^3/s is in the units table, so we are all set with the values in the measurement_unit field. Next let's check the variables table.
ev.variables.to_pandas()
|   | name | long_name |
|---|------|-----------|
| 0 | streamflow_hourly_inst | Hourly Instantaneous Streamflow |
| 1 | streamflow_daily_mean | Daily Mean Streamflow |
| 2 | rainfall_hourly_rate | Hourly Rainfall Rate |
Here we see that streamflow is not in the name field of the variables table. There are two ways to remedy this:

1. we could add the value streamflow to the variables table (see the sketch below), or
2. we could use the constant_field_values argument to set the variable_name to an allowed value.
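As a sketch of the first option, adding a new variable might look like the following. This assumes the library exposes a Variable model analogous to the Configuration and Attribute models used later in this lesson, with the name and long_name fields shown in the table above; check the TEEHR documentation before relying on it. We do not run it here.
# Hypothetical sketch (not executed): add "streamflow" to the variables
# table, assuming a teehr.Variable model with name/long_name fields.
ev.variables.add(
    teehr.Variable(
        name="streamflow",
        long_name="Streamflow"
    )
)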
In this case we will go with the latter and set the variable_name to streamflow_hourly_inst. Next we need to address the configuration_name field. First let's see what values are in the configurations table. Because the configurations table is rather large, we will search it for test_obs to see if that value is there (it isn't).
ev.configurations.filter(
{
"column": "name",
"operator": "=",
"value": "test_obs"
}
).to_pandas()
| name | type | description |
|------|------|-------------|
We can see from the empty DataFrame that a configuration named test_obs is not in the table. Again, we have two options: we could use the constant_field_values argument to set the configuration_name to a value that is in the table (like usgs_observations), or we could add a new row to the configurations table. Here we will use the constant_field_values argument to set the value to a constant. When we load the secondary_timeseries below we will demonstrate how to add a new row to the configurations domain table.
# Load the timeseries data and map over the fields and set constants
ev.primary_timeseries.load_csv(
in_path=primary_timeseries_path,
field_mapping={
"reference_time": "reference_time",
"value_time": "value_time",
"configuration": "configuration_name",
"measurement_unit": "unit_name",
"variable_name": "variable_name",
"value": "value",
"location_id": "location_id"
},
constant_field_values={
"unit_name": "m^3/s",
"variable_name": "streamflow_hourly_inst",
"configuration_name": "usgs_observations"
}
)
Now we can query for a single timeseries and make sure the data was loaded.
primary_timeseries_df = ev.primary_timeseries.filter(
{
"column": "location_id",
"operator": "=",
"value": "gage-A"
}
).to_pandas()
primary_timeseries_df.head()
|   | reference_time | value_time | value | unit_name | location_id | configuration_name | variable_name |
|---|---|---|---|---|---|---|---|
| 0 | NaT | 2022-01-01 00:00:00 | 0.1 | m^3/s | gage-A | usgs_observations | streamflow_hourly_inst |
| 1 | NaT | 2022-01-01 01:00:00 | 1.1 | m^3/s | gage-A | usgs_observations | streamflow_hourly_inst |
| 2 | NaT | 2022-01-01 02:00:00 | 0.1 | m^3/s | gage-A | usgs_observations | streamflow_hourly_inst |
| 3 | NaT | 2022-01-01 03:00:00 | 0.2 | m^3/s | gage-A | usgs_observations | streamflow_hourly_inst |
| 4 | NaT | 2022-01-01 04:00:00 | 0.3 | m^3/s | gage-A | usgs_observations | streamflow_hourly_inst |
And create a plot using the df.teehr.timeseries_plot() method.
primary_timeseries_df.teehr.timeseries_plot()
Secondary Timeseries#
Next we will load some simulated data that we want to compare to the observed timeseries (primary_timeseries). The secondary_timeseries table is where the "simulated" timeseries data are stored. First we will download some test secondary timeseries from the TEEHR repository.
secondary_timeseries_path = Path(test_eval_dir, "test_short_fcast.parquet")
test_study_data.fetch_file("test_short_fcast.parquet", secondary_timeseries_path)
Similar to what we did with the primary timeseries, we will open the timeseries with Pandas to see what the data looks like, then load it into the TEEHR dataset.
secondary_timeseries_df = pd.read_parquet(secondary_timeseries_path)
secondary_timeseries_df.head()
|   | reference_time | value_time | value | variable_name | measurement_unit | configuration | location_id |
|---|---|---|---|---|---|---|---|
| 0 | 2022-01-01 | 2022-01-01 00:00:00 | 0.0 | streamflow | m^3/s | test_short | fcst-1 |
| 1 | 2022-01-01 | 2022-01-01 01:00:00 | 0.1 | streamflow | m^3/s | test_short | fcst-1 |
| 2 | 2022-01-01 | 2022-01-01 02:00:00 | 0.2 | streamflow | m^3/s | test_short | fcst-1 |
| 3 | 2022-01-01 | 2022-01-01 03:00:00 | 0.3 | streamflow | m^3/s | test_short | fcst-1 |
| 4 | 2022-01-01 | 2022-01-01 04:00:00 | 0.4 | streamflow | m^3/s | test_short | fcst-1 |
When loading secondary_timeseries, the same considerations regarding field names and field values apply as for primary_timeseries. Unlike when we loaded the primary_timeseries, this time we will add a new configuration to the configurations table for the test_short configuration. It is more common for the configuration_name of the secondary timeseries you are loading to be missing from the configurations table, because this is where new research results are loaded for evaluation.
ev.configurations.add(
teehr.Configuration(
name="test_short",
type="secondary",
description="Test Forecast Configuration",
)
)
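As an optional check, we can re-run the same filter pattern used above to confirm the new configuration is now present in the configurations table:
# Re-run the earlier filter; it should now return the test_short row.
ev.configurations.filter(
    {
        "column": "name",
        "operator": "=",
        "value": "test_short"
    }
).to_pandas()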
Location Crosswalks#
There is one other consideration before we can load the secondary_timeseries. There is no expectation in the TEEHR schema that the location_id used in the primary_timeseries matches the location_id in the secondary_timeseries. Therefore, it is necessary to load location crosswalk data into the location_crosswalks table so that TEEHR knows which primary_location_id matches which secondary_location_id. For example, in our test dataset, the observation data at location_id "gage-A" relates to forecasts with location_id "fcst-1".
We already have a file with this information, so like we did with the other test datasets, we will download the data from the TEEHR repository, open it with Pandas to examine the contents, and load it into the TEEHR dataset.
crosswalk_path = Path(test_eval_dir, "crosswalk.csv")
test_study_data.fetch_file("crosswalk.csv", crosswalk_path)
crosswalk_df = pd.read_csv(crosswalk_path)
crosswalk_df
|   | primary_location_id | secondary_location_id |
|---|---|---|
| 0 | gage-A | fcst-1 |
| 1 | gage-B | fcst-2 |
| 2 | gage-C | fcst-3 |
In this case we can see that the crosswalk table already has the field names we need (primary_location_id and secondary_location_id), so we don't need to pass a field_mapping argument.
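If the column headers had been different, we could have mapped them. A hypothetical sketch (not executed here), assuming made-up source columns usgs_id and nwm_id:
# Hypothetical sketch (not executed): map made-up crosswalk column
# names to the required primary/secondary location ID fields.
ev.location_crosswalks.load_csv(
    crosswalk_path,
    field_mapping={
        "usgs_id": "primary_location_id",
        "nwm_id": "secondary_location_id"
    }
)
Since our file already matches the schema, we can load it as-is: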
# Load the crosswalk data to the crosswalks table
ev.location_crosswalks.load_csv(crosswalk_path)
Now that we have loaded the location_crosswalks data, we can load the secondary_timeseries. Note that, unlike when we loaded the primary_timeseries, in this case we did not have to set a constant field value for configuration_name because we added the configuration to the configurations table.
# Load the secondary timeseries data and map over the fields and set constants
ev.secondary_timeseries.load_parquet(
in_path=secondary_timeseries_path,
field_mapping={
"reference_time": "reference_time",
"value_time": "value_time",
"configuration": "configuration_name",
"measurement_unit": "unit_name",
"variable_name": "variable_name",
"value": "value",
"location_id": "location_id"
},
constant_field_values={
"unit_name": "m^3/s",
"variable_name": "streamflow_hourly_inst"
}
)
And to verify that it loaded, we can run a query to see the data.
# Query the secondary timeseries data for a single forecast.
secondary_timeseries_df = (
ev.secondary_timeseries
.filter(
{
"column": "reference_time",
"operator": "=",
"value": "2022-01-01"
}
)
).to_pandas()
secondary_timeseries_df.head()
|   | reference_time | value_time | value | unit_name | location_id | configuration_name | variable_name |
|---|---|---|---|---|---|---|---|
| 0 | 2022-01-01 | 2022-01-01 00:00:00 | 0.0 | m^3/s | fcst-1 | test_short | streamflow_hourly_inst |
| 1 | 2022-01-01 | 2022-01-01 01:00:00 | 0.1 | m^3/s | fcst-1 | test_short | streamflow_hourly_inst |
| 2 | 2022-01-01 | 2022-01-01 02:00:00 | 0.2 | m^3/s | fcst-1 | test_short | streamflow_hourly_inst |
| 3 | 2022-01-01 | 2022-01-01 03:00:00 | 0.3 | m^3/s | fcst-1 | test_short | streamflow_hourly_inst |
| 4 | 2022-01-01 | 2022-01-01 04:00:00 | 0.4 | m^3/s | fcst-1 | test_short | streamflow_hourly_inst |
secondary_timeseries_df.teehr.timeseries_plot()
At this point we have added data to the following TEEHR data tables:

- locations
- configurations
- location_crosswalks
- primary_timeseries
- secondary_timeseries
This is the bare minimum needed to conduct an evaluation, but we are going to add one more type of data: location_attributes. Location attributes provide extra information about the primary locations. These data are analogous to the columns in a geospatial data file's "attribute table", but they are stored in a separate table to keep the schema consistent across evaluations (i.e., we don't add new columns for each new attribute).
attr_2yr_discharge_path = Path(test_eval_dir, "test_attr_2yr_discharge.csv")
test_study_data.fetch_file("test_attr_2yr_discharge.csv", attr_2yr_discharge_path)
attr_drainage_area_km2_path = Path(test_eval_dir, "test_attr_drainage_area_km2.csv")
test_study_data.fetch_file("test_attr_drainage_area_km2.csv", attr_drainage_area_km2_path)
attr_ecoregion_path = Path(test_eval_dir, "test_attr_ecoregion.csv")
test_study_data.fetch_file("test_attr_ecoregion.csv", attr_ecoregion_path)
Like before, let's take a look at the raw data we downloaded from the repository.
# Read and display the 3 attribute table contents
attr_2yr_discharge_df = pd.read_csv(Path(test_eval_dir, "test_attr_2yr_discharge.csv"))
display(attr_2yr_discharge_df)
attr_drainage_area_km2_df = pd.read_csv(Path(test_eval_dir, "test_attr_drainage_area_km2.csv"))
display(attr_drainage_area_km2_df)
attr_ecoregion_df = pd.read_csv(Path(test_eval_dir, "test_attr_ecoregion.csv"))
display(attr_ecoregion_df)
|   | location_id | attribute_name | attribute_value | attribute_unit |
|---|---|---|---|---|
| 0 | gage-A | year_2_discharge | 500 | ft^3/s |
| 1 | gage-B | year_2_discharge | 200 | ft^3/s |
| 2 | gage-C | year_2_discharge | 300 | ft^3/s |

|   | location_id | attribute_name | attribute_value | attribute_unit |
|---|---|---|---|---|
| 0 | gage-A | drainage_area | 50 | sq_km |
| 1 | gage-B | drainage_area | 20 | sq_km |
| 2 | gage-C | drainage_area | 30 | sq_km |

|   | location_id | attribute_name | attribute_value | attribute_unit |
|---|---|---|---|---|
| 0 | gage-A | ecoregion | coastal_plain | NaN |
| 1 | gage-B | ecoregion | piedmont | NaN |
| 2 | gage-C | ecoregion | blue_ridge | NaN |
We can see we have data for three different attributes here: year_2_discharge, drainage_area, and ecoregion. Before we can load these data we need to add the attribute names to the attributes table; then we can add the location attribute values to the location_attributes table. Like we did for adding configurations, we can use the .add() method on the table sub-class to add new values.
# Add some attributes
ev.attributes.add(
[
teehr.Attribute(
name="drainage_area",
type="continuous",
description="Drainage area in square kilometers"
),
teehr.Attribute(
name="ecoregion",
type="categorical",
description="Ecoregion"
),
teehr.Attribute(
name="year_2_discharge",
type="continuous",
description="2-yr discharge in cubic meters per second"
),
]
)
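Optionally, we could list the attributes table to confirm the new entries. This assumes the attributes table exposes a to_pandas() method like the other domain tables; that is an assumption, not something shown in this lesson.
# Optional check (assumes attributes supports to_pandas() like units/variables)
ev.attributes.to_pandas()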
Now that the attributes have been added to the attributes table, we can load the location_attributes data.
ev.location_attributes.load_csv(
in_path=test_eval_dir,
field_mapping={"attribute_value": "value"},
pattern="test_attr_*.csv",
)
And query to make sure it was added.
ev.location_attributes.to_pandas()
|   | location_id | attribute_name | value |
|---|---|---|---|
| 0 | gage-A | year_2_discharge | 500 |
| 1 | gage-B | year_2_discharge | 200 |
| 2 | gage-C | year_2_discharge | 300 |
| 3 | gage-A | ecoregion | coastal_plain |
| 4 | gage-B | ecoregion | piedmont |
| 5 | gage-C | ecoregion | blue_ridge |
| 6 | gage-A | drainage_area | 50 |
| 7 | gage-B | drainage_area | 20 |
| 8 | gage-C | drainage_area | 30 |
Now we have data in all the tables required to conduct an evaluation. There is one more step before we can start "slicing and dicing" the data and looking at evaluation metrics: we need to generate the joined_timeseries table. The joined_timeseries table is essentially a materialized view that joins the primary_timeseries and secondary_timeseries based on location_id, value_time, variable_name and unit_name, adds the location_attributes, and adds any user defined fields.
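For intuition only, here is a rough Pandas sketch of that join logic. This is not how TEEHR builds the table internally (it uses Spark), and it assumes the timeseries and crosswalk tables expose to_pandas() as used elsewhere in this lesson.
# Rough sketch of the join logic (for intuition only).
prim = ev.primary_timeseries.to_pandas().rename(
    columns={"value": "primary_value", "location_id": "primary_location_id"}
)
sec = ev.secondary_timeseries.to_pandas().rename(
    columns={"value": "secondary_value", "location_id": "secondary_location_id"}
)
xwalk = ev.location_crosswalks.to_pandas()
# Attach the primary location ID to each secondary value via the crosswalk,
# then join the primary values on location, time, variable and unit.
sketch_joined = (
    sec.merge(xwalk, on="secondary_location_id")
    .merge(
        prim.drop(columns=["reference_time", "configuration_name"]),
        on=["primary_location_id", "value_time", "variable_name", "unit_name"],
    )
)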
Here we are going to execute the create() method on the joined_timeseries table class to generate the joined_timeseries "materialized view" from the data we just loaded into the domain, location and timeseries tables. The user defined fields to be added are defined as part of the evaluation in the [evaluation_dir]/scripts/user_defined_fields.py Python script. Note that it is possible to skip adding any user defined fields by setting execute_udf=False when calling the create() method. User defined fields can be a powerful tool for adding per-location, per-timestep fields to the materialized joined_timeseries table; this will be covered in more detail in future lessons. For now, let's just set execute_udf=True and execute the user_defined_fields.py script.
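For reference, skipping the user defined fields would look like this (not executed here):
# Build the joined_timeseries table without user defined fields
ev.joined_timeseries.create(execute_udf=False)
Here, though, we will include them: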
ev.joined_timeseries.create(execute_udf=True)
Now let's query the joined_timeseries table that we just created to see what it contains.
ev.joined_timeseries.to_pandas().head()
|   | reference_time | value_time | primary_location_id | secondary_location_id | primary_value | secondary_value | unit_name | location_id | year_2_discharge | ecoregion | drainage_area | month | year | water_year | configuration_name | variable_name |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 2022-01-01 | 2022-01-01 00:00:00 | gage-A | fcst-1 | 0.1 | 0.0 | m^3/s | gage-A | 500 | coastal_plain | 50 | 1 | 2022 | 2022 | test_short | streamflow_hourly_inst |
| 1 | 2022-01-01 | 2022-01-01 01:00:00 | gage-A | fcst-1 | 1.1 | 0.1 | m^3/s | gage-A | 500 | coastal_plain | 50 | 1 | 2022 | 2022 | test_short | streamflow_hourly_inst |
| 2 | 2022-01-01 | 2022-01-01 02:00:00 | gage-A | fcst-1 | 0.1 | 0.2 | m^3/s | gage-A | 500 | coastal_plain | 50 | 1 | 2022 | 2022 | test_short | streamflow_hourly_inst |
| 3 | 2022-01-01 | 2022-01-01 03:00:00 | gage-A | fcst-1 | 0.2 | 0.3 | m^3/s | gage-A | 500 | coastal_plain | 50 | 1 | 2022 | 2022 | test_short | streamflow_hourly_inst |
| 4 | 2022-01-01 | 2022-01-01 04:00:00 | gage-A | fcst-1 | 0.3 | 0.4 | m^3/s | gage-A | 500 | coastal_plain | 50 | 1 | 2022 | 2022 | test_short | streamflow_hourly_inst |
In this case, the joined_timeseries table has 16 fields. These fields are the result of joining the primary_timeseries to the secondary_timeseries, joining the location_attributes to that, and adding user defined fields. The bullet points below show which fields fall into which category:
Timeseries#

- location_id
- primary_location_id
- secondary_location_id
- configuration_name
- variable_name
- unit_name
- reference_time
- value_time
- primary_value
- secondary_value

Location Attributes#

- year_2_discharge
- ecoregion
- drainage_area

User Defined Fields#

- month
- year
- water_year
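As a quick preview of the next lesson, and assuming the joined_timeseries table supports the same filter pattern used for the other tables (an assumption here; querying is covered in detail later), we could slice the joined table on one of these attribute fields:
# Assumed sketch: filter the joined table to a single ecoregion using
# the same filter pattern shown for the other tables.
ev.joined_timeseries.filter(
    {
        "column": "ecoregion",
        "operator": "=",
        "value": "coastal_plain"
    }
).to_pandas().head()
Finally, we stop the Spark session now that we are done with this lesson.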
ev.spark.stop()
This concludes the Loading Local Data lesson. In the next lesson we will clone an entire evaluation from the TEEHR AWS S3 bucket and start to explore the querying and metrics capabilities.