View

class View(ev, catalog_name: str | None = None, namespace_name: str | None = None)

Base class for computed views.

A View represents a computed DataFrame that is evaluated lazily. Views can be chained with operations such as filter(), query(), and add_calculated_fields(), and are ultimately materialized to an Iceberg table via write().

Unlike Tables (which read from persisted Iceberg tables), Views compute their data on the fly when accessed.
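
The lazy, chainable behavior described above can be illustrated with a minimal pure-Python sketch. LazyView, to_rows(), and compute_count are hypothetical names invented for this illustration; this is not how View is implemented, only a picture of the pattern of recording steps and deferring evaluation.

```python
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


class LazyView:
    """Hypothetical stand-in for a lazily evaluated, chainable view."""

    def __init__(self, source: Callable[[], List[Row]]):
        self._source = source      # called only when data is requested
        self._steps: List[Callable[[List[Row]], List[Row]]] = []
        self.compute_count = 0     # number of times the source was read

    def filter(self, predicate: Callable[[Row], bool]) -> "LazyView":
        # Record the step; nothing is evaluated yet.
        self._steps.append(lambda rows: [r for r in rows if predicate(r)])
        return self                # returning self enables chaining

    def order_by(self, field: str) -> "LazyView":
        self._steps.append(lambda rows: sorted(rows, key=lambda r: r[field]))
        return self

    def to_rows(self) -> List[Row]:
        # Evaluation happens here, when the result is actually needed.
        self.compute_count += 1
        rows = self._source()
        for step in self._steps:
            rows = step(rows)
        return rows


def load() -> List[Row]:
    """Made-up sample data standing in for a real source."""
    return [
        {"location_id": "usgs-02", "value": 3.0},
        {"location_id": "usgs-01", "value": 1.5},
        {"location_id": "nwm-07", "value": 9.9},
    ]


view = (
    LazyView(load)
    .filter(lambda r: r["location_id"].startswith("usgs"))
    .order_by("location_id")
)
pending = view.compute_count   # the source has not been read yet
result = view.to_rows()        # evaluation is triggered here
```

In this sketch, building the chain only records a plan; the source is read once, when to_rows() is called.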

Methods

add_attributes

Add location attributes to the DataFrame.

add_calculated_fields

Add calculated fields to the DataFrame.

add_geometry

Add geometry to the DataFrame by joining with the locations table.

aggregate

Aggregate data with grouping and metrics.

filter

Apply filters to the DataFrame.

order_by

Apply ordering to the DataFrame.

to_geopandas

Return a GeoPandas GeoDataFrame.

to_pandas

Return a pandas DataFrame.

to_sdf

Return the computed PySpark DataFrame.

write

Write the DataFrame to an Iceberg table.

add_attributes(attr_list: List[str] | None = None, location_id_col: str | None = None)

Add location attributes to the DataFrame.

Joins pivoted location attributes to the DataFrame. The join column is auto-detected from common location ID field names (‘location_id’, ‘primary_location_id’) unless specified.

This is especially useful after an aggregate() call with group_by fields and metrics, because attributes then do not need to be included in group_by in order to pass through to the result.

Parameters:
  • attr_list (List[str], optional) – Specific attributes to add. If None, all attributes are added.

  • location_id_col (str, optional) – The column name in the DataFrame to join on. If None, checks for ‘location_id’ then ‘primary_location_id’.

Returns:

self – Returns self for method chaining.

Examples

Add all attributes:

>>> df = accessor.add_attributes().to_pandas()

Add specific attributes:

>>> df = accessor.add_attributes(
...     attr_list=["drainage_area", "ecoregion"]
... ).to_pandas()

Specify join column explicitly:

>>> df = accessor.add_attributes(
...     location_id_col="primary_location_id"
... ).to_pandas()

Add attributes after metric aggregation, which avoids including them in group_by:

>>> from teehr.metrics import KGE
>>> df = (
...     ev.joined_timeseries_view()
...     .aggregate(
...         group_by=["primary_location_id"],
...         metrics=[KGE()]
...     )
...     .add_attributes(attr_list=["drainage_area", "ecoregion"])
...     .to_pandas()
... )
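
As a rough picture of what the join described above involves, the following pure-Python sketch pivots long-form attribute rows into one record per location and left-joins them onto a result. The helper names, the attribute row layout (location_id, attribute_name, value), and the sample values are assumptions made for illustration, not TEEHR's implementation.

```python
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]

# Candidate join columns, checked in the order given in the description above.
CANDIDATE_ID_COLS = ("location_id", "primary_location_id")


def detect_location_id_col(columns: List[str], location_id_col: Optional[str] = None) -> str:
    """Return the explicit column if given, else the first known candidate."""
    if location_id_col is not None:
        if location_id_col not in columns:
            raise ValueError(f"Column not found: {location_id_col}")
        return location_id_col
    for name in CANDIDATE_ID_COLS:
        if name in columns:
            return name
    raise ValueError("No location ID column found")


def pivot_attributes(long_rows: List[Row], attr_list: Optional[List[str]] = None) -> Dict[str, Row]:
    """Turn (location_id, attribute_name, value) rows into one dict per location."""
    wide: Dict[str, Row] = {}
    for row in long_rows:
        if attr_list is not None and row["attribute_name"] not in attr_list:
            continue
        wide.setdefault(row["location_id"], {})[row["attribute_name"]] = row["value"]
    return wide


def add_attributes_sketch(
    rows: List[Row],
    long_attrs: List[Row],
    attr_list: Optional[List[str]] = None,
    location_id_col: Optional[str] = None,
) -> List[Row]:
    if not rows:
        return []
    col = detect_location_id_col(list(rows[0].keys()), location_id_col)
    wide = pivot_attributes(long_attrs, attr_list)
    # Left join: rows without matching attributes are kept unchanged.
    return [{**row, **wide.get(row[col], {})} for row in rows]


# Made-up aggregated result and attribute rows.
metrics_rows = [
    {"primary_location_id": "usgs-01", "kling_gupta_efficiency": 0.82},
    {"primary_location_id": "usgs-02", "kling_gupta_efficiency": 0.41},
]
attribute_rows = [
    {"location_id": "usgs-01", "attribute_name": "drainage_area", "value": 120.5},
    {"location_id": "usgs-01", "attribute_name": "ecoregion", "value": "plains"},
    {"location_id": "usgs-01", "attribute_name": "stream_order", "value": 3},
]

joined = add_attributes_sketch(
    metrics_rows, attribute_rows, attr_list=["drainage_area", "ecoregion"]
)
```

Here the aggregated rows carry only primary_location_id, so the join column falls through to the second candidate, and the attributes arrive without ever appearing in group_by.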
add_calculated_fields(cfs: CalculatedFieldBaseModel | List[CalculatedFieldBaseModel])

Add calculated fields to the DataFrame.

Parameters:

cfs (Union[CalculatedFieldBaseModel, List[...]]) – The calculated fields to add.

Returns:

self – Returns self for method chaining.

Examples

>>> import teehr
>>> from teehr import RowLevelCalculatedFields as rcf
>>>
>>> df = accessor.add_calculated_fields([
...     rcf.Month()
... ]).to_pandas()
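
Conceptually, a row-level calculated field derives a new column from existing columns of the same row. The sketch below shows that idea for a month field in plain Python. The function name add_month, the input column value_time, and the output column month are assumptions for illustration and may not match what rcf.Month() actually does.

```python
from datetime import datetime
from typing import Any, Dict, List

Row = Dict[str, Any]


def add_month(
    rows: List[Row],
    input_field: str = "value_time",   # assumed input column
    output_field: str = "month",       # assumed output column
) -> List[Row]:
    """Return new rows with the calendar month of input_field added."""
    return [{**row, output_field: row[input_field].month} for row in rows]


# Made-up sample rows.
sample = [
    {"value_time": datetime(2022, 1, 15, 6), "value": 1.2},
    {"value_time": datetime(2022, 7, 1, 0), "value": 4.8},
]
with_month = add_month(sample)
```

Because the new column is computed per row, it can then be used like any other field, for example in group_by.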
add_geometry()

Add geometry to the DataFrame by joining with the locations table.

aggregate(group_by: str | List[str], metrics: List[MetricsBasemodel])

Aggregate data with grouping and metrics.

Parameters:
  • group_by (Union[str, List[str]]) – Fields to group by for metric calculation.

  • metrics (List[MetricsBasemodel]) – Metrics to calculate.

Returns:

self – Returns self for method chaining.

Examples

>>> from teehr.metrics import KGE
>>>
>>> df = accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).to_pandas()

Chain with filter and order_by:

>>> from teehr import DeterministicMetrics as dm
>>>
>>> df = (
...     accessor
...     .filter("primary_location_id LIKE 'usgs%'")
...     .aggregate(
...         group_by=["primary_location_id", "configuration_name"],
...         metrics=[dm.KlingGuptaEfficiency(), dm.RelativeBias()]
...     )
...     .order_by(["primary_location_id", "configuration_name"])
...     .to_pandas()
... )
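
To make the grouping step concrete, here is a small pure-Python sketch of grouping rows and computing one metric per group. It assumes columns named primary_value and secondary_value and uses a common definition of relative bias, sum(secondary - primary) / sum(primary); the function names are hypothetical, and neither the names nor the formula are taken from the library's source.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Sequence, Tuple, Union

Row = Dict[str, Any]
Metric = Callable[[Sequence[float], Sequence[float]], float]


def relative_bias(primary: Sequence[float], secondary: Sequence[float]) -> float:
    """Relative bias as sum(secondary - primary) / sum(primary) (assumed definition)."""
    return sum(s - p for p, s in zip(primary, secondary)) / sum(primary)


def aggregate_sketch(
    rows: List[Row],
    group_by: Union[str, List[str]],
    metrics: Dict[str, Metric],
) -> List[Row]:
    fields = [group_by] if isinstance(group_by, str) else list(group_by)
    # Collect the paired values for each distinct combination of group fields.
    groups: Dict[Tuple[Any, ...], Tuple[List[float], List[float]]] = defaultdict(
        lambda: ([], [])
    )
    for row in rows:
        key = tuple(row[f] for f in fields)
        groups[key][0].append(row["primary_value"])
        groups[key][1].append(row["secondary_value"])
    # Emit one output row per group, with one column per metric.
    out: List[Row] = []
    for key, (prim, sec) in groups.items():
        record: Row = dict(zip(fields, key))
        for name, fn in metrics.items():
            record[name] = fn(prim, sec)
        out.append(record)
    return out


# Made-up sample data.
rows = [
    {"primary_location_id": "usgs-01", "primary_value": 2.0, "secondary_value": 3.0},
    {"primary_location_id": "usgs-01", "primary_value": 2.0, "secondary_value": 3.0},
    {"primary_location_id": "usgs-02", "primary_value": 10.0, "secondary_value": 8.0},
]
summary = aggregate_sketch(rows, "primary_location_id", {"relative_bias": relative_bias})
```

Only the group_by fields and the metric columns survive the aggregation, which is why other columns, such as location attributes, have to be joined back afterwards or included in group_by.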
filter(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None)

Apply filters to the DataFrame.

Parameters:

filters (Union[str, dict, TableFilter, List[...]]) – The filters to apply. Can be SQL strings, dictionaries, or TableFilter objects.

Returns:

self – Returns self for method chaining.

Examples

Filters as dictionary:

>>> df = accessor.filter(
...     filters=[
...         {
...             "column": "value_time",
...             "operator": ">",
...             "value": "2022-01-01",
...         },
...     ]
... ).to_pandas()

Filters as string:

>>> df = accessor.filter(
...     filters=["value_time > '2022-01-01'"]
... ).to_pandas()
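
The two examples above express the same condition in different forms. The sketch below shows one plausible way a dictionary filter could be rendered as a SQL string. The helpers filter_to_sql and combine_filters are hypothetical, and combining multiple filters with AND is an assumption; the library's own conversion and validation may differ.

```python
from typing import Any, Dict, List, Union

FilterLike = Union[str, Dict[str, Any]]


def filter_to_sql(flt: FilterLike) -> str:
    """Render a dictionary filter as a SQL condition; pass strings through."""
    if isinstance(flt, str):
        return flt
    value = flt["value"]
    if isinstance(value, str):
        # Quote string literals and escape embedded single quotes.
        literal = "'" + value.replace("'", "''") + "'"
    else:
        literal = str(value)
    return f"{flt['column']} {flt['operator']} {literal}"


def combine_filters(filters: List[FilterLike]) -> str:
    """Join several conditions with AND (an assumption for this sketch)."""
    return " AND ".join(f"({filter_to_sql(f)})" for f in filters)


as_dict = {"column": "value_time", "operator": ">", "value": "2022-01-01"}
as_string = "value_time > '2022-01-01'"

rendered = filter_to_sql(as_dict)
combined = combine_filters([as_dict, "primary_location_id LIKE 'usgs%'"])
```

In this sketch the dictionary form renders to exactly the string form, so either can be used for the same condition.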
order_by(fields: str | StrEnum | List[str | StrEnum])

Apply ordering to the DataFrame.

Parameters:

fields (Union[str, StrEnum, List[...]]) – The fields to order by.

Returns:

self – Returns self for method chaining.

Examples

>>> df = accessor.order_by("value_time").to_pandas()
to_geopandas()

Return a GeoPandas GeoDataFrame.

Returns:

gpd.GeoDataFrame – The data as a GeoPandas DataFrame.

to_pandas()

Return a pandas DataFrame.

Returns:

pd.DataFrame – The data as a Pandas DataFrame.

to_sdf() → DataFrame

Return the computed PySpark DataFrame.

Triggers computation if the view has not already been computed. The returned PySpark DataFrame can be processed further with the PySpark API. Note that PySpark DataFrames are themselves lazy: no work is executed until an action such as show(), collect(), or toPandas() is called.

Returns:

ps.DataFrame – The computed Spark DataFrame.

write(table_name: str, write_mode: str = 'create_or_replace')

Write the DataFrame to an Iceberg table.

Parameters:
  • table_name (str) – The name of the table to write to.

  • write_mode (str, optional) – The write mode. Options: “create”, “append”, “overwrite”, “create_or_replace”. Default is “create_or_replace”.

Returns:

self – Returns self for method chaining.

Examples

>>> from teehr.metrics import KGE
>>>
>>> accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).write("location_metrics")
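
The four write modes can be pictured with an in-memory stand-in for a catalog. The sketch below encodes one plausible reading of the mode names, loosely following the usual create, append, overwrite, and create-or-replace semantics of table writers. Whether "append" and "overwrite" require an existing table is an assumption here, and write_sketch is a hypothetical function, not the library's write().

```python
from typing import Any, Dict, List

Row = Dict[str, Any]
Catalog = Dict[str, List[Row]]


def write_sketch(
    catalog: Catalog,
    table_name: str,
    rows: List[Row],
    write_mode: str = "create_or_replace",
) -> None:
    exists = table_name in catalog
    if write_mode == "create":
        # Assumed: creating a table that already exists is an error.
        if exists:
            raise ValueError(f"Table already exists: {table_name}")
        catalog[table_name] = list(rows)
    elif write_mode == "append":
        # Assumed: appending requires the table to exist.
        if not exists:
            raise ValueError(f"Table does not exist: {table_name}")
        catalog[table_name].extend(rows)
    elif write_mode == "overwrite":
        # Assumed: overwriting replaces the data of an existing table.
        if not exists:
            raise ValueError(f"Table does not exist: {table_name}")
        catalog[table_name] = list(rows)
    elif write_mode == "create_or_replace":
        # Works whether or not the table exists.
        catalog[table_name] = list(rows)
    else:
        raise ValueError(f"Unknown write_mode: {write_mode}")


catalog: Catalog = {}
write_sketch(catalog, "location_metrics", [{"primary_location_id": "usgs-01"}])
write_sketch(catalog, "location_metrics", [{"primary_location_id": "usgs-02"}], "append")
rows_after_append = len(catalog["location_metrics"])

write_sketch(catalog, "location_metrics", [{"primary_location_id": "usgs-03"}])
rows_after_replace = len(catalog["location_metrics"])

try:
    write_sketch(catalog, "location_metrics", [], "create")
    create_failed = False
except ValueError:
    create_failed = True
```

The default "create_or_replace" is the most forgiving of the four in this sketch, since it succeeds regardless of whether the table already exists.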