teehr.evaluation.tables.primary_timeseries_table.PrimaryTimeseriesTable#

class teehr.evaluation.tables.primary_timeseries_table.PrimaryTimeseriesTable(ev)[source]#

Bases: TimeseriesTable

Access methods for the primary timeseries table.

Methods

distinct_values

Return distinct values for a column.

field_enum

Get the timeseries fields enum.

fields

Return table columns as a list.

filter

Apply a filter.

load_csv

Import primary timeseries csv data.

load_fews_xml

Import timeseries from XML data format.

load_netcdf

Import primary timeseries netcdf data.

load_parquet

Import primary timeseries parquet data.

order_by

Apply an order_by.

query

Run a query against the table with filters and order_by.

to_geopandas

Return GeoPandas DataFrame.

to_pandas

Return Pandas DataFrame for Primary Timeseries.

to_sdf

Return PySpark DataFrame.

validate

Validate the dataset table against the schema.

distinct_values(column: str) List[str]#

Return distinct values for a column.
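Examples

An illustrative call, assuming an Evaluation instance ev with loaded timeseries; configuration_name is one of the schema fields listed below:

>>> configs = ev.primary_timeseries.distinct_values("configuration_name")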

field_enum() TimeseriesFields[source]#

Get the timeseries fields enum.

fields() List[str]#

Return table columns as a list.
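Examples

An illustrative call, assuming an Evaluation instance ev:

>>> cols = ev.primary_timeseries.fields()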

filter(filters: str | dict | FilterBaseModel | List[str | dict | FilterBaseModel])#

Apply a filter.

Parameters:
filters (Union[str, dict, FilterBaseModel, List[Union[str, dict, FilterBaseModel]]]) – The filters to apply to the query. The filters can be an SQL string, dictionary, FilterBaseModel or a list of any of these.

Returns:

self (BaseTable or subclass of BaseTable)

Examples

Filters as dictionary:

>>> ts_df = ev.primary_timeseries.filter(
>>>     filters=[
>>>         {
>>>             "column": "value_time",
>>>             "operator": ">",
>>>             "value": "2022-01-01",
>>>         },
>>>         {
>>>             "column": "value_time",
>>>             "operator": "<",
>>>             "value": "2022-01-02",
>>>         },
>>>         {
>>>             "column": "location_id",
>>>             "operator": "=",
>>>             "value": "gage-C",
>>>         },
>>>     ]
>>> ).to_pandas()

Filters as string:

>>> ts_df = ev.primary_timeseries.filter(
>>>     filters=[
>>>         "value_time > '2022-01-01'",
>>>         "value_time < '2022-01-02'",
>>>         "location_id = 'gage-C'"
>>>     ]
>>> ).to_pandas()

Filters as FilterBaseModel:

>>> from teehr.models.filters import TimeseriesFilter
>>> from teehr.models.filters import FilterOperators
>>>
>>> fields = ev.primary_timeseries.field_enum()
>>> ts_df = ev.primary_timeseries.filter(
>>>     filters=[
>>>         TimeseriesFilter(
>>>             column=fields.value_time,
>>>             operator=FilterOperators.gt,
>>>             value="2022-01-01",
>>>         ),
>>>         TimeseriesFilter(
>>>             column=fields.value_time,
>>>             operator=FilterOperators.lt,
>>>             value="2022-01-02",
>>>         ),
>>>         TimeseriesFilter(
>>>             column=fields.location_id,
>>>             operator=FilterOperators.eq,
>>>             value="gage-C",
>>>         ),
>>> ]).to_pandas()
load_csv(in_path: Path | str, pattern: str = '**/*.csv', field_mapping: dict | None = None, constant_field_values: dict | None = None, **kwargs)#

Import primary timeseries csv data.

Parameters:
  • in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in csv file format.

  • pattern (str, optional (default: "**/*.csv")) – The pattern to match files.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}

  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}

  • **kwargs – Additional keyword arguments are passed to pd.read_csv().

Includes validation and importing data to the database.

Notes

The TEEHR Timeseries table schema includes fields:

  • reference_time

  • value_time

  • configuration_name

  • unit_name

  • variable_name

  • value

  • location_id
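Examples

An illustrative sketch; the path and the input column names in field_mapping are hypothetical, and constant_field_values shows the documented {field_name: value} format with placeholder values:

>>> ev.primary_timeseries.load_csv(
>>>     in_path="/path/to/timeseries",
>>>     pattern="**/*.csv",
>>>     field_mapping={"datetime": "value_time", "flow": "value"},
>>>     constant_field_values={"configuration_name": "usgs_observations"},
>>> )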

load_fews_xml(in_path: Path | str, pattern: str = '**/*.xml', field_mapping: dict = {'ensembleId': 'configuration_name', 'ensembleMemberIndex': 'member', 'forecastDate': 'reference_time', 'locationId': 'location_id', 'parameterId': 'variable_name', 'units': 'unit_name'}, constant_field_values: dict | None = None)#

Import timeseries from XML data format.

Parameters:
  • in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in xml file format.

  • pattern (str, optional (default: "**/*.xml")) – The pattern to match files.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}. Default mapping: {"locationId": "location_id", "forecastDate": "reference_time", "parameterId": "variable_name", "units": "unit_name", "ensembleId": "configuration_name", "ensembleMemberIndex": "member"}

  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}.

Includes validation and importing data to the database.

Notes

This function follows the Delft-FEWS Published Interface (PI) XML format.

reference: https://publicwiki.deltares.nl/display/FEWSDOC/Dynamic+data

The value and value_time fields are parsed automatically.

The TEEHR Timeseries table schema includes fields:

  • reference_time

  • value_time

  • configuration_name

  • unit_name

  • variable_name

  • value

  • location_id

  • member
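Examples

An illustrative sketch with a hypothetical path; the default field_mapping covering the FEWS PI XML fields is used unless overridden:

>>> ev.primary_timeseries.load_fews_xml(
>>>     in_path="/path/to/fews_export",
>>>     pattern="**/*.xml",
>>> )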

load_netcdf(in_path: Path | str, pattern: str = '**/*.nc', field_mapping: dict | None = None, constant_field_values: dict | None = None, **kwargs)#

Import primary timeseries netcdf data.

Parameters:
  • in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in netcdf file format.

  • pattern (str, optional (default: "**/*.nc")) – The pattern to match files.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}

  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}

  • **kwargs – Additional keyword arguments are passed to xr.open_dataset().

Includes validation and importing data to the database.

Notes

The TEEHR Timeseries table schema includes fields:

  • reference_time

  • value_time

  • configuration_name

  • unit_name

  • variable_name

  • value

  • location_id
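Examples

An illustrative sketch; the path and the netcdf variable names in field_mapping are hypothetical:

>>> ev.primary_timeseries.load_netcdf(
>>>     in_path="/path/to/netcdf_dir",
>>>     pattern="**/*.nc",
>>>     field_mapping={"time": "value_time", "feature_id": "location_id"},
>>> )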

load_parquet(in_path: Path | str, pattern: str = '**/*.parquet', field_mapping: dict | None = None, constant_field_values: dict | None = None, **kwargs)#

Import primary timeseries parquet data.

Parameters:
  • in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in parquet file format.

  • pattern (str, optional (default: "**/*.parquet")) – The pattern to match files.

  • field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}

  • constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}

  • **kwargs – Additional keyword arguments are passed to pd.read_parquet().

Includes validation and importing data to the database.

Notes

The TEEHR Timeseries table schema includes fields:

  • reference_time

  • value_time

  • configuration_name

  • unit_name

  • variable_name

  • value

  • location_id
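Examples

An illustrative sketch with a hypothetical path and a placeholder constant value:

>>> ev.primary_timeseries.load_parquet(
>>>     in_path="/path/to/timeseries.parquet",
>>>     constant_field_values={"configuration_name": "usgs_observations"},
>>> )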

order_by(fields: str | StrEnum | List[str | StrEnum])#

Apply an order_by.

Parameters:

fields (Union[str, StrEnum, List[Union[str, StrEnum]]]) – The fields to order the query by. The fields can be a string, StrEnum or a list of any of these. The fields will be ordered in the order they are provided.

Returns:

self (BaseTable or subclass of BaseTable)

Examples

Order by string:

>>> ts_df = ev.primary_timeseries.order_by("value_time").to_pandas()

Order by StrEnum:

>>> from teehr.querying.field_enums import TimeseriesFields
>>> ts_df = ev.primary_timeseries.order_by(
>>>     TimeseriesFields.value_time
>>> ).to_pandas()
query(filters: str | dict | FilterBaseModel | List[str | dict | FilterBaseModel] | None = None, order_by: str | StrEnum | List[str | StrEnum] | None = None)#

Run a query against the table with filters and order_by.

In general, a user will either use the query method or the filter and order_by methods. The query method is a convenience that applies filters and order_by in a single call.

Parameters:
  • filters (Union[str, dict, FilterBaseModel, List[Union[str, dict, FilterBaseModel]]]) – The filters to apply to the query. The filters can be an SQL string, dictionary, FilterBaseModel or a list of any of these. The filters will be applied in the order they are provided.

  • order_by (Union[str, StrEnum, List[Union[str, StrEnum]]]) – The fields to order the query by. The fields can be a string, StrEnum or a list of any of these. The fields will be ordered in the order they are provided.

Returns:

self (BaseTable or subclass of BaseTable)

Examples

Filters as dictionaries:

>>> ts_df = ev.primary_timeseries.query(
>>>     filters=[
>>>         {
>>>             "column": "value_time",
>>>             "operator": ">",
>>>             "value": "2022-01-01",
>>>         },
>>>         {
>>>             "column": "value_time",
>>>             "operator": "<",
>>>             "value": "2022-01-02",
>>>         },
>>>         {
>>>             "column": "location_id",
>>>             "operator": "=",
>>>             "value": "gage-C",
>>>         },
>>>     ],
>>>     order_by=["location_id", "value_time"]
>>> ).to_pandas()

Filters as SQL strings:

>>> ts_df = ev.primary_timeseries.query(
>>>     filters=[
>>>         "value_time > '2022-01-01'",
>>>         "value_time < '2022-01-02'",
>>>         "location_id = 'gage-C'"
>>>     ],
>>>     order_by=["location_id", "value_time"]
>>> ).to_pandas()

Filters as FilterBaseModels:

>>> from teehr.models.filters import TimeseriesFilter
>>> from teehr.models.filters import FilterOperators
>>>
>>> fields = ev.primary_timeseries.field_enum()
>>> ts_df = ev.primary_timeseries.query(
>>>     filters=[
>>>         TimeseriesFilter(
>>>             column=fields.value_time,
>>>             operator=FilterOperators.gt,
>>>             value="2022-01-01",
>>>         ),
>>>         TimeseriesFilter(
>>>             column=fields.value_time,
>>>             operator=FilterOperators.lt,
>>>             value="2022-01-02",
>>>         ),
>>>         TimeseriesFilter(
>>>             column=fields.location_id,
>>>             operator=FilterOperators.eq,
>>>             value="gage-C",
>>>         ),
>>> ]).to_pandas()
to_geopandas()#

Return GeoPandas DataFrame.
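For example, filtering to a single location before returning a GeoPandas DataFrame (using the same filter string and location id as the examples above):

>>> gdf = ev.primary_timeseries.filter(
>>>     "location_id = 'gage-C'"
>>> ).to_geopandas()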

to_pandas()#

Return Pandas DataFrame for Primary Timeseries.

to_sdf()#

Return PySpark DataFrame.

The PySpark DataFrame can be further processed using PySpark. Note that PySpark DataFrames are lazy and will not be executed until an action is called, for example show(), collect() or toPandas(). This can be useful for further processing or analysis:

>>> ts_sdf = ev.primary_timeseries.query(
>>>     filters=[
>>>         "value_time > '2022-01-01'",
>>>         "value_time < '2022-01-02'",
>>>         "location_id = 'gage-C'"
>>>     ]
>>> ).to_sdf()
>>> ts_df = (
>>>     ts_sdf.select("value_time", "location_id", "value")
>>>     .orderBy("value").toPandas()
>>> )
>>> ts_df.head()
validate()#

Validate the dataset table against the schema.
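Validation can be invoked directly, for example after loading new data:

>>> ev.primary_timeseries.validate()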