Grouping and Filtering

Note: The images shown below are slightly out of date. We are in the process of updating them.

Once the data has been joined into a single table, we can group and filter it based on the table attributes and calculate metrics for specific subsets of the data. This is the exploratory power of TEEHR, which allows us to better understand model performance. For example, if the joined table contained several model simulations (“configurations”), we could group by the configuration_name field to calculate performance metrics for each model configuration.

We could then add filters to further narrow the population, such as only considering first-order stream locations or locations below a certain mean slope value. This allows us to gain more insight into model performance through targeted quantitative analysis.

Together, the grouping and filtering capabilities in TEEHR let us evaluate models across different subsets of the data, helping us understand where and why a model performs well or poorly.
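As a rough illustration of that idea, the sketch below shows the general shape of such a query. It assumes an Evaluation object ev whose joined timeseries table includes a stream_order attribute; that field is hypothetical and used only for illustration here, while the working examples later in this tutorial use fields that actually exist in the sample data.

from teehr import Metrics as m

# Sketch only: relative bias per model configuration, limited to
# first-order stream locations (assumes a hypothetical "stream_order"
# attribute has been joined into the table).
metrics_df = ev.metrics.query(
    group_by=["configuration_name"],
    include_metrics=[m.RelativeBias()],
    filters=[
        {
            "column": "stream_order",
            "operator": "=",
            "value": "1"
        }
    ]
).to_pandas()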

We’ll look at an example to help illustrate the grouping and filtering concepts.

https://github.com/RTIInternational/teehr/blob/main/docs/images/tutorials/grouping_filtering/grouping_example_table.png?raw=true

Consider this joined timeseries table containing:

  • 2 USGS locations

  • 3 Model configurations

  • 4 Daily timesteps spanning two months

  • 1 Location attribute (q95_cms)

  • 1 User-defined attribute (month)

When calculating metrics in TEEHR, we can use the data in this table to compute values over specific subsets, or populations, of the data. For example, we could calculate the relative bias for each model configuration for each month.
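A query for that month-by-month comparison might look like the following sketch. It assumes an Evaluation object ev with this joined table loaded and uses the month user-defined attribute shown above; the fully worked examples follow below.

from teehr import Metrics as m

# Sketch: relative bias for each model configuration in each month
# (the "month" field is the user-defined attribute from the table above).
metrics_df = ev.metrics.query(
    group_by=["configuration_name", "month"],
    include_metrics=[m.RelativeBias()],
).to_pandas()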

Grouping

Let’s use this table of joined timeseries values to demonstrate how grouping by selected fields affects the results.

First, we’ll calculate the relative bias for each model configuration at each location:

https://github.com/RTIInternational/teehr/blob/main/docs/images/tutorials/grouping_filtering/grouping_example_1.png?raw=true

We can demonstrate how this calculation is performed in TEEHR using sample data. First, we’ll set up a local directory to contain our Evaluation, then we’ll clone a subset of an existing Evaluation from S3 storage.

from pathlib import Path
import shutil

import teehr

# Define the directory where the Evaluation will be created
test_eval_dir = Path(Path().home(), "temp", "grouping_tutorial")
shutil.rmtree(test_eval_dir, ignore_errors=True)

# Create an Evaluation object and create the directory
ev = teehr.Evaluation(dir_path=test_eval_dir, create_dir=True)
# List the evaluations in the S3 bucket
ev.list_s3_evaluations()
   name                          description                                         url
0  e0_2_location_example         Example evaluation datsets with 2 USGS gages       s3a://ciroh-rti-public-data/teehr-data-warehou...
1  e1_camels_daily_streamflow    Daily average streamflow at ther Camels basins      s3a://ciroh-rti-public-data/teehr-data-warehou...
2  e2_camels_hourly_streamflow   Hourly instantaneous streamflow at ther Camels...   s3a://ciroh-rti-public-data/teehr-data-warehou...
3  e3_usgs_hourly_streamflow     Hourly instantaneous streamflow at USGS CONUS ...   s3a://ciroh-rti-public-data/teehr-data-warehou...
ev.clone_from_s3(
    evaluation_name="e1_camels_daily_streamflow",
    primary_location_ids=["usgs-01013500", "usgs-01022500"],
    start_date="1990-10-30 00:00",
    end_date="1990-11-02 23:00"
)

Here we calculate relative bias, grouping by primary_location_id and configuration_name:

from teehr import Metrics as m

metrics_df = ev.metrics.query(
    group_by=["primary_location_id", "configuration_name"],
    include_metrics=[
        m.RelativeBias(),
    ]
).to_pandas()
metrics_df

Note that if you want a field to appear in the query results, it must be included in the group_by list even if it is not needed for the grouping operation.

For example, if we wanted to include q95 in the query result, we would need to include it in the group_by list:

https://github.com/RTIInternational/teehr/blob/main/docs/images/tutorials/grouping_filtering/grouping_example_2.png?raw=true
# Adding q95 to the group_by list to include it in the results.
metrics_df = ev.metrics.query(
    group_by=["primary_location_id", "configuration_name", "q95"],
    include_metrics=[
        m.RelativeBias(),
    ]
).to_pandas()
metrics_df

Filtering

Next, we’ll add filtering to further narrow the population for our metric calculations. Let’s say we only want to consider NWM v3.0 and Marrmot model configurations:

https://github.com/RTIInternational/teehr/blob/main/docs/images/tutorials/grouping_filtering/grouping_example_3.png?raw=true

We need to specify a filter in the query method to only include the desired model configurations:

# Adding a filter to further limit the population for metrics calculations.
metrics_df = ev.metrics.query(
    group_by=["primary_location_id", "configuration_name", "q95"],
    include_metrics=[
        m.RelativeBias(),
    ],
    filters=[
        {
            "column": "configuration_name",
            "operator": "in",
            "value": ["nwm30_retro", "marrmot_37_hbv_obj1"]
        }
    ]
).to_pandas()
metrics_df

Summary

Grouping and filtering are powerful tools in TEEHR that allow us to explore the data in more detail and calculate metrics for specific subsets of the data.

See the User Guide for more in-depth examples using the code base.

# Stop the Spark session.
ev.spark.stop()