LocationAttributeTable#
- class LocationAttributeTable(ev)[source]#
Bases: BaseTable
Access methods to location attributes table.
Methods
- distinct_values: Return distinct values for a column.
- field_enum: Get the location attribute fields enum.
- fields: Return table columns as a list.
- filter: Apply a filter.
- load_csv: Import location_attributes from CSV file format.
- load_dataframe: Import data from an in-memory dataframe.
- load_parquet: Import location_attributes from parquet file format.
- order_by: Apply an order_by.
- query: Run a query against the table with filters and order_by.
- to_geopandas: Return GeoPandas DataFrame.
- to_pandas: Return Pandas DataFrame for Location Attributes.
- to_sdf: Return PySpark DataFrame.
- validate: Validate the dataset table against the schema.
- distinct_values(column: str, location_prefixes: bool = False) → List[str]#
Return distinct values for a column.
- Parameters:
  - column (str) – The column to get distinct values for.
  - location_prefixes (bool) – Whether to return location prefixes. If True, only the unique prefixes of the locations will be returned. Only compatible with the primary_timeseries, secondary_timeseries, joined_timeseries, locations, location_attributes, and location_crosswalk tables and their respective location columns. Default: False.
- Returns:
  List[str] – The distinct values for the column.
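For the location attributes table, a minimal sketch (assuming an Evaluation ev with location attributes already loaded; the values returned depend on your data):
>>> attr_names = ev.location_attributes.distinct_values("attribute_name")
>>> prefixes = ev.location_attributes.distinct_values(
>>>     "location_id", location_prefixes=True
>>> )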
- fields() → List[str]#
Return table columns as a list.
- filter(filters: str | dict | FilterBaseModel | List[str | dict | FilterBaseModel])#
Apply a filter.
- Parameters:
  - filters (Union[str, dict, FilterBaseModel, List[Union[str, dict, FilterBaseModel]]]) – The filters to apply to the query. The filters can be an SQL string, dictionary, FilterBaseModel or a list of any of these.
- Returns:
  self (BaseTable or subclass of BaseTable)
Examples
Note: The filter method is universal for all table types. When repurposing this example, ensure filter arguments (e.g., column names, values) are valid for the specific table type.
Filters as dictionary:
>>> ts_df = ev.primary_timeseries.filter(
>>>     filters=[
>>>         {
>>>             "column": "value_time",
>>>             "operator": ">",
>>>             "value": "2022-01-01",
>>>         },
>>>         {
>>>             "column": "value_time",
>>>             "operator": "<",
>>>             "value": "2022-01-02",
>>>         },
>>>         {
>>>             "column": "location_id",
>>>             "operator": "=",
>>>             "value": "gage-C",
>>>         },
>>>     ]
>>> ).to_pandas()
Filters as string:
>>> ts_df = ev.primary_timeseries.filter(
>>>     filters=[
>>>         "value_time > '2022-01-01'",
>>>         "value_time < '2022-01-02'",
>>>         "location_id = 'gage-C'"
>>>     ]
>>> ).to_pandas()
Filters as FilterBaseModel:
>>> from teehr.models.filters import TimeseriesFilter
>>> from teehr.models.filters import FilterOperators
>>>
>>> fields = ev.primary_timeseries.field_enum()
>>> ts_df = ev.primary_timeseries.filter(
>>>     filters=[
>>>         TimeseriesFilter(
>>>             column=fields.value_time,
>>>             operator=FilterOperators.gt,
>>>             value="2022-01-01",
>>>         ),
>>>         TimeseriesFilter(
>>>             column=fields.value_time,
>>>             operator=FilterOperators.lt,
>>>             value="2022-01-02",
>>>         ),
>>>         TimeseriesFilter(
>>>             column=fields.location_id,
>>>             operator=FilterOperators.eq,
>>>             value="gage-C",
>>>         ),
>>>     ]).to_pandas()
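For the location attributes table itself, an equivalent sketch (assuming location attributes are loaded into ev; the attribute name "drainage_area" is hypothetical):
>>> la_df = ev.location_attributes.filter(
>>>     "attribute_name = 'drainage_area'"
>>> ).to_pandas()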
- load_csv(in_path: Path | str, pattern: str = '**/*.csv', field_mapping: dict | None = None, location_id_prefix: str | None = None, write_mode: TableWriteEnum = 'append', drop_duplicates: bool = True, update_attrs_table: bool = True, **kwargs)[source]#
Import location_attributes from CSV file format.
- Parameters:
  - in_path (Union[Path, str]) – The input file or directory path. CSV file format.
  - field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}.
  - location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note, the methods for fetching USGS and NWM data automatically prefix location IDs with "usgs" or the NWM version ("nwm12", "nwm21", "nwm22", or "nwm30"), respectively.
  - write_mode (TableWriteEnum, optional (default: "append")) – The write mode for the table. Options are "append", "upsert", and "overwrite". If "append", the table will be appended with new data that does not already exist. If "upsert", existing data will be replaced and new data that does not exist will be appended. If "overwrite", existing partitions receiving new data are overwritten.
  - drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the DataFrame.
  - update_attrs_table (bool, optional (default: True)) – Whether to add default attributes for the location attributes. If True, it will add default attributes for each unique attribute name found in the data with category="continuous" and the default description "<attribute_name> default description".
  - **kwargs – Additional keyword arguments are passed to pd.read_csv().
Notes
The TEEHR Location Attribute table schema includes fields:
location_id
attribute_name
value
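As an illustration, a hedged sketch of a CSV load (the file path and the field_mapping column name are hypothetical; adjust them to your data):
>>> ev.location_attributes.load_csv(
>>>     in_path="location_attributes.csv",         # hypothetical file
>>>     field_mapping={"site_id": "location_id"},  # map input columns to the schema
>>>     location_id_prefix="usgs",
>>> )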
- load_dataframe(df: pd.DataFrame | ps.DataFrame, field_mapping: dict | None = None, constant_field_values: dict | None = None, location_id_prefix: str | None = None, write_mode: TableWriteEnum = 'append', persist_dataframe: bool = False, drop_duplicates: bool = True)[source]#
Import data from an in-memory dataframe.
- Parameters:
  - df (Union[pd.DataFrame, ps.DataFrame]) – DataFrame to load into the table.
  - field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}.
  - constant_field_values (dict, optional) – A dictionary mapping field names to constant values. Format: {field_name: value}.
  - location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note, the methods for fetching USGS and NWM data automatically prefix location IDs with "usgs" or the NWM version ("nwm12", "nwm21", "nwm22", or "nwm30"), respectively.
  - write_mode (TableWriteEnum, optional (default: "append")) – The write mode for the table. Options are "append", "upsert", and "overwrite". If "append", the table will be appended with new data that does not already exist. If "upsert", existing data will be replaced and new data that does not exist will be appended. If "overwrite", existing partitions receiving new data are overwritten.
  - persist_dataframe (bool, optional (default: False)) – Whether to repartition and persist the pyspark dataframe after reading from the cache. This can improve performance when loading a large number of files from the cache.
  - drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the dataframe.
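For example, a minimal sketch of loading an in-memory pandas DataFrame (the location IDs, attribute name, and values are hypothetical; use value types that match your table schema):
>>> import pandas as pd
>>> # Hypothetical attribute data following the location_id / attribute_name / value schema.
>>> df = pd.DataFrame({
>>>     "location_id": ["gage-A", "gage-B"],
>>>     "attribute_name": ["drainage_area", "drainage_area"],
>>>     "value": ["50.2", "12.7"],
>>> })
>>> ev.location_attributes.load_dataframe(df, write_mode="append")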
- load_parquet(in_path: Path | str, pattern: str = '**/*.parquet', field_mapping: dict | None = None, location_id_prefix: str | None = None, write_mode: TableWriteEnum = 'append', drop_duplicates: bool = True, update_attrs_table: bool = True, **kwargs)[source]#
Import location_attributes from parquet file format.
- Parameters:
  - in_path (Union[Path, str]) – The input file or directory path. Parquet file format.
  - field_mapping (dict, optional) – A dictionary mapping input fields to output fields. Format: {input_field: output_field}.
  - location_id_prefix (str, optional) – The prefix to add to location IDs. Used to ensure unique location IDs across configurations. Note, the methods for fetching USGS and NWM data automatically prefix location IDs with "usgs" or the NWM version ("nwm12", "nwm21", "nwm22", or "nwm30"), respectively.
  - write_mode (TableWriteEnum, optional (default: "append")) – The write mode for the table. Options are "append", "upsert", and "overwrite". If "append", the table will be appended with new data that does not already exist. If "upsert", existing data will be replaced and new data that does not exist will be appended. If "overwrite", existing partitions receiving new data are overwritten.
  - drop_duplicates (bool, optional (default: True)) – Whether to drop duplicates from the DataFrame.
  - update_attrs_table (bool, optional (default: True)) – Whether to add default attributes for the location attributes. If True, it will add default attributes for each unique attribute name found in the data with category="continuous" and the default description "<attribute_name> default description".
  - **kwargs – Additional keyword arguments are passed to pd.read_parquet().
Notes
The TEEHR Location Attribute table schema includes fields:
location_id
attribute_name
value
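Similarly, a sketch of loading attributes from parquet files under a directory (the directory path is hypothetical):
>>> ev.location_attributes.load_parquet(
>>>     in_path="attributes/",       # hypothetical directory of parquet files
>>>     pattern="**/*.parquet",
>>>     write_mode="upsert",
>>> )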
- order_by(fields: str | StrEnum | List[str | StrEnum])#
Apply an order_by.
- Parameters:
  - fields (Union[str, StrEnum, List[Union[str, StrEnum]]]) – The fields to order the query by. The fields can be a string, StrEnum or a list of any of these. The fields will be ordered in the order they are provided.
- Returns:
  self (BaseTable or subclass of BaseTable)
Examples
Order by string:
>>> ts_df = ev.primary_timeseries.order_by("value_time").to_df()
Order by StrEnum:
>>> from teehr.querying.field_enums import TimeseriesFields
>>> ts_df = ev.primary_timeseries.order_by(
>>>     TimeseriesFields.value_time
>>> ).to_pandas()
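For the location attributes table, ordering by its own schema fields works the same way (a sketch, assuming the table is loaded):
>>> la_df = ev.location_attributes.order_by(
>>>     ["location_id", "attribute_name"]
>>> ).to_pandas()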
- query(filters: str | dict | FilterBaseModel | List[str | dict | FilterBaseModel] | None = None, order_by: str | StrEnum | List[str | StrEnum] | None = None)#
Run a query against the table with filters and order_by.
In general, a user will either use the query method or the filter and order_by methods. The query method is a convenience method that applies filters and order_by in a single call.
- Parameters:
  - filters (Union[str, dict, FilterBaseModel, List[Union[str, dict, FilterBaseModel]]]) – The filters to apply to the query. The filters can be an SQL string, dictionary, FilterBaseModel or a list of any of these. The filters will be applied in the order they are provided.
  - order_by (Union[str, List[str], StrEnum, List[StrEnum]]) – The fields to order the query by. The fields can be a string, StrEnum or a list of any of these. The fields will be ordered in the order they are provided.
- Returns:
  self (BaseTable or subclass of BaseTable)
Examples
Filters as dictionaries:
>>> ts_df = ev.primary_timeseries.query(
>>>     filters=[
>>>         {
>>>             "column": "value_time",
>>>             "operator": ">",
>>>             "value": "2022-01-01",
>>>         },
>>>         {
>>>             "column": "value_time",
>>>             "operator": "<",
>>>             "value": "2022-01-02",
>>>         },
>>>         {
>>>             "column": "location_id",
>>>             "operator": "=",
>>>             "value": "gage-C",
>>>         },
>>>     ],
>>>     order_by=["location_id", "value_time"]
>>> ).to_pandas()
Filters as SQL strings:
>>> ts_df = ev.primary_timeseries.query(
>>>     filters=[
>>>         "value_time > '2022-01-01'",
>>>         "value_time < '2022-01-02'",
>>>         "location_id = 'gage-C'"
>>>     ],
>>>     order_by=["location_id", "value_time"]
>>> ).to_pandas()
Filters as FilterBaseModels:
>>> from teehr.models.filters import TimeseriesFilter
>>> from teehr.models.filters import FilterOperators
>>>
>>> fields = ev.primary_timeseries.field_enum()
>>> ts_df = ev.primary_timeseries.query(
>>>     filters=[
>>>         TimeseriesFilter(
>>>             column=fields.value_time,
>>>             operator=FilterOperators.gt,
>>>             value="2022-01-01",
>>>         ),
>>>         TimeseriesFilter(
>>>             column=fields.value_time,
>>>             operator=FilterOperators.lt,
>>>             value="2022-01-02",
>>>         ),
>>>         TimeseriesFilter(
>>>             column=fields.location_id,
>>>             operator=FilterOperators.eq,
>>>             value="gage-C",
>>>         ),
>>>     ]).to_pandas()
- to_sdf()#
Return PySpark DataFrame.
The PySpark DataFrame can be further processed using PySpark. Note, PySpark DataFrames are lazy and will not be executed until an action is called, for example show(), collect() or toPandas(). This can be useful for further processing or analysis, for example:
>>> ts_sdf = ev.primary_timeseries.query(
>>>     filters=[
>>>         "value_time > '2022-01-01'",
>>>         "value_time < '2022-01-02'",
>>>         "location_id = 'gage-C'"
>>>     ]
>>> ).to_sdf()
>>> ts_df = (
>>>     ts_sdf.select("value_time", "location_id", "value")
>>>     .orderBy("value").toPandas()
>>> )
>>> ts_df.head()
- validate()#
Validate the dataset table against the schema.
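As a final sketch, validation is typically run after loading data (assuming location attributes have been loaded into ev):
>>> ev.location_attributes.validate()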