Loading Local Data#

Overview#

In the Introduction to the Schema lesson we explored what data makes up an Evaluation dataset and how it is stored on disk. In this lesson we are going to explore how your data can be added to a new or existing Evaluation dataset.

As in the previous lesson, we will start by importing the required packages, creating an Evaluation class instance that references a directory where the data will be stored, and then cloning the Evaluation template as a starting point.

import teehr
import teehr.example_data.v0_3_test_study as test_study_data
from pathlib import Path
import shutil
import geopandas as gpd
import pandas as pd

# Tell Bokeh to output plots in the notebook
from bokeh.io import output_notebook
output_notebook()
# Define the directory where the Evaluation will be created
test_eval_dir = Path(Path().home(), "temp", "02_loading_data")
shutil.rmtree(test_eval_dir, ignore_errors=True)

# Create an Evaluation object and create the directory
ev = teehr.Evaluation(dir_path=test_eval_dir, create_dir=True)

# Clone the template
ev.clone_template()

For this example, we will use data from the example dataset stored in the tests/data folder of the teehr repository. We will copy the data to the Evaluation directory, examine the contents, and then load it into the TEEHR Evaluation dataset. Note that these data are for demonstration purposes only: the location IDs and timeseries values are made up. We will work with real data in future exercises.

Locations#

First, let's download some location data. We will download gages.geojson, a GeoJSON file that contains 3 fictional gages.

In TEEHR, locations are the sites corresponding to the observed/truth timeseries data, which is called the “primary_timeseries”.

location_data_path = Path(test_eval_dir, "gages.geojson")
test_study_data.fetch_file("gages.geojson", location_data_path)

Now let's open the gages.geojson file with GeoPandas and take a look at the contents.

location_data_path = Path(test_eval_dir, "gages.geojson")
gdf = gpd.read_file(location_data_path)
gdf
id name geometry
0 gage-A Site A POINT (-80.71918 35.17611)
1 gage-B Site B POINT (-80.71308 35.15839)
2 gage-C Bite C POINT (-80.73677 35.18693)

You can see that the file contains 3 gages, each represented by a point. Next we will load the location data into the TEEHR dataset using methods on the locations table sub-class. Note that this file already has the id, name and geometry fields required by TEEHR. If it didn't, the load_spatial method (like all of the table sub-class loading methods) accepts a field_mapping dictionary that specifies which source fields should be mapped to which required field names. We will look more closely at field_mapping when loading some of the other datasets.
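
For example, if the source file instead used column names such as gage_id and station_name (hypothetical names, just for illustration), the call might look something like this sketch:

# Hypothetical sketch only: map non-standard source column names to the fields TEEHR expects.
# "gage_id" and "station_name" are made-up column names for illustration.
ev.locations.load_spatial(
    location_data_path,
    field_mapping={
        "gage_id": "id",
        "station_name": "name"
    }
)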

Location ID Prefixes#

Note that the location IDs are required to be unique, so TEEHR uses a location ID prefix convention to help ensure uniqueness.

The location ID prefix is simply a string prepended to the ID and separated by a hyphen, and it represents the model configuration. For example, a USGS gage ID could get the prefix "usgs" and look something like "usgs-03061000".

TEEHR provides the ability to specify (or replace) the location ID prefix when loading data.

Let’s load our test locations, replacing the “gage” prefix in the source dataset with “usgs”:

ev.locations.load_spatial(location_data_path, location_id_prefix="usgs")

Now that the location data has been loaded into the TEEHR dataset, it can be queried with the ev.locations class methods. More importantly, we can now load timeseries that reference the id values in the locations data. The id in the locations table is the primary location ID and is referenced in the primary_timeseries table as location_id.

First, let's query the location data to verify it has been loaded, then look at a map to verify the points are where we expect, and then move on to loading some timeseries data.

locations_gdf = ev.locations.to_geopandas()
locations_gdf
id name geometry
0 usgs-A Site A POINT (-80.71918 35.17611)
1 usgs-B Site B POINT (-80.71308 35.15839)
2 usgs-C Bite C POINT (-80.73677 35.18693)

This table of locations came from the TEEHR dataset and matches what we saw in the source file, so we know it was loaded successfully. Now let's look at a map using the built-in teehr.locations_map() method.

locations_gdf.teehr.locations_map()

Primary Timeseries#

OK, now that we have loaded the location data, we can load some timeseries into the primary_timeseries table, where the location_id field references the id field in the locations table. The primary timeseries represents the timeseries that simulations will be compared to (i.e., the truth).

First, let's download a test timeseries from the TEEHR repository as a .csv file, open and examine the contents with Pandas, and then load it into the TEEHR dataset.

primary_timeseries_path = Path(test_eval_dir, "test_short_obs.csv")
test_study_data.fetch_file("test_short_obs.csv", primary_timeseries_path)
primary_timeseries_df = pd.read_csv(primary_timeseries_path)
primary_timeseries_df.head()
reference_time value_time value variable_name measurement_unit configuration location_id
0 NaN 2022-01-01T00:00:00 0.1 streamflow m^3/s test_obs gage-A
1 NaN 2022-01-01T01:00:00 1.1 streamflow m^3/s test_obs gage-A
2 NaN 2022-01-01T02:00:00 0.1 streamflow m^3/s test_obs gage-A
3 NaN 2022-01-01T03:00:00 0.2 streamflow m^3/s test_obs gage-A
4 NaN 2022-01-01T04:00:00 0.3 streamflow m^3/s test_obs gage-A

There are a few things we need to consider before loading the data into the TEEHR dataset. First, the field names. You can see above that while the schema of this dataset is close to the TEEHR schema, it does not match exactly: specifically, configuration should be configuration_name and measurement_unit should be unit_name.

To address this we can use the field_mapping argument to map the existing field names to the required field names.

The next thing to deal with is the values in fields that reference values in another table, for example a domain table or the locations table. Let's start with unit_name. The unit_name values need to reference a value in the name field of the units table, so let's look at what values the units table contains.

# Query the units table
ev.units.to_pandas()
name long_name
0 m^3/s Cubic Meters Per Second
1 ft^3/s Cubic Feet Per Second
2 km^2 Square Kilometers
3 mm/s Millimeters Per Second

Here we can see that m^3/s is in the units table, so the values in the measurement_unit field are all set. Next, let's check the variables table.

ev.variables.to_pandas()
name long_name
0 streamflow_hourly_inst Hourly Instantaneous Streamflow
1 streamflow_daily_mean Daily Mean Streamflow
2 rainfall_hourly_rate Hourly Rainfall Rate

Here we see that streamflow is not in the name field of the variables table. There are two ways to remedy this:

  1. we could add the value streamflow to the variables table (see the sketch after this list), or

  2. we could use the constant_field_values argument to set the variable_name to an allowed value.
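
For reference, option 1 would look something like the following sketch. It assumes a teehr.Variable model with name and long_name fields, analogous to the teehr.Configuration and teehr.Attribute models used elsewhere in this lesson:

# Sketch of option 1: add "streamflow" to the variables domain table.
# Assumes teehr.Variable takes "name" and "long_name", mirroring the table columns above.
ev.variables.add(
    teehr.Variable(
        name="streamflow",
        long_name="Streamflow"
    )
)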

In this case we will go with the latter and set the variable_name to streamflow_hourly_inst. Next we need to address the configuration_name field. First, let's see what values are in the configurations table. By default the configurations table is empty, so we need to add an entry for each timeseries dataset.

ev.configurations.to_pandas()
name type description

Here we add a new entry to the configurations domain table. This represents the configuration name for the primary_timeseries data once loaded into the evaluation, which in this case will be usgs_observations instead of test_obs.

ev.configurations.add(
    teehr.Configuration(
        name="usgs_observations",
        type="primary",
        description="Test Observed Configuration",
    )
)

Now we can load the data into the TEEHR evaluation. We use the field_mapping argument to map the field names in our .csv file to the TEEHR schema names, and we use the constant_field_values argument to set specific fields to constant values.

Since the IDs in the locations table were given the prefix “usgs”, we’ll need to do the same here.

# Load the timeseries data and map over the fields and set constants
ev.primary_timeseries.load_csv(
    in_path=primary_timeseries_path,
    field_mapping={
        "reference_time": "reference_time",
        "value_time": "value_time",
        "configuration": "configuration_name",
        "measurement_unit": "unit_name",
        "variable_name": "variable_name",
        "value": "value",
        "location_id": "location_id"
    },
    constant_field_values={
        "unit_name": "m^3/s",
        "variable_name": "streamflow_hourly_inst",
        "configuration_name": "usgs_observations"
    },
    location_id_prefix="usgs"
)

Now we can query for a single timeseries and make sure the data was loaded.

primary_timeseries_df = ev.primary_timeseries.filter(
    {
        "column": "location_id",
        "operator": "=",
        "value": "usgs-A"
    }
).to_pandas()
primary_timeseries_df.head()
reference_time value_time value unit_name location_id configuration_name variable_name
0 NaT 2022-01-01 14:00:00 4.0 m^3/s usgs-A usgs_observations streamflow_hourly_inst
1 NaT 2022-01-01 01:00:00 1.1 m^3/s usgs-A usgs_observations streamflow_hourly_inst
2 NaT 2022-01-01 00:00:00 0.1 m^3/s usgs-A usgs_observations streamflow_hourly_inst
3 NaT 2022-01-01 21:00:00 0.1 m^3/s usgs-A usgs_observations streamflow_hourly_inst
4 NaT 2022-01-01 12:00:00 2.0 m^3/s usgs-A usgs_observations streamflow_hourly_inst

And create a plot using the df.teehr.timeseries_plot() method.

primary_timeseries_df.teehr.timeseries_plot()

Secondary Timeseries#

Next we will load some simulated data that we want to compare to the observed timeseries (primary_timeseries). The secondary_timeseries table is where the “simulated” timeseries data are stored. First we will download some test secondary timeseries from the TEEHR repository.

secondary_timeseries_path = Path(test_eval_dir, "test_short_fcast.parquet")
test_study_data.fetch_file("test_short_fcast.parquet", secondary_timeseries_path)

Similar to what we did with the primary timeseries, we will open the timeseries with Pandas to see what the data look like, then load it into the TEEHR dataset.

secondary_timeseries_df = pd.read_parquet(secondary_timeseries_path)
secondary_timeseries_df.head()
reference_time value_time value variable_name measurement_unit configuration location_id
0 2022-01-01 2022-01-01 00:00:00 0.0 streamflow m^3/s test_short fcst-1
1 2022-01-01 2022-01-01 01:00:00 0.1 streamflow m^3/s test_short fcst-1
2 2022-01-01 2022-01-01 02:00:00 0.2 streamflow m^3/s test_short fcst-1
3 2022-01-01 2022-01-01 03:00:00 0.3 streamflow m^3/s test_short fcst-1
4 2022-01-01 2022-01-01 04:00:00 0.4 streamflow m^3/s test_short fcst-1

When loading secondary_timeseries, the same considerations regarding field names and field values apply as for the primary_timeseries. Similar to loading the primary_timeseries, we will add the configuration name that represents the secondary timeseries to the configurations domain table.

In this case we’ll use test_short as the secondary timeseries configuration name.

ev.configurations.add(
    teehr.Configuration(
        name="test_short",
        type="secondary",
        description="Test Forecast Configuration",
    )
)

Location Crosswalks#

There is one other consideration before we can load the secondary_timeseries. The TEEHR schema does not require that the location_id used in the primary_timeseries match the location_id in the secondary_timeseries. Therefore, it is necessary to load location crosswalk data into the location_crosswalks table so that TEEHR knows which primary_location_id corresponds to which secondary_location_id. For example, in our test dataset, the observation data at location_id "gage-A" relates to forecasts with location_id "fcst-1".
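
If you needed to build a crosswalk for your own data rather than download one, a plain two-column table is all that is required. A minimal sketch using pandas, reusing the fictional IDs from this lesson:

# Build a minimal crosswalk mapping primary (observed) IDs to secondary (forecast) IDs
my_crosswalk_df = pd.DataFrame(
    {
        "primary_location_id": ["gage-A", "gage-B", "gage-C"],
        "secondary_location_id": ["fcst-1", "fcst-2", "fcst-3"],
    }
)
# Save to CSV so it could be loaded with ev.location_crosswalks.load_csv()
my_crosswalk_df.to_csv(Path(test_eval_dir, "my_crosswalk.csv"), index=False)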

We already have a file with this information, so like we did with the other test datasets, we will download the data from the TEEHR repository, examine the contents with Pandas, and load it into the TEEHR dataset.

crosswalk_path = Path(test_eval_dir, "crosswalk.csv")
test_study_data.fetch_file("crosswalk.csv", crosswalk_path)
crosswalk_df = pd.read_csv(crosswalk_path)
crosswalk_df
primary_location_id secondary_location_id
0 gage-A fcst-1
1 gage-B fcst-2
2 gage-C fcst-3

In this case we can see that the crosswalk table already has the field names we need (primary_location_id and secondary_location_id), so we don't need the field_mapping argument and can load it as-is.

Here we’ll specify the location ID prefix for the primary IDs. If needed we could also update or provide a prefix for the secondary ID using the secondary_location_id_prefix argument.

# Load the crosswalk data to the crosswalks table
ev.location_crosswalks.load_csv(crosswalk_path, primary_location_id_prefix="usgs")

Now that we have loaded the location_crosswalks data, we can load the secondary_timeseries. Note that, unlike when we loaded the primary_timeseries, we do not need to set a constant field value for configuration_name here, because the configuration value already in the source data (test_short) matches the configuration name we added to the configurations table.

# Load the secondary timeseries data and map over the fields and set constants
ev.secondary_timeseries.load_parquet(
    in_path=secondary_timeseries_path,
    field_mapping={
        "reference_time": "reference_time",
        "value_time": "value_time",
        "configuration": "configuration_name",
        "measurement_unit": "unit_name",
        "variable_name": "variable_name",
        "value": "value",
        "location_id": "location_id"
    },
    constant_field_values={
        "unit_name": "m^3/s",
        "variable_name": "streamflow_hourly_inst"
    }
)

And to verify that it loaded, we can run a query to see the data.

# Query the secondary timeseries data for a single forecast.
secondary_timeseries_df = (
    ev.secondary_timeseries
    .filter(
        {
            "column": "reference_time",
            "operator": "=",
            "value": "2022-01-01"
        }
    )
).to_pandas()
secondary_timeseries_df.head()
value_time value unit_name location_id member configuration_name variable_name reference_time
0 2022-01-01 04:00:00 10.580000 m^3/s fcst-2 None test_short streamflow_hourly_inst 2022-01-01
1 2022-01-01 01:00:00 101.849998 m^3/s fcst-3 None test_short streamflow_hourly_inst 2022-01-01
2 2022-01-01 06:00:00 0.600000 m^3/s fcst-1 None test_short streamflow_hourly_inst 2022-01-01
3 2022-01-01 17:00:00 11.730000 m^3/s fcst-2 None test_short streamflow_hourly_inst 2022-01-01
4 2022-01-01 21:00:00 40.860001 m^3/s fcst-3 None test_short streamflow_hourly_inst 2022-01-01
secondary_timeseries_df.teehr.timeseries_plot()

Appending, Upserting, and Overwriting data#

Note that the loading methods allow you to add new data to an existing table (write_mode="append", the default) or to replace existing values while also adding any new data (write_mode="upsert"). Additionally, with write_mode="overwrite", any existing partitions receiving new data are overwritten.

For example:

ev.primary_timeseries.load_parquet(
    in_path="<path-to-additional-data.parquet>",
    field_mapping={
        "reference_time": "reference_time",
        "value_time": "value_time",
        "configuration": "configuration_name",
        "measurement_unit": "unit_name",
        "variable_name": "variable_name",
        "value": "value",
        "location_id": "location_id"
    },
    constant_field_values={
        "unit_name": "m^3/s",
        "variable_name": "streamflow_hourly_inst",
        "configuration_name": "usgs_observations"
    },
    write_mode="append"
)
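
To replace overlapping values rather than append duplicates, the same call could be made with write_mode="upsert" (or write_mode="overwrite" to replace the partitions that receive new data).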

Adding Location Attributes#

At this point we have added data to the following TEEHR data tables:

  • locations

  • configurations

  • location_crosswalks

  • primary_timeseries

  • secondary_timeseries

This is the bare minimum needed to conduct an evaluation, but we are going to add one more type of data: location_attributes. Location attributes provide extra information about the primary locations. These data are analogous to the columns in a geospatial data file's "attribute table", but they are stored in a separate table to keep the schema consistent across evaluations (i.e., we don't add new columns for each new attribute).
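
As a rough illustration of the shape of such a file, building one for your own locations might look like the following sketch. The column names (location_id, attribute_name, attribute_value) are assumptions based on the field_mapping used in the load call later in this lesson, and the values are made up:

# Illustrative sketch: a long-format location-attribute table, one row per location per attribute.
# Column names and values are assumptions for illustration, not the contents of the test files.
my_attrs_df = pd.DataFrame(
    {
        "location_id": ["gage-A", "gage-B", "gage-C"],
        "attribute_name": ["drainage_area", "drainage_area", "drainage_area"],
        "attribute_value": [50.5, 23.1, 87.0],
    }
)
my_attrs_df.to_csv(Path(test_eval_dir, "my_attr_drainage_area.csv"), index=False)

In this lesson, however, we will simply fetch three pre-made attribute files from the repository.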

attr_2yr_discharge_path = Path(test_eval_dir, "test_attr_2yr_discharge.csv")
test_study_data.fetch_file("test_attr_2yr_discharge.csv", attr_2yr_discharge_path)

attr_drainage_area_km2_path = Path(test_eval_dir, "test_attr_drainage_area_km2.csv")
test_study_data.fetch_file("test_attr_drainage_area_km2.csv", attr_drainage_area_km2_path)

attr_ecoregion_path = Path(test_eval_dir, "test_attr_ecoregion.csv")
test_study_data.fetch_file("test_attr_ecoregion.csv", attr_ecoregion_path)

Like before, let's take a look at the raw data we downloaded from the repository.

# Read and display the 3 attribute table contents
attr_2yr_discharge_df = pd.read_csv(Path(test_eval_dir, "test_attr_2yr_discharge.csv"))
display(attr_2yr_discharge_df)

attr_drainage_area_km2_df = pd.read_csv(Path(test_eval_dir, "test_attr_drainage_area_km2.csv"))
display(attr_drainage_area_km2_df)

attr_ecoregion_df = pd.read_csv(Path(test_eval_dir, "test_attr_ecoregion.csv"))
display(attr_ecoregion_df)

We can see we have data for three different attributes here: year_2_discharge, drainage_area, and ecoregion. Before we can load these data we need to add the attribute names to the attributes table; then we can add the location attribute values to the location_attributes table. As we did when adding configurations, we can use the .add() method on the table sub-class to add new values.

# Add some attributes
ev.attributes.add(
    [
        teehr.Attribute(
            name="drainage_area",
            type="continuous",
            description="Drainage area in square kilometers"
        ),
        teehr.Attribute(
            name="ecoregion",
            type="categorical",
            description="Ecoregion"
        ),
        teehr.Attribute(
            name="year_2_discharge",
            type="continuous",
            description="2-yr discharge in cubic meters per second"
        ),
    ]
)

Now that the attributes have been added to the attributes table, we can load the location_attributes data.

ev.location_attributes.load_csv(
    in_path=test_eval_dir,
    field_mapping={"attribute_value": "value"},
    pattern="test_attr_*.csv",
    location_id_prefix="usgs",
)

And query to make sure it was added.

ev.location_attributes.to_pandas()

Now we have data in all the tables required to conduct an evaluation. There is one more step before we can start "slicing and dicing" the data and looking at evaluation metrics: we need to generate the joined_timeseries data table. The joined_timeseries table is essentially a materialized view that joins the primary_timeseries and secondary_timeseries based on location_id, value_time, variable_name and unit_name, adds the location_attributes, and adds any user defined fields.
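
Conceptually, and only as a rough illustration (TEEHR builds this table with Spark internally, not pandas), the core join is similar to a couple of pandas merges:

# Conceptual sketch only: approximate the primary/secondary join with pandas merges.
primary_df = ev.primary_timeseries.to_pandas()
secondary_df = ev.secondary_timeseries.to_pandas()
crosswalk_df = ev.location_crosswalks.to_pandas()

# Attach the secondary location ID to each primary record via the crosswalk...
joined_sketch = primary_df.merge(
    crosswalk_df, left_on="location_id", right_on="primary_location_id"
)
# ...then join the secondary values on matching location, time, variable and unit.
joined_sketch = joined_sketch.merge(
    secondary_df.rename(columns={"location_id": "secondary_location_id"}),
    on=["secondary_location_id", "value_time", "variable_name", "unit_name"],
    suffixes=("_primary", "_secondary"),
)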

Here we are going to execute the create() method on the joined_timeseries table class to generate the joined_timeseries "materialized view" from the data we just loaded into the domain, location and timeseries tables. The user defined fields to be added are defined as part of the evaluation in the [evaluation_dir]/scripts/user_defined_fields.py Python script. Note that it is possible to skip adding any user defined fields by setting execute_scripts=False when calling the create() method. User defined fields are a powerful tool that allows per-location, per-timestep fields to be added to the materialized joined_timeseries table; this will be covered in more detail in future lessons. For now, let's set execute_scripts=True and execute the user_defined_fields.py script.

ev.joined_timeseries.create(execute_scripts=True)

Now, let's query the joined_timeseries table that we just created to see what it contains.

ev.joined_timeseries.to_pandas().head()

In this case, the joined_timeseries table has 16 fields. These fields are the result of joining the primary_timeseries to the secondary_timeseries, joining the location_attributes to that, and adding user defined fields. The bullet points below show which fields fall into which category:

Timeseries#

  • location_id

  • primary_location_id

  • secondary_location_id

  • configuration_name

  • variable_name

  • unit_name

  • reference_time

  • value_time

  • primary_value

  • secondary_value

Location Attributes#

  • year_2_discharge

  • ecoregion

  • drainage_area

User Defined Fields#

  • month

  • year

  • water_year
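
If you want to confirm these fields yourself, listing the columns of the joined table is a quick check:

# List the columns (and dtypes) of the joined_timeseries table
ev.joined_timeseries.to_pandas().dtypes

Finally, we stop the Spark session: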

ev.spark.stop()

This concludes the Loading Local Data lesson. In the next lesson we will clone an entire evaluation from the TEEHR AWS S3 bucket and start to explore the querying and metrics capabilities.