LocationAttributesView#
- class LocationAttributesView(ev, attr_list: List[str] | None = None, catalog_name: str | None = None, namespace_name: str | None = None)[source]#
A computed view that pivots location attributes.
Transforms the location_attributes table from long format (location_id, attribute_name, value) to wide format where each attribute becomes a column.
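The long-to-wide transformation can be illustrated with plain pandas. This is a minimal sketch with made-up data to show the shape of the pivot; the actual view computes it in PySpark:

```python
import pandas as pd

# Long-format location attributes: one row per (location, attribute) pair.
long_df = pd.DataFrame({
    "location_id": ["usgs-01", "usgs-01", "usgs-02", "usgs-02"],
    "attribute_name": ["drainage_area", "ecoregion", "drainage_area", "ecoregion"],
    "value": ["120.5", "piedmont", "88.2", "coastal_plain"],
})

# Wide format: each attribute_name becomes its own column.
wide_df = (
    long_df.pivot(index="location_id", columns="attribute_name", values="value")
    .reset_index()
    .rename_axis(columns=None)
)
print(wide_df.columns.tolist())  # ['location_id', 'drainage_area', 'ecoregion']
```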
Examples
Pivot all attributes:
>>> ev.location_attributes_view().to_pandas()
Pivot specific attributes:
>>> ev.location_attributes_view(
...     attr_list=["drainage_area", "ecoregion"]
... ).to_pandas()
With filters (chained):
>>> ev.location_attributes_view().filter(
...     "location_id LIKE 'usgs%'"
... ).to_pandas()
Materialize for later use:
>>> ev.location_attributes_view().write("pivoted_attrs")
Methods
add_attributes – Add location attributes to the DataFrame.
add_calculated_fields – Add calculated fields to the DataFrame.
add_geometry – Add geometry to the DataFrame by joining with the locations table.
aggregate – Aggregate data with grouping and metrics.
filter – Apply filters to the DataFrame.
order_by – Apply ordering to the DataFrame.
to_geopandas – Return GeoPandas DataFrame.
to_pandas – Return Pandas DataFrame.
to_sdf – Return the computed PySpark DataFrame.
write – Write the DataFrame to an Iceberg table.
- add_attributes(attr_list: List[str] | None = None, location_id_col: str | None = None)#
Add location attributes to the DataFrame.
Joins pivoted location attributes to the DataFrame. The join column is auto-detected from common location ID field names (‘location_id’, ‘primary_location_id’) unless specified.
This is especially useful when called after aggregate() with GROUP BY and aggregation metrics, so that attributes do not need to be included in the group_by clause in order to pass through to the result.
- Parameters:
  attr_list (List[str], optional) – Specific attributes to add. If None, all attributes are added.
  location_id_col (str, optional) – The column name in the DataFrame to join on. If None, checks for 'location_id' then 'primary_location_id'.
- Returns:
  self – Returns self for method chaining.
Examples
Add all attributes:
>>> df = accessor.add_attributes().to_pandas()
Add specific attributes:
>>> df = accessor.add_attributes( ... attr_list=["drainage_area", "ecoregion"] ... ).to_pandas()
Specify join column explicitly:
>>> df = accessor.add_attributes( ... location_id_col="primary_location_id" ... ).to_pandas()
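The join-column auto-detection described above can be pictured as a simple candidate scan. This is an illustrative sketch, not the library's actual code; the helper name is made up:

```python
import pandas as pd

def detect_location_id_col(df: pd.DataFrame) -> str:
    # Check common location ID field names in priority order.
    for candidate in ("location_id", "primary_location_id"):
        if candidate in df.columns:
            return candidate
    raise ValueError("No recognized location ID column found.")

df = pd.DataFrame({"primary_location_id": ["usgs-01"], "kge": [0.82]})
print(detect_location_id_col(df))  # primary_location_id
```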
Add attributes after metric aggregation, avoiding their inclusion in group_by:
>>> from teehr.metrics import KGE
>>> df = (
...     ev.joined_timeseries_view()
...     .aggregate(
...         group_by=["primary_location_id"],
...         metrics=[KGE()]
...     )
...     .add_attributes(attr_list=["drainage_area", "ecoregion"])
...     .to_pandas()
... )
- add_calculated_fields(cfs: CalculatedFieldBaseModel | List[CalculatedFieldBaseModel])#
Add calculated fields to the DataFrame.
- Parameters:
  cfs (Union[CalculatedFieldBaseModel, List[CalculatedFieldBaseModel]]) – The calculated fields to add.
- Returns:
  self – Returns self for method chaining.
Examples
>>> import teehr
>>> from teehr import RowLevelCalculatedFields as rcf
>>>
>>> df = accessor.add_calculated_fields([
...     rcf.Month()
... ]).to_pandas()
- add_geometry()#
Add geometry to the DataFrame by joining with the locations table.
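Conceptually this is a left join on the location ID against the locations table's geometry column. The sketch below illustrates the shape of that join with plain pandas and WKT strings; the frame contents and column names are made up, and the actual view performs the join in PySpark:

```python
import pandas as pd

# Hypothetical metrics frame and a locations table carrying geometry (as WKT here).
metrics = pd.DataFrame({"location_id": ["usgs-01", "usgs-02"], "kge": [0.82, 0.64]})
locations = pd.DataFrame({
    "id": ["usgs-01", "usgs-02"],
    "geometry": ["POINT (-77.03 38.90)", "POINT (-76.61 39.29)"],
})

# Left join keeps every metrics row and attaches its location's geometry.
joined = (
    metrics.merge(locations, left_on="location_id", right_on="id", how="left")
    .drop(columns="id")
)
print(joined.columns.tolist())  # ['location_id', 'kge', 'geometry']
```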
- aggregate(group_by: str | List[str], metrics: List[MetricsBasemodel])#
Aggregate data with grouping and metrics.
- Parameters:
  group_by (Union[str, List[str]]) – Fields to group by for metric calculation.
  metrics (List[MetricsBasemodel]) – Metrics to calculate.
- Returns:
  self – Returns self for method chaining.
Examples
>>> df = accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).to_pandas()
Chain with filter and order_by:
>>> from teehr import DeterministicMetrics as dm
>>>
>>> df = (
...     accessor
...     .filter("primary_location_id LIKE 'usgs%'")
...     .aggregate(
...         group_by=["primary_location_id", "configuration_name"],
...         metrics=[dm.KlingGuptaEfficiency(), dm.RelativeBias()]
...     )
...     .order_by(["primary_location_id", "configuration_name"])
...     .to_pandas()
... )
- filter(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None)#
Apply filters to the DataFrame.
- Parameters:
  filters (Union[str, dict, TableFilter, List[Union[str, dict, TableFilter]]]) – The filters to apply. Can be SQL strings, dictionaries, or TableFilter objects.
- Returns:
  self – Returns self for method chaining.
Examples
Filters as dictionary:
>>> df = accessor.filter(
...     filters=[
...         {
...             "column": "value_time",
...             "operator": ">",
...             "value": "2022-01-01",
...         },
...     ]
... ).to_pandas()
Filters as string:
>>> df = accessor.filter(
...     filters=["value_time > '2022-01-01'"]
... ).to_pandas()
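The two filter forms above carry the same information: a dictionary filter maps directly onto a SQL predicate. Roughly, as an illustrative translation (not the library's internals):

```python
def filter_dict_to_sql(f: dict) -> str:
    # Quote string values for SQL; leave numeric values as-is.
    value = f["value"]
    rendered = f"'{value}'" if isinstance(value, str) else str(value)
    return f"{f['column']} {f['operator']} {rendered}"

print(filter_dict_to_sql(
    {"column": "value_time", "operator": ">", "value": "2022-01-01"}
))  # value_time > '2022-01-01'
```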
- order_by(fields: str | StrEnum | List[str | StrEnum])#
Apply ordering to the DataFrame.
- Parameters:
  fields (Union[str, StrEnum, List[Union[str, StrEnum]]]) – The fields to order by.
- Returns:
  self – Returns self for method chaining.
Examples
>>> df = accessor.order_by("value_time").to_pandas()
- to_geopandas()#
Return GeoPandas DataFrame.
- Returns:
  gpd.GeoDataFrame – The data as a GeoPandas DataFrame.
- to_pandas()#
Return Pandas DataFrame.
- Returns:
  pd.DataFrame – The data as a Pandas DataFrame.
- to_sdf() → DataFrame#
Return the computed PySpark DataFrame.
Triggers computation if not already computed. The PySpark DataFrame can be further processed using PySpark. Note, PySpark DataFrames are lazy and will not be executed until an action is called (e.g., show(), collect(), toPandas()).
- Returns:
  ps.DataFrame – The computed Spark DataFrame.
- write(table_name: str, write_mode: str = 'create_or_replace')#
Write the DataFrame to an Iceberg table.
- Parameters:
  table_name (str) – The name of the table to write to.
  write_mode (str, optional) – The write mode. Options: "create", "append", "overwrite", "create_or_replace". Default is "create_or_replace".
- Returns:
  self – Returns self for method chaining.
Examples
>>> accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).write("location_metrics")
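The four write modes differ in how they treat an existing table. Their intended semantics can be sketched against a simple dict of tables; this is an illustration only, not the Iceberg implementation, and the exact error behavior (e.g. of "overwrite" on a missing table) is an assumption here:

```python
def write_table(catalog: dict, name: str, rows: list, write_mode: str = "create_or_replace"):
    if write_mode == "create":
        # Fails if the table already exists.
        if name in catalog:
            raise ValueError(f"Table '{name}' already exists.")
        catalog[name] = list(rows)
    elif write_mode == "append":
        # Adds rows, creating the table if needed.
        catalog.setdefault(name, []).extend(rows)
    elif write_mode == "overwrite":
        # Replaces the contents of an existing table (assumed to fail if missing).
        if name not in catalog:
            raise ValueError(f"Table '{name}' does not exist.")
        catalog[name] = list(rows)
    elif write_mode == "create_or_replace":
        # Creates the table, or replaces it wholesale if it exists.
        catalog[name] = list(rows)
    else:
        raise ValueError(f"Unknown write_mode: {write_mode}")

catalog = {}
write_table(catalog, "location_metrics", [{"kge": 0.82}])            # create_or_replace
write_table(catalog, "location_metrics", [{"kge": 0.64}], "append")  # now two rows
print(len(catalog["location_metrics"]))  # 2
```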