teehr.evaluation.metrics.Metrics

class teehr.evaluation.metrics.Metrics(ev)[source]

Bases: object

Component class for calculating metrics.
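
As the examples below show, this component is typically reached through an Evaluation instance rather than constructed directly; a minimal access sketch:

>>> import teehr
>>> ev = teehr.Evaluation()
>>> metrics = ev.metrics  # Metrics component bound to the evaluation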

Methods

query
    Perform a query on the dataset joined timeseries table.

to_geopandas
    Convert the DataFrame to a GeoPandas DataFrame.

to_pandas
    Convert the DataFrame to a Pandas DataFrame.

to_sdf
    Return the Spark DataFrame.

query(filters: str | dict | JoinedTimeseriesFilter | List[str | dict | JoinedTimeseriesFilter] | None = None, order_by: str | JoinedTimeseriesFields | List[str | JoinedTimeseriesFields] | None = None, group_by: str | JoinedTimeseriesFields | List[str | JoinedTimeseriesFields] | None = None, include_metrics: List[MetricsBasemodel] | str | None = None)[source]

Perform a query on the dataset joined timeseries table.

Parameters:
  • filters (Union[str, dict, JoinedTimeseriesFilter, List[Union[str, dict, JoinedTimeseriesFilter]]], optional) – The filters to apply to the query, by default None

  • order_by (Union[str, JoinedTimeseriesFields, List[Union[str, JoinedTimeseriesFields]]], optional) – The fields to order the query by, by default None

  • group_by (Union[str, JoinedTimeseriesFields, List[Union[str, JoinedTimeseriesFields]]], optional) – The fields to group the query by, by default None

  • include_metrics (Union[List[MetricsBasemodel], str], optional) – The metrics to include in the query, by default None
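
Per the type hints above, group_by and order_by also accept plain field-name strings. A minimal sketch under that assumption (constructing KlingGuptaEfficiency with default arguments is also an assumption):

>>> import teehr
>>> from teehr import Metrics as m
>>> ev = teehr.Evaluation()
>>> df = ev.metrics.query(
...     include_metrics=[m.KlingGuptaEfficiency()],
...     group_by=["primary_location_id"],  # field name as a plain string
...     order_by=["primary_location_id"],
... ).to_pandas()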

Examples

>>> import teehr
>>> ev = teehr.Evaluation()

Define some metrics, optionally including an available bootstrapping method (see Metric Models).

>>> from teehr import Metrics as m
>>> from teehr import Bootstrappers as b

Define a Circular Block bootstrapper (see Bootstrap Models).

>>> boot = b.CircularBlock(
...     seed=40,
...     block_size=100,
...     quantiles=None,
...     reps=500
... )

Include the bootstrap model in the metric definition(s), along with other optional arguments.

>>> kge = m.KlingGuptaEfficiency(bootstrap=boot)
>>> primary_avg = m.Average(
...     transform="log",
...     output_field_name="primary_avg",
...     input_field_names=["primary_value"]
... )
>>> mvtd = m.MaxValueTimeDelta(input_field_names=["secondary_value"])
>>> pmvt = m.MaxValueTime(input_field_names=["secondary_value"])
>>> include_metrics = [pmvt, mvtd, primary_avg, kge]

Get the currently available fields to use in the query.

>>> flds = ev.joined_timeseries.field_enum()

Define some filters. This assumes the JoinedTimeseriesFilter model and a filter operators enum (referenced here as ops) have been imported.

>>> filters = [
...     JoinedTimeseriesFilter(
...         column=flds.primary_location_id,
...         operator=ops.eq,
...         value="gage-A"
...     )
... ]

Perform the query, returning the results as a GeoPandas DataFrame.

>>> metrics_df = ev.metrics.query(
...     include_metrics=include_metrics,
...     group_by=[flds.primary_location_id],
...     order_by=[flds.primary_location_id],
...     filters=filters,
... ).to_geopandas()

to_geopandas() → GeoDataFrame[source]

Convert the DataFrame to a GeoPandas DataFrame.
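
A brief usage sketch continuing the query example above (the output field name passed to plot is an assumption; plot itself is standard GeoPandas, not a TEEHR API):

>>> gdf = ev.metrics.query(
...     include_metrics=include_metrics,
...     group_by=[flds.primary_location_id],
... ).to_geopandas()
>>> gdf.plot(column="kling_gupta_efficiency", legend=True)  # assumed output field name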

to_pandas() → DataFrame[source]

Convert the DataFrame to a Pandas DataFrame.

to_sdf() → DataFrame[source]

Return the Spark DataFrame.
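
Because to_sdf returns a live Spark DataFrame, the standard PySpark API applies to the result; a brief sketch continuing the query example above:

>>> sdf = ev.metrics.query(
...     include_metrics=include_metrics,
...     group_by=[flds.primary_location_id],
... ).to_sdf()
>>> sdf.printSchema()  # inspect the result schema
>>> sdf.show(5)  # preview the first five rows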