TimeseriesTable
- class TimeseriesTable(ev, table_name: str | None = None, namespace_name: str | None = None, catalog_name: str | None = None)
Access methods for a timeseries table.
Base class for primary and secondary timeseries tables.
Methods
add_attributes – Add location attributes to the DataFrame.
add_calculated_fields – Add calculated fields to the DataFrame.
add_geometry – Add geometry to the DataFrame by joining with the locations table.
aggregate – Aggregate data with grouping and metrics.
delete – Delete rows from this table based on filter conditions.
distinct_values – Return distinct values for a column.
drop – Drop this table from the catalog.
filter – Apply filters to the DataFrame.
load_csv – Import timeseries csv data.
load_dataframe – Load data from an in-memory dataframe.
load_fews_xml – Import timeseries from FEWS PI-XML data format.
load_netcdf – Import timeseries netcdf data.
load_parquet – Import timeseries parquet data.
order_by – Apply ordering to the DataFrame.
to_geopandas – Return GeoPandas DataFrame.
to_pandas – Return Pandas DataFrame.
to_sdf – Return the PySpark DataFrame.
validate – Validate the dataset table against the schema.
write – Write the DataFrame to an iceberg table.
Attributes
foreign_keys
is_core_table – Return True if this table is a core (built-in) TEEHR table.
primary_location_id_field
schema_func
secondary_location_id_field
strict_validation
table_name
uniqueness_fields
validate_filter_field_types
extraction_func(in_filepath, field_mapping, ...) – Convert timeseries data to a pandas DataFrame.
- add_attributes(attr_list: List[str] | None = None, location_id_col: str | None = None)
Add location attributes to the DataFrame.
Joins pivoted location attributes to the DataFrame. The join column is auto-detected from common location ID field names (‘location_id’, ‘primary_location_id’) unless specified.
This is especially useful when called after an aggregate() with GROUP BY and aggregation metrics, so that attributes do not need to be included in the group_by clause in order to pass through to the result.
- Parameters:
attr_list (List[str], optional) – Specific attributes to add. If None, all attributes are added.
location_id_col (str, optional) – The column name in the DataFrame to join on. If None, checks for ‘location_id’ then ‘primary_location_id’.
- Returns:
self – Returns self for method chaining.
Examples
Add all attributes:
>>> df = accessor.add_attributes().to_pandas()
Add specific attributes:
>>> df = accessor.add_attributes(
...     attr_list=["drainage_area", "ecoregion"]
... ).to_pandas()
Specify join column explicitly:
>>> df = accessor.add_attributes(
...     location_id_col="primary_location_id"
... ).to_pandas()
Add attributes after metric aggregation, which avoids including them in group_by:
>>> from teehr.metrics import KGE
>>> df = (
...     ev.joined_timeseries_view()
...     .aggregate(
...         group_by=["primary_location_id"],
...         metrics=[KGE()]
...     )
...     .add_attributes(attr_list=["drainage_area", "ecoregion"])
...     .to_pandas()
... )
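The pivot-and-join behaviour described for add_attributes() can be illustrated without Spark. The following is a minimal sketch in plain Python, not TEEHR's implementation: the long-format attribute columns ("attribute_name", "value") and both helper functions are assumptions made for illustration only.

```python
from collections import defaultdict


def pivot_attributes(attr_rows):
    """Pivot long-format attribute rows into one dict of attributes per location."""
    wide = defaultdict(dict)
    for row in attr_rows:
        wide[row["location_id"]][row["attribute_name"]] = row["value"]
    return dict(wide)


def add_attributes(rows, attr_rows, attr_list=None, location_id_col=None):
    """Join pivoted attributes onto rows (hypothetical stand-in for the method)."""
    if location_id_col is None:
        # Auto-detect the join column in the documented order.
        for candidate in ("location_id", "primary_location_id"):
            if rows and candidate in rows[0]:
                location_id_col = candidate
                break
        else:
            raise ValueError("No location ID column found to join on.")
    wide = pivot_attributes(attr_rows)
    joined = []
    for row in rows:
        attrs = wide.get(row[location_id_col], {})
        if attr_list is not None:
            attrs = {k: v for k, v in attrs.items() if k in attr_list}
        joined.append({**row, **attrs})
    return joined


# One aggregated metric row and two attributes for the same location.
metrics = [{"primary_location_id": "usgs-01", "kge": 0.8}]
attrs = [
    {"location_id": "usgs-01", "attribute_name": "drainage_area", "value": "120.5"},
    {"location_id": "usgs-01", "attribute_name": "ecoregion", "value": "coastal"},
]
joined = add_attributes(metrics, attrs, attr_list=["drainage_area"])
```

Because the join happens after aggregation, the attribute columns never have to appear in the grouping key, which is the point the description above makes about group_by.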
- add_calculated_fields(cfs: CalculatedFieldBaseModel | List[CalculatedFieldBaseModel])
Add calculated fields to the DataFrame.
- Parameters:
cfs (Union[CalculatedFieldBaseModel, List[...]]) – The calculated fields to add.
- Returns:
self – Returns self for method chaining.
Examples
>>> import teehr
>>> from teehr import RowLevelCalculatedFields as rcf
>>>
>>> df = accessor.add_calculated_fields([
...     rcf.Month()
... ]).to_pandas()
- add_geometry()
Add geometry to the DataFrame by joining with the locations table.
- aggregate(group_by: str | List[str], metrics: List[MetricsBasemodel])
Aggregate data with grouping and metrics.
- Parameters:
group_by (Union[str, List[str]]) – Fields to group by for metric calculation.
metrics (List[MetricsBasemodel]) – Metrics to calculate.
- Returns:
self – Returns self for method chaining.
Examples
>>> from teehr.metrics import KGE
>>> df = accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).to_pandas()
Chain with filter and order_by:
>>> from teehr import DeterministicMetrics as dm
>>>
>>> df = (
...     accessor
...     .filter("primary_location_id LIKE 'usgs%'")
...     .aggregate(
...         group_by=["primary_location_id", "configuration_name"],
...         metrics=[dm.KlingGuptaEfficiency(), dm.RelativeBias()]
...     )
...     .order_by(["primary_location_id", "configuration_name"])
...     .to_pandas()
... )
- delete(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None, dry_run: bool = False) → int | DataFrame
Delete rows from this table based on filter conditions.
Delegates to Write.delete_from().
- Parameters:
filters (Union[str, dict, TableFilter, List[...]], optional) – Filter conditions specifying which rows to delete. Supports SQL strings, dictionaries, or TableFilter objects. If None, all rows in the table will be deleted.
dry_run (bool, optional) – If True, returns a Spark DataFrame of rows that would be deleted without performing the actual deletion. Default is False.
- Returns:
int or ps.DataFrame – If dry_run=False, returns the number of rows deleted (int). If dry_run=True, returns a Spark DataFrame of rows that would be deleted.
Examples
Preview rows that would be deleted (dry run):
>>> sdf = ev.table("primary_timeseries").delete(
...     filters=["location_id = 'usgs-01234567'"],
...     dry_run=True,
... )
>>> print(f"Rows to delete: {sdf.count()}")
Delete rows and get the count:
>>> count = ev.table("primary_timeseries").delete(
...     filters=["location_id = 'usgs-01234567'"],
... )
>>> print(f"Deleted {count} rows.")
Delete all rows from this table:
>>> count = ev.primary_timeseries.delete()
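The difference between the two return types may be easier to see in a small stand-in. This is a sketch in plain Python, assuming a table held as a list of dicts and a hypothetical delete_rows() helper; it is not how TEEHR or Iceberg perform deletes.

```python
def delete_rows(table, predicate, dry_run=False):
    """Delete matching rows in place, or only preview them when dry_run is True."""
    matches = [row for row in table if predicate(row)]
    if dry_run:
        # Preview only: the table is left untouched.
        return matches
    table[:] = [row for row in table if not predicate(row)]
    return len(matches)


def is_target(row):
    """Hypothetical filter equivalent to: location_id = 'usgs-01234567'."""
    return row["location_id"] == "usgs-01234567"


table = [
    {"location_id": "usgs-01234567", "value": 1.0},
    {"location_id": "usgs-07654321", "value": 2.0},
    {"location_id": "usgs-01234567", "value": 3.0},
]

preview = delete_rows(table, is_target, dry_run=True)  # rows that would go
rows_before = len(table)
count = delete_rows(table, is_target)  # number of rows actually removed
```

Running the dry run first and comparing its row count with the count returned by the real delete is a cheap safeguard, particularly because omitting filters deletes every row.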
- distinct_values(column: str, location_prefixes: bool = False) → List[str]
Return distinct values for a column.
- Parameters:
column (str) – The column to get distinct values for.
location_prefixes (bool) – Whether to return location prefixes. If True, only the unique prefixes of the locations will be returned. Default: False
- Returns:
List[str] – The distinct values for the column.
Examples
Get distinct location IDs from the primary timeseries table:
>>> ev.table(table_name="primary_timeseries").distinct_values(
...     column='location_id',
...     location_prefixes=False
... )
Get distinct location prefixes from the joined timeseries table:
>>> ev.table(table_name="joined_timeseries").distinct_values(
...     column='primary_location_id',
...     location_prefixes=True
... )
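To show what location_prefixes=True changes, here is a minimal plain-Python sketch. It assumes that a prefix is the part of the ID before the first hyphen (as in 'usgs-01234567') and it sorts the result for readability; both choices are assumptions, not documented TEEHR behaviour.

```python
def distinct_values(rows, column, location_prefixes=False):
    """Return the sorted distinct values of a column, or only their prefixes."""
    values = {row[column] for row in rows}
    if location_prefixes:
        # Assumption: the prefix is everything before the first hyphen.
        values = {value.split("-", 1)[0] for value in values}
    return sorted(values)


rows = [
    {"location_id": "usgs-01234567"},
    {"location_id": "usgs-07654321"},
    {"location_id": "nwm30-11223344"},
    {"location_id": "usgs-01234567"},
]

all_ids = distinct_values(rows, "location_id")
prefixes = distinct_values(rows, "location_id", location_prefixes=True)
```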
- drop()
Drop this table from the catalog.
Only non-core tables (user-created tables, materialized views, saved query results) can be dropped. Attempting to drop a core table (e.g., primary_timeseries, locations, units) will raise a ValueError.
- Raises:
ValueError – If the table is a core TEEHR table.
Examples
Write and then drop a user-created table:
>>> ev.joined_timeseries_view().write("my_results")
>>> ev.table("my_results").drop()
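The core-table guard can be pictured as a simple membership check. The sketch below is illustrative only: CORE_TABLES lists just the three examples named above (the real set is larger), and drop_table() and the dict-based catalog are hypothetical.

```python
# Partial list: only the core tables named in the text above.
CORE_TABLES = frozenset({"primary_timeseries", "locations", "units"})


def drop_table(catalog, table_name):
    """Remove a user-created table; refuse to remove a core table."""
    if table_name in CORE_TABLES:
        raise ValueError(f"Cannot drop core table: {table_name!r}")
    del catalog[table_name]


catalog = {"primary_timeseries": [], "my_results": []}
drop_table(catalog, "my_results")  # allowed: user-created table

try:
    drop_table(catalog, "primary_timeseries")
    error_message = None
except ValueError as err:
    error_message = str(err)
```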
- static extraction_func(in_filepath: str | Path, field_mapping: dict, **kwargs) → DataFrame
Convert timeseries data to a pandas DataFrame.
- Parameters:
in_filepath (Union[str, Path]) – The input file path.
field_mapping (dict) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}
**kwargs – Additional keyword arguments are passed to pd.read_csv(), pd.read_parquet(), or xr.open_dataset().
- Returns:
pd.DataFrame
Notes
The input file can be in CSV, Parquet, NetCDF, or XML format. The field_mapping is used to rename the columns in the resulting DataFrame to match the TEEHR data model. The function will read the file, convert it to a DataFrame, rename the columns based on the field_mapping, and return the resulting DataFrame.
- filter(filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None)
Apply filters to the DataFrame.
- Parameters:
filters (Union[str, dict, TableFilter, List[...]]) – The filters to apply. Can be SQL strings, dictionaries, or TableFilter objects.
- Returns:
self – Returns self for method chaining.
Examples
Filters as dictionary:
>>> df = accessor.filter(
...     filters=[
...         {
...             "column": "value_time",
...             "operator": ">",
...             "value": "2022-01-01",
...         },
...     ]
... ).to_pandas()
Filters as string:
>>> df = accessor.filter(
...     filters=["value_time > '2022-01-01'"]
... ).to_pandas()
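The two examples above express the same condition. A minimal sketch of how a dictionary filter could be rendered as the equivalent SQL string follows; filter_to_sql() is a hypothetical helper, and the quoting rule (string values are single-quoted, numbers are not) is an assumption rather than TEEHR's actual conversion logic.

```python
def filter_to_sql(filter_spec):
    """Render a {"column", "operator", "value"} dict as a SQL condition string."""
    if isinstance(filter_spec, str):
        return filter_spec  # already a SQL string
    value = filter_spec["value"]
    if isinstance(value, str):
        # Quote string literals and escape embedded single quotes.
        literal = "'" + value.replace("'", "''") + "'"
    else:
        literal = str(value)
    return f"{filter_spec['column']} {filter_spec['operator']} {literal}"


as_dict = {"column": "value_time", "operator": ">", "value": "2022-01-01"}
as_sql = filter_to_sql(as_dict)
numeric_sql = filter_to_sql({"column": "value", "operator": ">=", "value": 10})
```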
- property is_core_table: bool
Return True if this table is a core (built-in) TEEHR table.
Core tables (e.g., primary_timeseries, locations, units) are part of the standard TEEHR schema and cannot be dropped. User-created tables (e.g., materialized views or saved query results) are not core tables and can be dropped.
- Returns:
bool – True if the table is a core TEEHR table, False otherwise.
- load_csv(in_path: Path | str, namespace_name: str | None = None, catalog_name: str | None = None, extraction_function: callable | None = None, pattern: str = '**/*.csv', field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', parallel: bool = False, max_workers: int | None = 3, drop_duplicates: bool = True, **kwargs)
Import timeseries csv data.
- Parameters:
in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in csv file format.
namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.
catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.
extraction_function (callable, optional) – A custom function to extract and transform the data from the input files to the TEEHR data model. If None (default), uses the table’s default extraction function.
pattern (str, optional (default: "**/*.csv")) – The pattern to match files. Controls which files are loaded from the directory. If in_path is a file, this parameter is ignored.
field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}
constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}
location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.
write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.
parallel (bool, optional (default: False)) – Whether to process files in parallel. Default is False.
max_workers (Union[int, None], optional) – The maximum number of workers to use for parallel processing when in_path is a directory. This gets passed to the concurrent.futures ProcessPoolExecutor. If in_path is a file, this parameter is ignored. Default is 3. If None, os.process_cpu_count() is used.
drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the DataFrame during validation.
**kwargs – Additional keyword arguments are passed to pd.read_csv().
Includes validation and importing data to the database.
Notes
The TEEHR Timeseries table schema includes fields:
reference_time
value_time
configuration_name
unit_name
variable_name
value
location_id
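How field_mapping, constant_field_values, and location_id_prefix combine to produce rows in this schema can be sketched in plain Python. The to_teehr_rows() helper, the input column names, and the hyphen used to join the prefix are all assumptions for illustration; the loader itself works on DataFrames and may treat existing prefixes differently.

```python
def to_teehr_rows(rows, field_mapping=None, constant_field_values=None,
                  location_id_prefix=None):
    """Rename input fields, add constant fields, and prefix location IDs."""
    field_mapping = field_mapping or {}
    constant_field_values = constant_field_values or {}
    converted = []
    for row in rows:
        # Rename mapped fields; unmapped fields keep their original names.
        new_row = {field_mapping.get(key, key): value for key, value in row.items()}
        # Constant values apply to every row.
        new_row.update(constant_field_values)
        if location_id_prefix is not None:
            # Assumption: prefix and ID are joined with a hyphen.
            new_row["location_id"] = f"{location_id_prefix}-{new_row['location_id']}"
        converted.append(new_row)
    return converted


raw = [{"site": "01234567", "time": "2022-01-01 00:00:00", "flow": 1.5}]
rows = to_teehr_rows(
    raw,
    field_mapping={"site": "location_id", "time": "value_time", "flow": "value"},
    constant_field_values={
        "configuration_name": "usgs_observations",
        "variable_name": "streamflow_hourly_inst",
        "unit_name": "m^3/s",
        "reference_time": None,
    },
    location_id_prefix="usgs",
)
```

The constant values shown (configuration, variable, and unit names) are placeholders; real values have to match entries that exist in the corresponding domain tables.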
- load_dataframe(df: pd.DataFrame | ps.DataFrame, namespace_name: str | None = None, catalog_name: str | None = None, field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', drop_duplicates: bool = True)
Load data from an in-memory dataframe.
- Parameters:
df (Union[pd.DataFrame, ps.DataFrame]) – Pandas or PySpark DataFrame to load into the table.
namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.
catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.
field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}
constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}
location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.
write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.
drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the dataframe.
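The three write modes differ in how existing rows are treated. The sketch below models them on a list of dicts; write_rows() and the key_fields used to decide whether a row "exists" are hypothetical, since the real uniqueness fields come from the table definition.

```python
def write_rows(table, new_rows, write_mode="append",
               key_fields=("location_id", "value_time")):
    """Return the table contents after writing new_rows with the given mode."""
    if write_mode == "append":
        # No check against existing data, so duplicates are possible.
        return table + new_rows
    if write_mode == "create_or_replace":
        # Existing contents are discarded.
        return list(new_rows)
    if write_mode == "upsert":
        # Rows with a matching key are replaced; others are appended.
        merged = {tuple(row[k] for k in key_fields): row for row in table}
        for row in new_rows:
            merged[tuple(row[k] for k in key_fields)] = row
        return list(merged.values())
    raise ValueError(f"Unsupported write_mode: {write_mode!r}")


existing = [{"location_id": "usgs-01", "value_time": "2022-01-01", "value": 1.0}]
incoming = [
    {"location_id": "usgs-01", "value_time": "2022-01-01", "value": 2.0},
    {"location_id": "usgs-01", "value_time": "2022-01-02", "value": 3.0},
]

appended = write_rows(existing, incoming, "append")
upserted = write_rows(existing, incoming, "upsert")
replaced = write_rows(existing, incoming, "create_or_replace")
```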
- load_fews_xml(in_path: Path | str, namespace_name: str | None = None, catalog_name: str | None = None, extraction_function: callable | None = None, pattern: str = '**/*.xml', field_mapping: dict = {'ensembleId': 'configuration_name', 'ensembleMemberIndex': 'member', 'forecastDate': 'reference_time', 'locationId': 'location_id', 'parameterId': 'variable_name', 'units': 'unit_name'}, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', parallel: bool = False, max_workers: int | None = 3, drop_duplicates: bool = True, **kwargs)
Import timeseries from FEWS PI-XML data format.
- Parameters:
in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in xml file format.
namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.
catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.
extraction_function (callable, optional) – A custom function to extract and transform the data from the input files to the TEEHR data model. If None (default), uses the table’s default extraction function.
pattern (str, optional (default: "**/*.xml")) – The pattern to match files. Controls which files are loaded from the directory. If in_path is a file, this parameter is ignored.
field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}. Default mapping:
{
    "locationId": "location_id",
    "forecastDate": "reference_time",
    "parameterId": "variable_name",
    "units": "unit_name",
    "ensembleId": "configuration_name",
    "ensembleMemberIndex": "member"
}
constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}
location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.
write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.
parallel (bool, optional (default: False)) – Whether to process files in parallel. Default is False.
max_workers (Union[int, None], optional) – The maximum number of workers to use for parallel processing when in_path is a directory. This gets passed to the concurrent.futures ProcessPoolExecutor. If in_path is a file, this parameter is ignored. Default is 3. If None, os.process_cpu_count() is used.
drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the dataframe.
Includes validation and importing data to the database.
Notes
This function follows the Delft-FEWS Published Interface (PI) XML format.
Reference: https://publicwiki.deltares.nl/display/FEWSDOC/Dynamic+data
The value and value_time fields are parsed automatically.
The TEEHR Timeseries table schema includes fields:
reference_time
value_time
configuration_name
unit_name
variable_name
value
location_id
member
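To make the default field mapping concrete, the sketch below parses a small XML fragment with the standard library and applies that mapping to the series header. The fragment is a simplified, namespace-free imitation of a PI-XML series (real files declare an XML namespace and carry more header fields), the sample values are invented, and parse_series() is a hypothetical helper rather than the loader's extraction function.

```python
import xml.etree.ElementTree as ET

# Default mapping as given in the signature above.
FIELD_MAPPING = {
    "locationId": "location_id",
    "forecastDate": "reference_time",
    "parameterId": "variable_name",
    "units": "unit_name",
    "ensembleId": "configuration_name",
    "ensembleMemberIndex": "member",
}

# Simplified stand-in for a PI-XML document (assumption: no namespace).
SAMPLE = """
<TimeSeries>
  <series>
    <header>
      <locationId>SITE1</locationId>
      <parameterId>QINE</parameterId>
      <ensembleId>MEFP</ensembleId>
      <ensembleMemberIndex>1</ensembleMemberIndex>
      <forecastDate date="2023-01-01" time="12:00:00"/>
      <units>CFS</units>
    </header>
    <event date="2023-01-01" time="18:00:00" value="10.5"/>
    <event date="2023-01-02" time="00:00:00" value="11.0"/>
  </series>
</TimeSeries>
"""


def parse_series(xml_text, field_mapping):
    """Return one row per event, with header fields renamed via field_mapping."""
    rows = []
    root = ET.fromstring(xml_text.strip())
    for series in root.iter("series"):
        header = {}
        for child in series.find("header"):
            if child.tag not in field_mapping:
                continue
            if "date" in child.attrib:
                # Date-like elements carry date/time attributes instead of text.
                value = f"{child.attrib['date']} {child.attrib['time']}"
            else:
                value = child.text
            header[field_mapping[child.tag]] = value
        for event in series.iter("event"):
            rows.append({
                **header,
                "value_time": f"{event.attrib['date']} {event.attrib['time']}",
                "value": float(event.attrib["value"]),
            })
    return rows


rows = parse_series(SAMPLE, FIELD_MAPPING)
```

Each event inherits the renamed header fields, so the header is stored once per series in the file but repeated on every row of the result.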
- load_netcdf(in_path: Path | str, namespace_name: str | None = None, catalog_name: str | None = None, extraction_function: callable | None = None, pattern: str = '**/*.nc', field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', parallel: bool = False, max_workers: int | None = 3, drop_duplicates: bool = True, **kwargs)
Import timeseries netcdf data.
- Parameters:
in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in netcdf file format.
namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.
catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.
extraction_function (callable, optional) – A custom function to extract and transform the data from the input files to the TEEHR data model. If None (default), uses the table’s default extraction function.
pattern (str, optional (default: "**/*.nc")) – The pattern to match files. Controls which files are loaded from the directory. If in_path is a file, this parameter is ignored.
field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}
constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}
location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.
write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.
parallel (bool, optional (default: False)) – Whether to process files in parallel. Default is False.
max_workers (Union[int, None], optional) – The maximum number of workers to use for parallel processing when in_path is a directory. This gets passed to the concurrent.futures ProcessPoolExecutor. If in_path is a file, this parameter is ignored. Default is 3. If None, os.process_cpu_count() is used.
drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the DataFrame during validation.
**kwargs – Additional keyword arguments are passed to xr.open_dataset().
Includes validation and importing data to the database.
Notes
The TEEHR Timeseries table schema includes fields:
reference_time
value_time
configuration_name
unit_name
variable_name
value
location_id
- load_parquet(in_path: Path | str, namespace_name: str | None = None, catalog_name: str | None = None, extraction_function: callable | None = None, pattern: str = '**/*.parquet', field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: str = 'append', parallel: bool = False, max_workers: int | None = 3, drop_duplicates: bool = True, **kwargs)
Import timeseries parquet data.
- Parameters:
in_path (Union[Path, str]) – Path to the timeseries data (file or directory) in parquet file format.
namespace_name (str, optional) – The namespace name to write to, by default None, which means the namespace_name of the active catalog is used.
catalog_name (str, optional) – The catalog name to write to, by default None, which means the catalog_name of the active catalog is used.
extraction_function (callable, optional) – A custom function to extract and transform the data from the input files to the TEEHR data model. If None (default), uses the table’s default extraction function.
pattern (str, optional) – The glob pattern to use when searching for files in a directory. Default is ‘**/*.parquet’ to search for all parquet files recursively.
field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}
constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}
location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note that the methods for fetching USGS and NWM data automatically prefix location IDs with “usgs” or the NWM version (“nwm12”, “nwm21”, “nwm22”, or “nwm30”), respectively.
write_mode (str, optional (default: "append")) – The write mode for the table. Options are “append”, “upsert”, and “create_or_replace”. If “append”, the table will be appended without checking existing data. If “upsert”, existing data will be replaced and new data that does not exist will be appended. If “create_or_replace”, a new table will be created or an existing table will be replaced.
parallel (bool, optional (default: False)) – Whether to process files in parallel. Default is False.
max_workers (Union[int, None], optional) – The maximum number of workers to use for parallel processing when in_path is a directory. This gets passed to the concurrent.futures ProcessPoolExecutor. If in_path is a file, this parameter is ignored. Default is 3. If None, os.process_cpu_count() is used.
drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the DataFrame during validation.
**kwargs – Additional keyword arguments are passed to pd.read_parquet().
Includes validation and importing data to the database.
Notes
The TEEHR Timeseries table schema includes fields:
reference_time
value_time
configuration_name
unit_name
variable_name
value
location_id
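The interaction between in_path and pattern can be tried out with pathlib alone. In this sketch, find_input_files() is a hypothetical helper that mirrors the documented rule (the pattern applies to directories and is ignored for a single file); whether the loader uses Path.glob internally is an assumption.

```python
import tempfile
from pathlib import Path


def find_input_files(in_path, pattern="**/*.parquet"):
    """Return the files a loader would read for a given path and glob pattern."""
    in_path = Path(in_path)
    if in_path.is_file():
        # A single file is used as-is; the pattern is ignored.
        return [in_path]
    return sorted(in_path.glob(pattern))


with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "2023").mkdir()
    (root / "a.parquet").touch()
    (root / "2023" / "b.parquet").touch()
    (root / "notes.txt").touch()

    # Recursive pattern: matches files at the top level and in subdirectories.
    found = [p.relative_to(root).as_posix() for p in find_input_files(root)]
    # Non-recursive pattern: top level only.
    top_level = [p.name for p in find_input_files(root, pattern="*.parquet")]
    # File path: returned directly even though it does not match the pattern.
    single = [p.name for p in find_input_files(root / "notes.txt")]
```

Checking the matched file list this way before a large load is a quick way to confirm that a narrower pattern selects only the intended files.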
- order_by(fields: str | StrEnum | List[str | StrEnum])
Apply ordering to the DataFrame.
- Parameters:
fields (Union[str, StrEnum, List[...]]) – The fields to order by.
- Returns:
self – Returns self for method chaining.
Examples
>>> df = accessor.order_by("value_time").to_pandas()
- to_geopandas()
Return GeoPandas DataFrame.
- Returns:
gpd.GeoDataFrame – The data as a GeoPandas DataFrame.
- to_pandas()
Return Pandas DataFrame.
- Returns:
pd.DataFrame – The data as a Pandas DataFrame.
- to_sdf() → DataFrame
Return the PySpark DataFrame.
The PySpark DataFrame can be further processed using PySpark. Note that PySpark DataFrames are lazy and will not be executed until an action is called (e.g., show(), collect(), toPandas()).
- Returns:
ps.DataFrame – The Spark DataFrame.
- validate(drop_duplicates: bool = True)
Validate the dataset table against the schema.
- Parameters:
drop_duplicates (bool, optional) – Whether to drop duplicates based on the uniqueness fields. Default is True.
Examples
Validate a table:
>>> ev.table(
...     table_name="primary_timeseries"
... ).validate(drop_duplicates=True)
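Dropping duplicates "based on the uniqueness fields" means that two rows count as duplicates when they agree on those fields, even if other columns differ. A minimal sketch follows; the uniqueness fields shown and the keep-first rule are assumptions, since the table's actual uniqueness_fields attribute and tie-breaking behaviour are not listed here.

```python
def drop_duplicates(rows, uniqueness_fields):
    """Keep the first row seen for each combination of uniqueness fields."""
    seen = set()
    unique_rows = []
    for row in rows:
        key = tuple(row[field] for field in uniqueness_fields)
        if key not in seen:
            seen.add(key)
            unique_rows.append(row)
    return unique_rows


# Hypothetical uniqueness fields for illustration.
UNIQUENESS_FIELDS = ("location_id", "value_time", "configuration_name")

rows = [
    {"location_id": "usgs-01", "value_time": "2022-01-01", "configuration_name": "obs", "value": 1.0},
    {"location_id": "usgs-01", "value_time": "2022-01-01", "configuration_name": "obs", "value": 9.9},
    {"location_id": "usgs-01", "value_time": "2022-01-02", "configuration_name": "obs", "value": 2.0},
]

deduplicated = drop_duplicates(rows, UNIQUENESS_FIELDS)
```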
- write(table_name: str, write_mode: str = 'create_or_replace')
Write the DataFrame to an iceberg table.
- Parameters:
table_name (str) – The name of the table to write to.
write_mode (str, optional) – The write mode. Options: “create”, “append”, “overwrite”, “create_or_replace”. Default is “create_or_replace”.
- Returns:
self – Returns self for method chaining.
Examples
>>> from teehr.metrics import KGE
>>> accessor.aggregate(
...     metrics=[KGE()],
...     group_by=["primary_location_id"]
... ).write("location_metrics")