TimeseriesTable#

class TimeseriesTable(ev, table_name: str | None = None, namespace_name: str | None = None, catalog_name: str | None = None)[source]#

Access methods for timeseries tables.

Base class for primary and secondary timeseries tables.

Methods

add_attributes

Add location attributes to the DataFrame.

add_calculated_fields

Add calculated fields to the DataFrame.

add_geometry

Add geometry to the DataFrame by joining with the locations table.

aggregate

Aggregate data with grouping and metrics.

delete

Delete rows from this table based on filter conditions.

distinct_values

Return distinct values for a column.

drop

Drop this table from the catalog.

filter

Apply filters to the DataFrame.

load_csv

Import timeseries CSV data.

load_dataframe

Load data from an in-memory dataframe.

load_fews_xml

Import timeseries from FEWS PI-XML data format.

load_netcdf

Import timeseries NetCDF data.

load_parquet

Import timeseries Parquet data.

order_by

Apply ordering to the DataFrame.

to_geopandas

Return GeoPandas DataFrame.

to_pandas

Return Pandas DataFrame.

to_sdf

Return the PySpark DataFrame.

validate

Validate the dataset table against the schema.

write

Write the DataFrame to an Iceberg table.

Attributes

foreign_keys

is_core_table

Return True if this table is a core (built-in) TEEHR table.

primary_location_id_field

schema_func

secondary_location_id_field

strict_validation

table_name

uniqueness_fields

validate_filter_field_types

extraction_func(in_filepath, field_mapping, ...)

Convert timeseries data to a pandas DataFrame.

add_attributes(attr_list: List[str] | None = None, location_id_col: str | None = None)#

Add location attributes to the DataFrame.

Joins pivoted location attributes to the DataFrame. The join column is auto-detected from common location ID field names (‘location_id’, ‘primary_location_id’) unless specified.

This is especially useful when called after an aggregate() with GROUP BY and aggregation metrics, so that attributes do not need to be included in the group_by clause to pass through to the result.

Parameters:
  • attr_list (List[str], optional) – Specific attributes to add. If None, all attributes are added.

  • location_id_col (str, optional) – The column name in the DataFrame to join on. If None, checks for ‘location_id’ then ‘primary_location_id’.

Returns:

self – Returns self for method chaining.

Examples

Add all attributes:

>>> df = accessor.add_attributes().to_pandas()

Add specific attributes:

>>> df = accessor.add_attributes(
...     attr_list=["drainage_area", "ecoregion"]
... ).to_pandas()

Specify join column explicitly:

>>> df = accessor.add_attributes(
...     location_id_col="primary_location_id"
... ).to_pandas()

Add attributes after metric aggregation — avoids including them in group_by:

>>> from teehr.metrics import KGE
>>> df = (
...     ev.joined_timeseries_view()
...     .aggregate(
...         group_by=["primary_location_id"],
...         metrics=[KGE()]
...     )
...     .add_attributes(attr_list=["drainage_area", "ecoregion"])
...     .to_pandas()
... )
add_calculated_fields(cfs: CalculatedFieldBaseModel | List[CalculatedFieldBaseModel])#

Add calculated fields to the DataFrame.

Parameters:

cfs (Union[CalculatedFieldBaseModel, List[...]]) – The calculated fields to add.

Returns:

self – Returns self for method chaining.

Examples

>>> import teehr
>>> from teehr import RowLevelCalculatedFields as rcf
>>>
>>> df = accessor.add_calculated_fields([
...     rcf.Month()
... ]).to_pandas()
add_geometry()#

Add geometry to the DataFrame by joining with the locations table.

aggregate(group_by: str | List[str], metrics: List[MetricsBasemodel])#

Aggregate data with grouping and metrics.

Parameters:
  • group_by (Union[str, List[str]]) – Fields to group by for metric calculation.

  • metrics (List[MetricsBasemodel]) – Metrics to calculate.

Returns:

self – Returns self for method chaining.

Examples

>>> df = accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).to_pandas()

Chain with filter and order_by:

>>> from teehr import DeterministicMetrics as dm
>>>
>>> df = (
...     accessor
...     .filter("primary_location_id LIKE 'usgs%'")
...     .aggregate(
...         group_by=["primary_location_id", "configuration_name"],
...         metrics=[dm.KlingGuptaEfficiency(), dm.RelativeBias()]
...     )
...     .order_by(["primary_location_id", "configuration_name"])
...     .to_pandas()
... )
delete(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None, dry_run: bool = False) int | DataFrame#

Delete rows from this table based on filter conditions.

Delegates to Write.delete_from().

Parameters:
  • filters (Union[str, dict, TableFilter, List[...]], optional) – Filter conditions specifying which rows to delete. Supports SQL strings, dictionaries, or TableFilter objects. If None, all rows in the table will be deleted.

  • dry_run (bool, optional) – If True, returns a Spark DataFrame of rows that would be deleted without performing the actual deletion. Default is False.

Returns:

int or ps.DataFrame – If dry_run=False, returns the number of rows deleted (int). If dry_run=True, returns a Spark DataFrame of rows that would be deleted.

Examples

Preview rows that would be deleted (dry run):

>>> sdf = ev.table("primary_timeseries").delete(
...     filters=["location_id = 'usgs-01234567'"],
...     dry_run=True,
... )
>>> print(f"Rows to delete: {sdf.count()}")

Delete rows and get the count:

>>> count = ev.table("primary_timeseries").delete(
...     filters=["location_id = 'usgs-01234567'"],
... )
>>> print(f"Deleted {count} rows.")

Delete all rows from this table:

>>> count = ev.primary_timeseries.delete()
distinct_values(column: str, location_prefixes: bool = False) List[str]#

Return distinct values for a column.

Parameters:
  • column (str) – The column to get distinct values for.

  • location_prefixes (bool) – Whether to return location prefixes. If True, only the unique prefixes of the locations will be returned. Default: False

Returns:

List[str] – The distinct values for the column.

Examples

Get distinct location IDs from the primary timeseries table:

>>> ev.table(table_name="primary_timeseries").distinct_values(
...     column='location_id',
...     location_prefixes=False
... )

Get distinct location prefixes from the joined timeseries table:

>>> ev.table(table_name="joined_timeseries").distinct_values(
...     column='primary_location_id',
...     location_prefixes=True
... )
drop()#

Drop this table from the catalog.

Only non-core tables (user-created tables, materialized views, saved query results) can be dropped. Attempting to drop a core table (e.g., primary_timeseries, locations, units) will raise a ValueError.

Raises:

ValueError – If the table is a core TEEHR table.

Examples

Write and then drop a user-created table:

>>> ev.joined_timeseries_view().write("my_results")
>>> ev.table("my_results").drop()
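The guard behind this behavior can be sketched in plain Python. This is an illustrative assumption about the logic, not TEEHR's actual implementation; the CORE_TABLES set and registry dict are hypothetical stand-ins:

```python
# Simplified sketch of the drop() guard for core tables.
# CORE_TABLES and the registry dict are illustrative assumptions,
# not TEEHR internals.
CORE_TABLES = {"primary_timeseries", "secondary_timeseries", "locations", "units"}


def drop_table(table_name: str, registry: dict) -> None:
    """Drop a user-created table; refuse to drop core tables."""
    if table_name in CORE_TABLES:
        raise ValueError(f"Cannot drop core table 'primary_timeseries': {table_name}"
                         if False else f"Cannot drop core table '{table_name}'.")
    registry.pop(table_name, None)


registry = {"my_results": object(), "primary_timeseries": object()}
drop_table("my_results", registry)  # user table: dropped

error_message = ""
try:
    drop_table("primary_timeseries", registry)  # core table: refused
except ValueError as err:
    error_message = str(err)
```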
static extraction_func(in_filepath: str | Path, field_mapping: dict, **kwargs) DataFrame#

Convert timeseries data to a pandas DataFrame.

Parameters:
  • in_filepath (Union[str, Path]) – The input file path.

  • field_mapping (dict) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}

  • **kwargs – Additional keyword arguments are passed to pd.read_csv(), pd.read_parquet(), or xr.open_dataset().

Returns:

pd.DataFrame

Notes

The input file can be in CSV, Parquet, NetCDF, or XML format. The field_mapping is used to rename the columns in the resulting DataFrame to match the TEEHR data model. The function will read the file, convert it to a DataFrame, rename the columns based on the field_mapping, and return the resulting DataFrame.
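The rename step described above can be illustrated with a stdlib-only sketch. This is not TEEHR's extraction function (which uses pandas/xarray); the extract_rows helper below is a hypothetical illustration of how field_mapping renames columns:

```python
# Illustrative sketch of an extraction step: read tabular text and
# rename columns per field_mapping ({input_field: output_field}).
# TEEHR's built-in extraction functions use pandas/xarray instead.
import csv
import io


def extract_rows(csv_text: str, field_mapping: dict) -> list:
    """Parse CSV text and rename keys according to field_mapping."""
    reader = csv.DictReader(io.StringIO(csv_text))
    return [
        {field_mapping.get(key, key): value for key, value in row.items()}
        for row in reader
    ]


raw = "site,obs\nusgs-01234567,1.5\n"
rows = extract_rows(raw, {"site": "location_id", "obs": "value"})
```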

filter(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None)#

Apply filters to the DataFrame.

Parameters:

filters (Union[str, dict, TableFilter, List[...]]) – The filters to apply. Can be SQL strings, dictionaries, or TableFilter objects.

Returns:

self – Returns self for method chaining.

Examples

Filters as dictionary:

>>> df = accessor.filter(
...     filters=[
...         {
...             "column": "value_time",
...             "operator": ">",
...             "value": "2022-01-01",
...         },
...     ]
... ).to_pandas()

Filters as string:

>>> df = accessor.filter(
...     filters=["value_time > '2022-01-01'"]
... ).to_pandas()
property is_core_table: bool#

Return True if this table is a core (built-in) TEEHR table.

Core tables (e.g., primary_timeseries, locations, units) are part of the standard TEEHR schema and cannot be dropped. User-created tables (e.g., materialized views or saved query results) are not core tables and can be dropped.

Returns:

bool – True if the table is a core TEEHR table, False otherwise.

load_csv(in_path: Path | str, namespace_name: str | None = None, catalog_name: str | None = None, extraction_function: callable | None = None, pattern: str = '**/*.csv', field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', parallel: bool = False, max_workers: int | None = 3, drop_duplicates: bool = True, **kwargs)[source]#

Import timeseries CSV data.

Parameters:
  • in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in csv file format.

  • namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.

  • catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.

  • extraction_function (callable, optional) – A custom function to extract and transform the data from the input files to the TEEHR data model. If None (default), uses the table’s default extraction function.

  • pattern (str, optional (default: "**/*.csv")) – The pattern to match files. Controls which files are loaded from the directory. If in_path is a file, this parameter is ignored.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}

  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}

  • location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.

  • parallel (bool, optional (default: False)) – Whether to process files in parallel. Default is False.

  • max_workers (Union[int, None], optional) – The maximum number of workers to use for parallel processing when in_path is a directory. This is passed to the concurrent.futures ProcessPoolExecutor. If in_path is a file, this parameter is ignored. Default is 3; if None, os.process_cpu_count() is used.

  • drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the DataFrame during validation.

  • **kwargs – Additional keyword arguments are passed to pd.read_csv().

Loading includes validation and importing the data to the database.

Notes

The TEEHR Timeseries table schema includes fields:

  • reference_time

  • value_time

  • configuration_name

  • unit_name

  • variable_name

  • value

  • location_id
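The three write_mode options can be sketched with a toy table keyed by the uniqueness fields. This is a conceptual illustration only; the real tables are Iceberg tables, not Python lists:

```python
# Sketch of write_mode semantics over (key, value) row pairs, where the
# key stands in for the table's uniqueness fields. Illustrative only --
# not how TEEHR writes to Iceberg.
def write(table: list, new_rows: list, write_mode: str) -> list:
    if write_mode == "create_or_replace":
        return list(new_rows)              # replace the whole table
    if write_mode == "append":
        return table + list(new_rows)      # no check against existing data
    if write_mode == "upsert":
        merged = dict(table)
        merged.update(new_rows)            # replace matching keys, add the rest
        return list(merged.items())
    raise ValueError(f"Unknown write_mode: {write_mode!r}")


existing = [("usgs-01", 1.0)]
incoming = [("usgs-01", 2.0), ("usgs-02", 3.0)]
```

Note that "append" can produce duplicate keys, which is why validation offers drop_duplicates.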

load_dataframe(df: DataFrame | DataFrame, namespace_name: str | None = None, catalog_name: str | None = None, field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', drop_duplicates: bool = True)[source]#

Load data from an in-memory dataframe.

Parameters:
  • df (Union[pd.DataFrame, ps.DataFrame]) – Pandas or PySpark DataFrame to load into the table.

  • namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.

  • catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}

  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}.

  • location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.

  • drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the dataframe.
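How field_mapping, constant_field_values, and location_id_prefix combine can be sketched with plain dicts. The prepare_rows helper below is a hypothetical stdlib illustration; TEEHR performs these transformations on pandas/PySpark DataFrames:

```python
# Sketch of the row transformations applied before loading:
# rename fields, add constants, and prefix location IDs.
# Illustrative only -- not TEEHR's implementation.
def prepare_rows(rows, field_mapping=None, constant_field_values=None,
                 location_id_prefix=None):
    field_mapping = field_mapping or {}
    constants = constant_field_values or {}
    out = []
    for row in rows:
        new = {field_mapping.get(k, k): v for k, v in row.items()}
        new.update(constants)  # same value applied to every row
        if location_id_prefix and "location_id" in new:
            new["location_id"] = f"{location_id_prefix}-{new['location_id']}"
        out.append(new)
    return out


rows = [{"site": "01234567", "value": 1.5}]
prepared = prepare_rows(
    rows,
    field_mapping={"site": "location_id"},
    constant_field_values={"configuration_name": "obs"},
    location_id_prefix="usgs",
)
```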

load_fews_xml(in_path: Path | str, namespace_name: str | None = None, catalog_name: str | None = None, extraction_function: callable | None = None, pattern: str = '**/*.xml', field_mapping: dict = {'ensembleId': 'configuration_name', 'ensembleMemberIndex': 'member', 'forecastDate': 'reference_time', 'locationId': 'location_id', 'parameterId': 'variable_name', 'units': 'unit_name'}, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', parallel: bool = False, max_workers: int | None = 3, drop_duplicates: bool = True, **kwargs)[source]#

Import timeseries from FEWS PI-XML data format.

Parameters:
  • in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in xml file format.

  • namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.

  • catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.

  • extraction_function (callable, optional) – A custom function to extract and transform the data from the input files to the TEEHR data model. If None (default), uses the table’s default extraction function.

  • pattern (str, optional (default: "**/*.xml")) – The pattern to match files. Controls which files are loaded from the directory. If in_path is a file, this parameter is ignored.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field} Default mapping:

    {
        "locationId": "location_id",
        "forecastDate": "reference_time",
        "parameterId": "variable_name",
        "units": "unit_name",
        "ensembleId": "configuration_name",
        "ensembleMemberIndex": "member"
    }
    
  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}.

  • location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.

  • parallel (bool, optional (default: False)) – Whether to process files in parallel. Default is False.

  • max_workers (Union[int, None], optional) – The maximum number of workers to use for parallel processing when in_path is a directory. This is passed to the concurrent.futures ProcessPoolExecutor. If in_path is a file, this parameter is ignored. Default is 3; if None, os.process_cpu_count() is used.

  • drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the dataframe.

Loading includes validation and importing the data to the database.

Notes

This function follows the Delft-FEWS Published Interface (PI) XML format.

Reference: https://publicwiki.deltares.nl/display/FEWSDOC/Dynamic+data

The value and value_time fields are parsed automatically.

The TEEHR Timeseries table schema includes fields:

  • reference_time

  • value_time

  • configuration_name

  • unit_name

  • variable_name

  • value

  • location_id

  • member
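The header-field mapping above can be illustrated with a stdlib XML parse. The fragment below is a simplified, hypothetical PI-XML-like snippet (the real format carries an XML namespace and many more header elements), and parse_series is an illustration, not TEEHR's parser:

```python
# Sketch of mapping PI-XML header tags to TEEHR field names using the
# default field_mapping. Simplified XML; real PI-XML has a namespace.
import xml.etree.ElementTree as ET

XML = """
<TimeSeries>
  <series>
    <header>
      <locationId>gauge-1</locationId>
      <parameterId>Q.obs</parameterId>
      <units>m3/s</units>
    </header>
    <event date="2022-01-01" time="12:00:00" value="1.5"/>
  </series>
</TimeSeries>
"""

FIELD_MAPPING = {
    "locationId": "location_id",
    "parameterId": "variable_name",
    "units": "unit_name",
}


def parse_series(xml_text: str) -> dict:
    root = ET.fromstring(xml_text)
    header = root.find("series/header")
    mapped = {
        FIELD_MAPPING[el.tag]: el.text
        for el in header
        if el.tag in FIELD_MAPPING
    }
    # value and value_time come from the event element, parsed automatically
    event = root.find("series/event")
    mapped["value_time"] = f"{event.get('date')} {event.get('time')}"
    mapped["value"] = float(event.get("value"))
    return mapped


record = parse_series(XML)
```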

load_netcdf(in_path: Path | str, namespace_name: str | None = None, catalog_name: str | None = None, extraction_function: callable | None = None, pattern: str = '**/*.nc', field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', parallel: bool = False, max_workers: int | None = 3, drop_duplicates: bool = True, **kwargs)[source]#

Import timeseries NetCDF data.

Parameters:
  • in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in netcdf file format.

  • namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.

  • catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.

  • extraction_function (callable, optional) – A custom function to extract and transform the data from the input files to the TEEHR data model. If None (default), uses the table’s default extraction function.

  • pattern (str, optional (default: "**/*.nc")) – The pattern to match files. Controls which files are loaded from the directory. If in_path is a file, this parameter is ignored.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}

  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}

  • location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.

  • parallel (bool, optional (default: False)) – Whether to process files in parallel. Default is False.

  • max_workers (Union[int, None], optional) – The maximum number of workers to use for parallel processing when in_path is a directory. This is passed to the concurrent.futures ProcessPoolExecutor. If in_path is a file, this parameter is ignored. Default is 3; if None, os.process_cpu_count() is used.

  • drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the DataFrame during validation.

  • **kwargs – Additional keyword arguments are passed to xr.open_dataset().

Loading includes validation and importing the data to the database.

Notes

The TEEHR Timeseries table schema includes fields:

  • reference_time

  • value_time

  • configuration_name

  • unit_name

  • variable_name

  • value

  • location_id
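The behavior of the pattern parameter can be demonstrated with pathlib's glob semantics, which match the "**/*.nc" style used here. This is a stdlib sketch of the matching rule, assuming TEEHR's file discovery follows standard recursive-glob behavior:

```python
# Sketch of how pattern="**/*.nc" selects files when in_path is a
# directory: NetCDF files at any depth match; other files do not.
import tempfile
from pathlib import Path

with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "sub").mkdir()
    (root / "a.nc").touch()
    (root / "sub" / "b.nc").touch()
    (root / "notes.txt").touch()
    # "**" matches the directory itself and all subdirectories
    matched = sorted(p.name for p in root.glob("**/*.nc"))
```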

load_parquet(in_path: Path | str, namespace_name: str | None = None, catalog_name: str | None = None, extraction_function: callable | None = None, pattern: str = '**/*.parquet', field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', parallel: bool = False, max_workers: int | None = 3, drop_duplicates: bool = True, **kwargs)[source]#

Import timeseries Parquet data.

Parameters:
  • in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in parquet file format.

  • namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.

  • catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.

  • extraction_function (callable, optional) – A custom function to extract and transform the data from the input files to the TEEHR data model. If None (default), uses the table’s default extraction function.

  • pattern (str, optional (default: "**/*.parquet")) – The pattern to match files. Controls which files are loaded from the directory. If in_path is a file, this parameter is ignored.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}

  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}

  • location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.

  • write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.

  • parallel (bool, optional (default: False)) – Whether to process files in parallel. Default is False.

  • max_workers (Union[int, None], optional) – The maximum number of workers to use for parallel processing when in_path is a directory. This is passed to the concurrent.futures ProcessPoolExecutor. If in_path is a file, this parameter is ignored. Default is 3; if None, os.process_cpu_count() is used.

  • drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the DataFrame during validation.

  • **kwargs – Additional keyword arguments are passed to pd.read_parquet().

Loading includes validation and importing the data to the database.

Notes

The TEEHR Timeseries table schema includes fields:

  • reference_time

  • value_time

  • configuration_name

  • unit_name

  • variable_name

  • value

  • location_id
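Why location_id_prefix matters can be shown with a short sketch: two data sources may use the same raw IDs, and prefixing keeps them distinct once combined. The add_prefix helper is an illustration, not a TEEHR function:

```python
# Sketch of location_id_prefix: the same raw gauge ID from two sources
# stays unique after prefixing. Illustrative helper, not TEEHR code.
def add_prefix(location_ids, prefix):
    return [f"{prefix}-{loc}" for loc in location_ids]


usgs_ids = add_prefix(["01234567"], "usgs")
nwm_ids = add_prefix(["01234567"], "nwm30")
combined = set(usgs_ids + nwm_ids)  # two entries, not one
```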

order_by(fields: str | StrEnum | List[str | StrEnum])#

Apply ordering to the DataFrame.

Parameters:

fields (Union[str, StrEnum, List[...]]) – The fields to order by.

Returns:

self – Returns self for method chaining.

Examples

>>> df = accessor.order_by("value_time").to_pandas()
to_geopandas()#

Return GeoPandas DataFrame.

Returns:

gpd.GeoDataFrame – The data as a GeoPandas DataFrame.

to_pandas()#

Return Pandas DataFrame.

Returns:

pd.DataFrame – The data as a Pandas DataFrame.

to_sdf() DataFrame#

Return the PySpark DataFrame.

The PySpark DataFrame can be further processed using PySpark. Note that PySpark DataFrames are lazy and will not be executed until an action is called (e.g., show(), collect(), toPandas()).

Returns:

ps.DataFrame – The Spark DataFrame.

validate(drop_duplicates: bool = True)#

Validate the dataset table against the schema.

Parameters:

drop_duplicates (bool, optional) – Whether to drop duplicates based on the uniqueness fields. Default is True.

Examples

Validate a table:

>>> ev.table(
...     table_name="primary_timeseries"
... ).validate(drop_duplicates=True)
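The drop_duplicates behavior can be sketched as keeping the first row for each combination of uniqueness fields. The field list below is an illustrative assumption, not the table's actual uniqueness_fields attribute:

```python
# Sketch of duplicate dropping during validation: keep the first row
# per uniqueness-field key. The field list is an illustrative assumption.
UNIQUENESS_FIELDS = ["location_id", "value_time", "configuration_name"]


def drop_duplicate_rows(rows: list) -> list:
    seen, out = set(), []
    for row in rows:
        key = tuple(row[f] for f in UNIQUENESS_FIELDS)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out


rows = [
    {"location_id": "usgs-01", "value_time": "2022-01-01",
     "configuration_name": "obs", "value": 1.0},
    {"location_id": "usgs-01", "value_time": "2022-01-01",
     "configuration_name": "obs", "value": 2.0},  # duplicate key
]
deduped = drop_duplicate_rows(rows)
```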
write(table_name: str, write_mode: str = 'create_or_replace')#

Write the DataFrame to an Iceberg table.

Parameters:
  • table_name (str) – The name of the table to write to.

  • write_mode (str, optional) – The write mode. Options: “create”, “append”, “overwrite”, “create_or_replace”. Default is “create_or_replace”.

Returns:

self – Returns self for method chaining.

Examples

>>> accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).write("location_metrics")