Introduction to the Evaluation Class#

Overview#

In the previous lesson we loaded some test data from the local drive (after first downloading it from the repository). In this lesson we will continue to explore the Evaluation schema through the Evaluation class interface.

Note: this lesson builds on the dataset that we created in the previous lesson, Loading Local Data. If you have not run through the Loading Local Data lesson, go back and work through that notebook first to generate the dataset required for this lesson.

Create a new Evaluation#

First we will import the TEEHR Evaluation class and create a new instance that points to the directory where the data loaded in lesson 02_loading_data is stored.

import teehr
from teehr.evaluation.utils import print_tree
from pathlib import Path

# Define the directory where the Evaluation will be created
test_eval_dir = Path(Path().home(), "temp", "02_loading_data")

# Create an Evaluation object and create the directory
ev = teehr.Evaluation(dir_path=test_eval_dir, create_dir=True)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/19 18:51:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Now that we have created a new evaluation that points to the dataset we created in 02_loading_data, let's take a look at the data, specifically the dataset directory. You can see that the three different data groups are stored in slightly different ways.

  • The domain tables (units, variables, configurations, attributes) are stored as *.csv files. While in this case the files happen to have the same name as the table, there is no requirement that they do.

  • The location tables (locations, location_attributes, location_crosswalks) are stored as parquet files without hive partitioning. The file names are managed by Spark.

  • The timeseries tables (primary_timeseries, secondary_timeseries, joined_timeseries) are stored as parquet files with hive partitioning. The file names are managed by Spark.

Note: if you don’t have the tree command-line utility installed and don’t want to install it, the print_tree function imported above is a Python utility that does roughly the same thing; that is what we use here.

print_tree(ev.dataset_dir, exclude_patterns=[".*", "_*"])
├── configurations
│   └── configurations.csv
├── primary_timeseries
│   └── configuration_name=usgs_observations
│       └── variable_name=streamflow_hourly_inst
│           └── part-00000-761b608e-66ec-4b1f-9d8b-34f13219a780.c000.snappy.parquet
├── locations
│   └── gages.parquet
├── joined_timeseries
│   └── configuration_name=test_short
│       └── variable_name=streamflow_hourly_inst
│           └── part-00000-fabf8596-f4ae-4de3-9b6f-00d380a16bc0.c000.snappy.parquet
├── units
│   └── units.csv
├── location_attributes
│   ├── part-00001-62801fd2-ef20-4a50-b988-ec458a42f2aa-c000.snappy.parquet
│   ├── part-00000-62801fd2-ef20-4a50-b988-ec458a42f2aa-c000.snappy.parquet
│   └── part-00002-62801fd2-ef20-4a50-b988-ec458a42f2aa-c000.snappy.parquet
├── location_crosswalks
│   └── part-00000-31c8ab56-462d-4bff-a941-5d80004fcec7-c000.snappy.parquet
├── secondary_timeseries
│   └── configuration_name=test_short
│       └── variable_name=streamflow_hourly_inst
│           └── part-00000-494f1895-8c64-4c21-b257-dc5144eaca98.c000.snappy.parquet
├── attributes
│   └── attributes.csv
└── variables
    └── variables.csv
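Because the domain tables are plain CSV files, you can also open them directly with pandas outside of the TEEHR interface if you ever need to inspect the raw files. A minimal sketch (assuming pandas is installed in your environment; the file path matches the tree listing above):

import pandas as pd

# Read the units domain table directly from the dataset directory
# printed above by print_tree.
pd.read_csv(Path(ev.dataset_dir, "units", "units.csv"))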

Table Classes#

The TEEHR Evaluation class contains sub-classes that organize its methods into logical groups. One such group is the “table” sub-classes, which contain methods for interacting with the data tables. Each table in the Evaluation dataset has a corresponding sub-class with the same name as the table.

ev.units
ev.attributes
ev.variables
ev.configurations
ev.locations
ev.location_attributes
ev.location_crosswalks
ev.primary_timeseries
ev.secondary_timeseries
ev.joined_timeseries

Each of the table sub-classes has methods to add and/or load new data as well as methods to query the table to get data out; these are described in the API documentation. For now, because all of the tables are relatively small, we will just use the to_pandas() method and then the head() method on the resulting Pandas DataFrame to see an example of the data that is returned. In an actual evaluation setting, with lots of data in the TEEHR dataset, you would likely want to include a filter() call to reduce the amount of data you are querying and loading into memory.

ev.units.to_pandas().head()
name long_name
0 m^3/s Cubic Meters Per Second
1 ft^3/s Cubic Feet Per Second
2 km^2 Square Kilometers
3 mm/s Millimeters Per Second
ev.attributes.to_pandas().head()
name type description
0 2_year_flow continuous Two year flow rate in m^3/s
1 drainage_area continuous Drainage area in square kilometers
2 ecoregion categorical Ecoregion
3 year_2_discharge continuous 2-yr discharge in cubic meters per second
ev.variables.to_pandas().head()
name long_name
0 streamflow_hourly_inst Hourly Instantaneous Streamflow
1 streamflow_daily_mean Daily Mean Streamflow
2 rainfall_hourly_rate Hourly Rainfall Rate
ev.configurations.to_pandas().head()
name type description
0 nwm30_retrospective secondary NWM 3.0 Retrospective
1 nwm22_retrospective secondary NWM 2.2 Retrospective
2 nwm21_retrospective secondary NWM 2.1 Retrospective
3 nwm20_retrospective secondary NWM 2.0 Retrospective
4 usgs_observations primary USGS Observations
ev.locations.to_pandas().head()
id name geometry
0 gage-A Site A b'\x01\x01\x00\x00\x00\xc8f\xa9\x1d\x07.T\xc0\...
1 gage-B Site B b'\x01\x01\x00\x00\x00\x00\x06?-\xa3-T\xc0\xbb...
2 gage-C Bite C b"\x01\x01\x00\x00\x00\xb0Y\xebG'/T\xc0K\xa5\n...
ev.location_attributes.to_pandas().head()
location_id attribute_name value
0 gage-A year_2_discharge 500
1 gage-B year_2_discharge 200
2 gage-C year_2_discharge 300
3 gage-A ecoregion coastal_plain
4 gage-B ecoregion piedmont
ev.primary_timeseries.to_pandas().head()
reference_time value_time value unit_name location_id configuration_name variable_name
0 NaT 2022-01-01 00:00:00 0.1 m^3/s gage-A usgs_observations streamflow_hourly_inst
1 NaT 2022-01-01 01:00:00 1.1 m^3/s gage-A usgs_observations streamflow_hourly_inst
2 NaT 2022-01-01 02:00:00 0.1 m^3/s gage-A usgs_observations streamflow_hourly_inst
3 NaT 2022-01-01 03:00:00 0.2 m^3/s gage-A usgs_observations streamflow_hourly_inst
4 NaT 2022-01-01 04:00:00 0.3 m^3/s gage-A usgs_observations streamflow_hourly_inst
ev.location_crosswalks.to_pandas().head()
primary_location_id secondary_location_id
0 gage-A fcst-1
1 gage-B fcst-2
2 gage-C fcst-3
ev.secondary_timeseries.to_pandas().head()
reference_time value_time value unit_name location_id configuration_name variable_name
0 2022-01-01 2022-01-01 00:00:00 0.0 m^3/s fcst-1 test_short streamflow_hourly_inst
1 2022-01-01 2022-01-01 01:00:00 0.1 m^3/s fcst-1 test_short streamflow_hourly_inst
2 2022-01-01 2022-01-01 02:00:00 0.2 m^3/s fcst-1 test_short streamflow_hourly_inst
3 2022-01-01 2022-01-01 03:00:00 0.3 m^3/s fcst-1 test_short streamflow_hourly_inst
4 2022-01-01 2022-01-01 04:00:00 0.4 m^3/s fcst-1 test_short streamflow_hourly_inst
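As mentioned above, with a large evaluation dataset you would typically apply a filter before pulling a table into memory. Filtering is covered in more detail below, but here is a quick sketch using the filter() method with a raw SQL string (the same form shown in the Filtering and Ordering section):

# Filter the primary timeseries to a single location before
# converting it to a Pandas DataFrame.
ev.primary_timeseries.filter("location_id = 'gage-A'").to_pandas().head()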

Querying#

Above, we used the to_pandas() method on each table in the dataset to see an example of the data it contains. The underlying query engine for TEEHR is PySpark. As a result, each of the table sub-classes can return data as either a Spark DataFrame (using the to_sdf() method) or a Pandas DataFrame (using the to_pandas() method). The location data tables have an additional method, to_geopandas(), that returns a GeoPandas GeoDataFrame in which the geometry bytes column has been converted to a proper geometry column.

Note: PySpark is lazily evaluated, meaning that it does not actually run the query until the data is needed for display, plotting, etc. Therefore, if you just use the to_sdf() method you do not get the data itself but rather a lazy Spark DataFrame that can be used with subsequent Spark operations, all of which are evaluated only when the results are requested. Here we show how to get the Spark DataFrame and display the data, but there are many other ways the lazy Spark DataFrame can be used in subsequent operations that are beyond the scope of this document.

# Query the locations and return as a lazy Spark DataFrame.
ev.locations.to_sdf()
DataFrame[id: string, name: string, geometry: binary]
# Query the locations and return as a Spark DataFrame but tell Spark to show the data.
ev.locations.to_sdf().show()
+------+------+--------------------+
|    id|  name|            geometry|
+------+------+--------------------+
|gage-A|Site A|[01 01 00 00 00 C...|
|gage-B|Site B|[01 01 00 00 00 0...|
|gage-C|Bite C|[01 01 00 00 00 B...|
+------+------+--------------------+
# Query the locations and return as a Pandas DataFrame.
# Note that the geometry column is shown as a byte string.
ev.locations.to_pandas()
id name geometry
0 gage-A Site A b'\x01\x01\x00\x00\x00\xc8f\xa9\x1d\x07.T\xc0\...
1 gage-B Site B b'\x01\x01\x00\x00\x00\x00\x06?-\xa3-T\xc0\xbb...
2 gage-C Bite C b"\x01\x01\x00\x00\x00\xb0Y\xebG'/T\xc0K\xa5\n...
# Query the locations and return as a GeoPandas DataFrame.
# Note that the geometry column now contains proper geometry objects.
ev.locations.to_geopandas()
id name geometry
0 gage-A Site A POINT (-80.71918 35.17611)
1 gage-B Site B POINT (-80.71308 35.15839)
2 gage-C Bite C POINT (-80.73677 35.18693)

One quick example of how the Spark DataFrame’s lazy evaluation can be beneficial is getting the number of rows in a query result. If you call len(ev.primary_timeseries.to_pandas()), the entire table first has to be loaded into memory as a Pandas DataFrame before the length is calculated. On the other hand, if you run ev.primary_timeseries.to_sdf().count(), the Spark engine calculates the number of rows without loading the entire dataset into memory first. For larger datasets this can be very important.

display(
    len(ev.primary_timeseries.to_pandas())
)
display(
    ev.primary_timeseries.to_sdf().count()
)
78
78
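Because to_sdf() returns a lazy Spark DataFrame, you can also chain standard PySpark operations onto it before anything is computed. For example, a quick sketch using PySpark’s own filter() and count() methods (these are plain Spark DataFrame operations, distinct from the TEEHR filter() method shown in the next section):

# Chain a Spark-level filter onto the lazy DataFrame; nothing is
# executed until count() requests a result.
sdf = ev.primary_timeseries.to_sdf()
sdf.filter("location_id = 'gage-A'").count()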

Filtering and Ordering#

As noted above, because the tables are backed by lazily evaluated Spark DataFrames, we can filter and order the data before returning it as a Pandas or GeoPandas DataFrame. The filter methods take a raw SQL string, a filter dictionary, or a FilterObject built with an Operator and a field enumeration. Using a FilterObject, Operator, and field enumeration is probably not a common pattern for most users, but it is used internally to validate filter arguments and is available to users who would like to use it.

# Filter using a raw SQL string
ev.locations.filter("id = 'gage-A'").to_geopandas()
id name geometry
0 gage-A Site A POINT (-80.71918 35.17611)
# Filter using a dictionary
ev.locations.filter({
    "column": "id",
    "operator": "=",
    "value": "gage-A"
}).to_geopandas()
id name geometry
0 gage-A Site A POINT (-80.71918 35.17611)
# Import the LocationFilter and Operators classes
from teehr import LocationFilter, Operators

# Get the field enumeration
fields = ev.locations.field_enum()

# Filter using the LocationFilter class
lf = LocationFilter(
    column=fields.id,
    operator=Operators.eq,
    value="gage-A"
)
ev.locations.filter(lf).to_geopandas()
id name geometry
0 gage-A Site A POINT (-80.71918 35.17611)
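The section title also mentions ordering. Assuming the table sub-classes expose an order_by() method alongside filter() (check the TEEHR API documentation for the exact name and signature), a sketch of ordering a query result might look like:

# Order the locations by name before returning a GeoPandas DataFrame;
# order_by() is assumed here to accept a field name, much like
# filter() accepts a column.
ev.locations.order_by("name").to_geopandas()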
# Stop the Spark session now that we are done querying the Evaluation.
ev.spark.stop()

This same approach can be used to query the other tables in the evaluation dataset. There are also other methods that we did not explore, and users are encouraged to check out the TEEHR API documentation as well as the PySpark documentation for a more in-depth understanding of what happens in the background.