Metrics
- class Metrics(ev)
Component class for calculating metrics.
Deprecated since version 0.6.0: The Metrics class (accessed via ev.metrics) is deprecated and will be removed in a future version. Use the aggregate method on the table directly with the metrics argument instead. For example:

ev.table("joined_timeseries").aggregate(
    metrics=[...],
    group_by=[...],
).order_by([...])
Notes
This is essentially a wrapper around the JoinedTimeseriesTable class but is initialized as a ‘joined_timeseries’ table by default.
Methods
- add_attributes: Add location attributes to the DataFrame.
- add_calculated_fields: Add calculated fields to the DataFrame.
- add_geometry: Add geometry to the DataFrame by joining with the locations table.
- aggregate: Aggregate data with grouping and metrics.
- delete: Delete rows from this table based on filter conditions.
- distinct_values: Return distinct values for a column.
- drop: Drop this table from the catalog.
- filter: Apply filters to the DataFrame.
- order_by: Apply ordering to the DataFrame.
- to_geopandas: Return GeoPandas DataFrame.
- to_pandas: Return Pandas DataFrame.
- to_sdf: Return the PySpark DataFrame.
- validate: Validate the dataset table against the schema.
- write: Write the DataFrame to an Iceberg table.
Attributes
- extraction_func
- foreign_keys
- is_core_table: Return True if this table is a core (built-in) TEEHR table.
- primary_location_id_field
- schema_func
- secondary_location_id_field
- strict_validation
- table_name
- uniqueness_fields
- validate_filter_field_types
- add_attributes(attr_list: List[str] | None = None, location_id_col: str | None = None)
Add location attributes to the DataFrame.
Joins pivoted location attributes to the DataFrame. The join column is auto-detected from common location ID field names (‘location_id’, ‘primary_location_id’) unless specified.
This is especially useful when called after an aggregate() call with group_by and aggregation metrics, so that attributes do not need to be included in the group_by clause in order to pass through to the result.
- Parameters:
attr_list (List[str], optional) – Specific attributes to add. If None, all attributes are added.
location_id_col (str, optional) – The column name in the DataFrame to join on. If None, checks for ‘location_id’ then ‘primary_location_id’.
- Returns:
self – Returns self for method chaining.
Examples
Add all attributes:
>>> df = accessor.add_attributes().to_pandas()
Add specific attributes:
>>> df = accessor.add_attributes(
...     attr_list=["drainage_area", "ecoregion"]
... ).to_pandas()
Specify join column explicitly:
>>> df = accessor.add_attributes(
...     location_id_col="primary_location_id"
... ).to_pandas()
Add attributes after metric aggregation, so they do not have to be included in group_by:

>>> from teehr.metrics import KGE
>>> df = (
...     ev.joined_timeseries_view()
...     .aggregate(
...         group_by=["primary_location_id"],
...         metrics=[KGE()]
...     )
...     .add_attributes(attr_list=["drainage_area", "ecoregion"])
...     .to_pandas()
... )
- add_calculated_fields(cfs: CalculatedFieldBaseModel | List[CalculatedFieldBaseModel])
Add calculated fields to the DataFrame.
- Parameters:
cfs (Union[CalculatedFieldBaseModel, List[CalculatedFieldBaseModel]]) – The calculated fields to add.
- Returns:
self – Returns self for method chaining.
Examples
>>> import teehr
>>> from teehr import RowLevelCalculatedFields as rcf
>>>
>>> df = accessor.add_calculated_fields([
...     rcf.Month()
... ]).to_pandas()
- add_geometry()
Add geometry to the DataFrame by joining with the locations table.
- aggregate(group_by: str | List[str], metrics: List[MetricsBasemodel])
Aggregate data with grouping and metrics.
- Parameters:
group_by (Union[str, List[str]]) – Fields to group by for metric calculation.
metrics (List[MetricsBasemodel]) – Metrics to calculate.
- Returns:
self – Returns self for method chaining.
Examples
>>> from teehr.metrics import KGE
>>> df = accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).to_pandas()
Chain with filter and order_by:
>>> from teehr import DeterministicMetrics as dm
>>>
>>> df = (
...     accessor
...     .filter("primary_location_id LIKE 'usgs%'")
...     .aggregate(
...         group_by=["primary_location_id", "configuration_name"],
...         metrics=[dm.KlingGuptaEfficiency(), dm.RelativeBias()]
...     )
...     .order_by(["primary_location_id", "configuration_name"])
...     .to_pandas()
... )
- delete(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None, dry_run: bool = False) → int | DataFrame
Delete rows from this table based on filter conditions.
Delegates to Write.delete_from().
- Parameters:
filters (Union[str, dict, TableFilter, List[str | dict | TableFilter]], optional) – Filter conditions specifying which rows to delete. Supports SQL strings, dictionaries, or TableFilter objects. If None, all rows in the table will be deleted.
dry_run (bool, optional) – If True, returns a Spark DataFrame of rows that would be deleted without performing the actual deletion. Default is False.
- Returns:
int or ps.DataFrame – If dry_run=False, returns the number of rows deleted (int). If dry_run=True, returns a Spark DataFrame of rows that would be deleted.
Examples
Preview rows that would be deleted (dry run):
>>> sdf = ev.table("primary_timeseries").delete(
...     filters=["location_id = 'usgs-01234567'"],
...     dry_run=True,
... )
>>> print(f"Rows to delete: {sdf.count()}")
Delete rows and get the count:
>>> count = ev.table("primary_timeseries").delete(
...     filters=["location_id = 'usgs-01234567'"],
... )
>>> print(f"Deleted {count} rows.")
Delete all rows from this table:
>>> count = ev.primary_timeseries.delete()
- distinct_values(column: str, location_prefixes: bool = False) → List[str]
Return distinct values for a column.
- Parameters:
column (str) – The column to get distinct values for.
location_prefixes (bool) – Whether to return location prefixes. If True, only the unique prefixes of the locations will be returned. Default is False.
- Returns:
List[str] – The distinct values for the column.
Examples
Get distinct location IDs from the primary timeseries table:
>>> ev.table(table_name="primary_timeseries").distinct_values(
...     column='location_id',
...     location_prefixes=False
... )
Get distinct location prefixes from the joined timeseries table:
>>> ev.table(table_name="joined_timeseries").distinct_values(
...     column='primary_location_id',
...     location_prefixes=True
... )
- drop()
Drop this table from the catalog.
Only non-core tables (user-created tables, materialized views, saved query results) can be dropped. Attempting to drop a core table (e.g., primary_timeseries, locations, units) will raise a ValueError.
- Raises:
ValueError – If the table is a core TEEHR table.
Examples
Write and then drop a user-created table:
>>> ev.joined_timeseries_view().write("my_results")
>>> ev.table("my_results").drop()
- filter(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None)
Apply filters to the DataFrame.
- Parameters:
filters (Union[str, dict, TableFilter, List[str | dict | TableFilter]]) – The filters to apply. Can be SQL strings, dictionaries, or TableFilter objects.
- Returns:
self – Returns self for method chaining.
Examples
Filters as dictionary:
>>> df = accessor.filter(
...     filters=[
...         {
...             "column": "value_time",
...             "operator": ">",
...             "value": "2022-01-01",
...         },
...     ]
... ).to_pandas()
Filters as string:
>>> df = accessor.filter(
...     filters=["value_time > '2022-01-01'"]
... ).to_pandas()
- property is_core_table: bool
Return True if this table is a core (built-in) TEEHR table.
Core tables (e.g., primary_timeseries, locations, units) are part of the standard TEEHR schema and cannot be dropped. User-created tables (e.g., materialized views or saved query results) are not core tables and can be dropped.
- Returns:
bool – True if the table is a core TEEHR table, False otherwise.
- order_by(fields: str | StrEnum | List[str | StrEnum])
Apply ordering to the DataFrame.
- Parameters:
fields (Union[str, StrEnum, List[str | StrEnum]]) – The fields to order by.
- Returns:
self – Returns self for method chaining.
Examples
>>> df = accessor.order_by("value_time").to_pandas()
- to_geopandas()
Return GeoPandas DataFrame.
- Returns:
gpd.GeoDataFrame – The data as a GeoPandas DataFrame.
- to_pandas()
Return Pandas DataFrame.
- Returns:
pd.DataFrame – The data as a Pandas DataFrame.
- to_sdf() → DataFrame
Return the PySpark DataFrame.
The PySpark DataFrame can be further processed using PySpark. Note that PySpark DataFrames are lazy and will not be executed until an action is called (e.g., show(), collect(), toPandas()).
- Returns:
ps.DataFrame – The Spark DataFrame.
- validate(drop_duplicates: bool = True)
Validate the dataset table against the schema.
- Parameters:
drop_duplicates (bool, optional) – Whether to drop duplicates based on the uniqueness fields. Default is True.
Examples
Validate a table:
>>> ev.table(
...     table_name="primary_timeseries"
... ).validate(drop_duplicates=True)
- write(table_name: str, write_mode: str = 'create_or_replace')
Write the DataFrame to an Iceberg table.
- Parameters:
table_name (str) – The name of the table to write to.
write_mode (str, optional) – The write mode. Options: "create", "append", "overwrite", "create_or_replace". Default is "create_or_replace".
- Returns:
self – Returns self for method chaining.
Examples
>>> from teehr.metrics import KGE
>>> accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).write("location_metrics")