Clone an Example from S3#

Overview#

In this lesson we will clone a small example TEEHR Evaluation from S3 and run through some simple example metrics calculations, demonstrating filtering, grouping, and chaining of query methods.

Create a new Evaluation#

First we will import TEEHR along with some other required libraries for this example. Then we create a new instance of the Evaluation that points to a directory where the evaluation data will be stored.

import teehr
from pathlib import Path
import shutil

# Tell Bokeh to output plots in the notebook
from bokeh.io import output_notebook
output_notebook()
Loading BokehJS ...
# Define the directory where the Evaluation will be created
test_eval_dir = Path(Path().home(), "temp", "04_setup_real_example")
shutil.rmtree(test_eval_dir, ignore_errors=True)

# Create an Evaluation object and create the directory
ev = teehr.Evaluation(dir_path=test_eval_dir, create_dir=True)
Hide code cell output
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/19 18:52:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable

Clone Evaluation Data from S3#

As mentioned above, for this exercise we will be cloning a complete Evaluation dataset from the TEEHR S3 bucket. First we will list the available Evaluations and then we will clone the e0_2_location_example evaluation which is a small example Evaluation that only contains 2 gages.

# List the evaluations in the S3 bucket
ev.list_s3_evaluations()
name description url
0 e0_2_location_example Example evaluation datsets with 2 USGS gages s3a://ciroh-rti-public-data/teehr-data-warehou...
1 e1_camels_daily_streamflow Daily average streamflow at ther Camels basins s3a://ciroh-rti-public-data/teehr-data-warehou...
2 e2_camels_hourly_streamflow Hourly instantaneous streamflow at ther Camels... s3a://ciroh-rti-public-data/teehr-data-warehou...
3 e3_usgs_hourly_streamflow Hourly instantaneous streamflow at USGS CONUS ... s3a://ciroh-rti-public-data/teehr-data-warehou...
# Clone the e0_2_location_example evaluation from the S3 bucket
ev.clone_from_s3("e0_2_location_example")
Hide code cell output
24/11/19 18:52:18 WARN FileStreamSink: Assume no metadata directory. Error while looking for metadata directory in the path: s3a://ciroh-rti-public-data/teehr-data-warehouse/v0_4_evaluations/e0_2_location_example/dataset/units/.
java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.streaming.FileStreamSink$.hasMetadata(FileStreamSink.scala:53)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:366)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	... 26 more
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
Cell In[4], line 2
      1 # Clone the e0_2_location_example evaluation from the S3 bucket
----> 2 ev.clone_from_s3("e0_2_location_example")

File ~/work/teehr/teehr/src/teehr/evaluation/evaluation.py:270, in Evaluation.clone_from_s3(self, evaluation_name, primary_location_ids, start_date, end_date)
    224 def clone_from_s3(
    225     self,
    226     evaluation_name: str,
   (...)
    229     end_date: Union[str, datetime] = None,
    230 ):
    231     """Fetch the study data from S3.
    232 
    233     Copies the study from s3 to the local directory, with the option
   (...)
    268 
    269     """
--> 270     return clone_from_s3(
    271         self,
    272         evaluation_name,
    273         primary_location_ids,
    274         start_date,
    275         end_date
    276     )

File ~/work/teehr/teehr/src/teehr/loading/s3/clone_from_s3.py:217, in clone_from_s3(ev, evaluation_name, primary_location_ids, start_date, end_date)
    209 Path(local_path).mkdir()
    211 logger.debug(f"Cloning {name} from {s3_dataset_path}/{dir_name}/ to {local_path}")
    213 sdf_in = (
    214     ev.spark
    215     .read
    216     .format(format)
--> 217     .load(f"{s3_dataset_path}/{dir_name}/")
    218 )
    220 sdf_in = subset_the_table(
    221     ev=ev,
    222     s3_dataset_path=s3_dataset_path,
   (...)
    227     end_date=end_date
    228 )
    230 if table["partitions"]:

File /opt/hostedtoolcache/Python/3.10.15/x64/lib/python3.10/site-packages/pyspark/sql/readwriter.py:307, in DataFrameReader.load(self, path, format, schema, **options)
    305 self.options(**options)
    306 if isinstance(path, str):
--> 307     return self._df(self._jreader.load(path))
    308 elif path is not None:
    309     if type(path) != list:

File /opt/hostedtoolcache/Python/3.10.15/x64/lib/python3.10/site-packages/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):

File /opt/hostedtoolcache/Python/3.10.15/x64/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:179, in capture_sql_exception.<locals>.deco(*a, **kw)
    177 def deco(*a: Any, **kw: Any) -> Any:
    178     try:
--> 179         return f(*a, **kw)
    180     except Py4JJavaError as e:
    181         converted = convert_exception(e.java_exception)

File /opt/hostedtoolcache/Python/3.10.15/x64/lib/python3.10/site-packages/py4j/protocol.py:326, in get_return_value(answer, gateway_client, target_id, name)
    324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325 if answer[1] == REFERENCE_TYPE:
--> 326     raise Py4JJavaError(
    327         "An error occurred while calling {0}{1}{2}.\n".
    328         format(target_id, ".", name), value)
    329 else:
    330     raise Py4JError(
    331         "An error occurred while calling {0}{1}{2}. Trace:\n{3}\n".
    332         format(target_id, ".", name, value))

Py4JJavaError: An error occurred while calling o35.load.
: java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2688)
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:3431)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:3466)
	at org.apache.hadoop.fs.FileSystem.access$300(FileSystem.java:174)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:3574)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:3521)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:540)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:365)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$checkAndGlobPathIfNecessary$1(DataSource.scala:724)
	at scala.collection.immutable.List.map(List.scala:293)
	at org.apache.spark.sql.execution.datasources.DataSource$.checkAndGlobPathIfNecessary(DataSource.scala:722)
	at org.apache.spark.sql.execution.datasources.DataSource.checkAndGlobPathIfNecessary(DataSource.scala:551)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:404)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:186)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.s3a.S3AFileSystem not found
	at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2592)
	at org.apache.hadoop.conf.Configuration.getClass(Configuration.java:2686)
	... 29 more

Now that we have cloned all the data for this evaluation from the TEEHR S3 bucket, let's query the locations table as a GeoPandas GeoDataFrame and then plot the gages on a map using the TEEHR location_map() plotting method.

locations_gdf = ev.locations.to_geopandas()
locations_gdf.teehr.location_map()

Let's also query the primary_timeseries and plot the timeseries data using the df.teehr.timeseries_plot() method.

pt_df = ev. primary_timeseries.to_pandas()
pt_df.head()
pt_df.teehr.timeseries_plot()

And the location_crosswalks table.

lc_df = ev.location_crosswalks.to_pandas()
lc_df.head()

And the secondary_timeseries table.

st_df = ev.secondary_timeseries.to_pandas()
st_df.head()
st_df.teehr.timeseries_plot()

And lastly, the joined_timeseries table.

jt_df = ev.joined_timeseries.to_pandas()
jt_df.head()

Metrics#

Now that we have confirmed that we have all the data tables and the joined_timeseries table, we can move on to analyzing the data. The user is encouraged to check out the documentation pages relating to filtering and grouping in the context of generating metrics. The short explanation is that filters can be used to select what values are used when calculating metrics, while the group_by determines how the values are grouped into populations before calculating metrics.

The most basic way to evaluate simulation performance is to group_by configuration_name and primary_location_id, and generate some basic metrics. In this case it will be Nash-Sutcliffe Efficiency, Kling-Gupta Efficiency and Relative Bias, calculated at each location for each configuration. As we saw there are 2 locations and 1 configuration, so the total number of rows that are output is just 2. If there were more locations or more configurations, there would be more rows in the output for this query. TEEHR contains many more metrics that can be calculated by simply including them in the list of include_metrics, and there are also many other ways to look at performance besides the basic metrics.

ev.metrics.query(
    group_by=["configuration_name", "primary_location_id"],
    include_metrics=[
        teehr.Metrics.NashSutcliffeEfficiency(),
        teehr.Metrics.KlingGuptaEfficiency(),
        teehr.Metrics.RelativeBias()
    ]
).to_pandas()

Now, to demonstrate how filters work, we add a filter to only select values where the primary_location_id is usgs-14138800. Accordingly, the query will only include rows from the joined_timeseries table where primary_location_id is usgs-14138800 in the metrics calculations, and since we are grouping by primary_location_id, we can expect one row in the output. And that is what we see below.

(
    ev.metrics
    .query(
        group_by=["configuration_name", "primary_location_id"],
        filters=[
            {
                "column": "primary_location_id",
                "operator": "=",
                "value": "usgs-14138800"
            }],
        include_metrics=[
            teehr.Metrics.NashSutcliffeEfficiency(),
            teehr.Metrics.KlingGuptaEfficiency(),
            teehr.Metrics.RelativeBias()
        ]
    )
    .to_pandas()
)

As another example, because the joined_timeseries table contains a year column which was added as a user defined field, we can also group by year. In this case we will get the metrics calculated for each configuration_name, primary_location_id, and year.

(
    ev.metrics
    .query(
        group_by=["configuration_name", "primary_location_id", "year"],
        filters=[
            {
                "column": "primary_location_id",
                "operator": "=",
                "value": "usgs-14138800"
            }],
        include_metrics=[
            teehr.Metrics.NashSutcliffeEfficiency(),
            teehr.Metrics.KlingGuptaEfficiency(),
            teehr.Metrics.RelativeBias()
        ]
    )
    .to_pandas()
)

There are many ways that TEEHR can be used to “slice and dice” the data in the TEEHR dataset. One last example here before wrapping up this lesson. Let's say we wanted the “annual peak relative bias”, that is, the relative bias of the annual peak values. TEEHR can do this too, by chaining the query methods together and overriding the input_field_names and the output_field_name as shown below. We will do this step by step to understand it. First, run the following query, in which the second query is commented out; then, in the next cell, run it with the second query uncommented. As you can see, first we calculate the peak primary value (max_primary_value) and peak secondary value (max_secondary_value) for each year, then we calculate the relative bias across the yearly peaks (annual_max_relative_bias).

(
    ev.metrics
    .query(
        group_by=["configuration_name", "primary_location_id", "year"],
        filters=[
            {
                "column": "primary_location_id",
                "operator": "=",
                "value": "usgs-14138800"
            }],
        include_metrics=[
            teehr.Metrics.Maximum(
                input_field_names=["primary_value"],
                output_field_name="max_primary_value"
            ),
            teehr.Metrics.Maximum(
                input_field_names=["secondary_value"],
                output_field_name="max_secondary_value"
            )
        ]
    )
    # .query(
    #     group_by=["configuration_name", "primary_location_id"],
    #     include_metrics=[
    #         teehr.Metrics.RelativeBias(
    #             input_field_names=["max_primary_value", "max_secondary_value"],
    #             output_field_name="annual_max_relative_bias"
    #         )
    #     ]
    # )
    .to_pandas()
)
(
    ev.metrics
    .query(
        group_by=["configuration_name", "primary_location_id", "year"],
        filters=[
            {
                "column": "primary_location_id",
                "operator": "=",
                "value": "usgs-14138800"
            }],
        include_metrics=[
            teehr.Metrics.Maximum(
                input_field_names=["primary_value"],
                output_field_name="max_primary_value"
            ),
            teehr.Metrics.Maximum(
                input_field_names=["secondary_value"],
                output_field_name="max_secondary_value"
            )
        ]
    )
    .query(
        group_by=["configuration_name", "primary_location_id"],
        include_metrics=[
            teehr.Metrics.RelativeBias(
                input_field_names=["max_primary_value", "max_secondary_value"],
                output_field_name="annual_max_relative_bias"
            )
        ]
    )
    .to_pandas()
)
ev.spark.stop()