DomainTable#

class DomainTable(ev, table_name: str | None = None, namespace_name: str | None = None, catalog_name: str | None = None)[source]#

Domain table class.

Domain tables store reference data (units, variables, configurations, attributes) that other tables reference via foreign keys.

Methods

add_attributes

Add location attributes to the DataFrame.

add_calculated_fields

Add calculated fields to the DataFrame.

add_geometry

Add geometry to the DataFrame.

aggregate

Aggregate data with grouping and metrics.

delete

Delete rows from this table based on filter conditions.

distinct_values

Return distinct values for a column.

drop

Drop this table from the catalog.

filter

Apply filters to the DataFrame.

order_by

Apply ordering to the DataFrame.

to_geopandas

Return GeoPandas DataFrame.

to_pandas

Return Pandas DataFrame.

to_sdf

Return the PySpark DataFrame.

validate

Validate the dataset table against the schema.

write

Write the DataFrame to an iceberg table.

Attributes

extraction_func

foreign_keys

is_core_table

Return True if this table is a core (built-in) TEEHR table.

primary_location_id_field

schema_func

secondary_location_id_field

strict_validation

table_name

uniqueness_fields

validate_filter_field_types

add_attributes(attr_list: List[str] | None = None, location_id_col: str | None = None)#

Add location attributes to the DataFrame.

Joins pivoted location attributes to the DataFrame. The join column is auto-detected from common location ID field names (‘location_id’, ‘primary_location_id’) unless specified.

This is especially useful when called after a aggregate() with GROUP BY and aggregation metrics, so that attributes do not need to be included in the group_by clause in order to pass through to the result.

Parameters:
  • attr_list (List[str], optional) – Specific attributes to add. If None, all attributes are added.

  • location_id_col (str, optional) – The column name in the DataFrame to join on. If None, checks for ‘location_id’ then ‘primary_location_id’.

Returns:

self – Returns self for method chaining.

Examples

Add all attributes:

>>> df = accessor.add_attributes().to_pandas()

Add specific attributes:

>>> df = accessor.add_attributes(
...     attr_list=["drainage_area", "ecoregion"]
... ).to_pandas()

Specify join column explicitly:

>>> df = accessor.add_attributes(
...     location_id_col="primary_location_id"
... ).to_pandas()

Add attributes after metric aggregation — avoids including them in group_by:

>>> from teehr.metrics import KGE
>>> df = (
...     ev.joined_timeseries_view()
...     .aggregate(
...         group_by=["primary_location_id"],
...         metrics=[KGE()]
...     )
...     .add_attributes(attr_list=["drainage_area", "ecoregion"])
...     .to_pandas()
... )
add_calculated_fields(cfs: CalculatedFieldBaseModel | List[CalculatedFieldBaseModel])#

Add calculated fields to the DataFrame.

Parameters:

cfs (Union[CalculatedFieldBaseModel, List[...]]) – The calculated fields to add.

Returns:

self – Returns self for method chaining.

Examples

>>> import teehr
>>> from teehr import RowLevelCalculatedFields as rcf
>>>
>>> df = accessor.add_calculated_fields([
>>>     rcf.Month()
>>> ]).to_pandas()
add_geometry()[source]#

Add geometry to the DataFrame.

aggregate(group_by: str | List[str], metrics: List[MetricsBasemodel])#

Aggregate data with grouping and metrics.

Parameters:
  • group_by (Union[str, List[str]]) – Fields to group by for metric calculation.

  • metrics (List[MetricsBasemodel]) – Metrics to calculate.

Returns:

self – Returns self for method chaining.

Examples

>>> df = accessor.aggregate(
>>>     metrics=[KGE()],
>>>     group_by=["primary_location_id"]
>>> ).to_pandas()

Chain with filter and order_by:

>>> from teehr import DeterministicMetrics as dm
>>>
>>> df = (
>>>     accessor
>>>     .filter("primary_location_id LIKE 'usgs%'")
>>>     .aggregate(
>>>         group_by=["primary_location_id", "configuration_name"],
>>>         metrics=[dm.KlingGuptaEfficiency(), dm.RelativeBias()]
>>>     )
>>>     .order_by(["primary_location_id", "configuration_name"])
>>>     .to_pandas()
>>> )
delete(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None, dry_run: bool = False) int | DataFrame#

Delete rows from this table based on filter conditions.

Delegates to Write.delete_from().

Parameters:
  • filters (Union[str, dict, TableFilter, List[...]], optional) – Filter conditions specifying which rows to delete. Supports SQL strings, dictionaries, or TableFilter objects. If None, all rows in the table will be deleted.

  • dry_run (bool, optional) – If True, returns a Spark DataFrame of rows that would be deleted without performing the actual deletion. Default is False.

Returns:

int or ps.DataFrame – If dry_run=False, returns the number of rows deleted (int). If dry_run=True, returns a Spark DataFrame of rows that would be deleted.

Examples

Preview rows that would be deleted (dry run):

>>> sdf = ev.table("primary_timeseries").delete(
>>>     filters=["location_id = 'usgs-01234567'"],
>>>     dry_run=True,
>>> )
>>> print(f"Rows to delete: {sdf.count()}")

Delete rows and get the count:

>>> count = ev.table("primary_timeseries").delete(
>>>     filters=["location_id = 'usgs-01234567'"],
>>> )
>>> print(f"Deleted {count} rows.")

Delete all rows from this table:

>>> count = ev.primary_timeseries.delete()
distinct_values(column: str, location_prefixes: bool = False) List[str]#

Return distinct values for a column.

Parameters:
  • column (str) – The column to get distinct values for.

  • location_prefixes (bool) – Whether to return location prefixes. If True, only the unique prefixes of the locations will be returned. Default: False

Returns:

List[str] – The distinct values for the column.

Examples

Get distinct location IDs from the primary timeseries table:

>>> ev.table(table_name="primary_timeseries").distinct_values(
>>>     column='location_id',
>>>     location_prefixes=False
>>> )

Get distinct location prefixes from the joined timeseries table:

>>> ev.table(table_name="joined_timeseries").distinct_values(
>>>     column='primary_location_id',
>>>     location_prefixes=True
>>> )
drop()#

Drop this table from the catalog.

Only non-core tables (user-created tables, materialized views, saved query results) can be dropped. Attempting to drop a core table (e.g., primary_timeseries, locations, units) will raise a ValueError.

Raises:

ValueError – If the table is a core TEEHR table.

Examples

Write and then drop a user-created table:

>>> ev.joined_timeseries_view().write("my_results")
>>> ev.table("my_results").drop()
filter(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None)#

Apply filters to the DataFrame.

Parameters:

filters (Union[str, dict, TableFilter, List[...]]) – The filters to apply. Can be SQL strings, dictionaries, or TableFilter objects.

Returns:

self – Returns self for method chaining.

Examples

Filters as dictionary:

>>> df = accessor.filter(
>>>     filters=[
>>>         {
>>>             "column": "value_time",
>>>             "operator": ">",
>>>             "value": "2022-01-01",
>>>         },
>>>     ]
>>> ).to_pandas()

Filters as string:

>>> df = accessor.filter(
>>>     filters=["value_time > '2022-01-01'"]
>>> ).to_pandas()
property is_core_table: bool#

Return True if this table is a core (built-in) TEEHR table.

Core tables (e.g., primary_timeseries, locations, units) are part of the standard TEEHR schema and cannot be dropped. User-created tables (e.g., materialized views or saved query results) are not core tables and can be dropped.

Returns:

bool – True if the table is a core TEEHR table, False otherwise.

order_by(fields: str | StrEnum | List[str | StrEnum])#

Apply ordering to the DataFrame.

Parameters:

fields (Union[str, StrEnum, List[...]]) – The fields to order by.

Returns:

self – Returns self for method chaining.

Examples

>>> df = accessor.order_by("value_time").to_pandas()
to_geopandas()[source]#

Return GeoPandas DataFrame.

to_pandas()#

Return Pandas DataFrame.

Returns:

pd.DataFrame – The data as a Pandas DataFrame.

to_sdf() DataFrame#

Return the PySpark DataFrame.

The PySpark DataFrame can be further processed using PySpark. Note, PySpark DataFrames are lazy and will not be executed until an action is called (e.g., show(), collect(), toPandas()).

Returns:

ps.DataFrame – The Spark DataFrame.

validate(drop_duplicates: bool = True)#

Validate the dataset table against the schema.

Parameters:

drop_duplicates (bool, optional) – Whether to drop duplicates based on the uniqueness fields. Default is True.

Examples

Validate a table:

>>> ev.table(
>>>     table_name="primary_timeseries"
>>> ).validate(drop_duplicates=True)
write(table_name: str, write_mode: str = 'create_or_replace')#

Write the DataFrame to an iceberg table.

Parameters:
  • table_name (str) – The name of the table to write to.

  • write_mode (str, optional) – The write mode. Options: “create”, “append”, “overwrite”, “create_or_replace”. Default is “create_or_replace”.

Returns:

self – Returns self for method chaining.

Examples

>>> accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).write("location_metrics")