Write#

class Write(ev=None)[source]#

Class to handle writing evaluation results to storage.

Methods

delete_from

Delete rows from a table based on filter conditions.

to_cache

Write the DataFrame to a parquet file for caching.

to_warehouse

Write the DataFrame to the specified target in the catalog.

delete_from(table_name: str, filters: str | dict | TableFilter | List[str | dict | TableFilter] | None = None, catalog_name: str | None = None, namespace_name: str | None = None, dry_run: bool = False) → int | DataFrame[source]#

Delete rows from a table based on filter conditions.

Parameters:
  • table_name (str) – The name of the table to delete rows from.

  • filters (Union[str, dict, TableFilter, List[...]], optional) – Filter conditions specifying which rows to delete. Supports SQL strings, dictionaries, or TableFilter objects. If None, all rows in the table will be deleted.

  • catalog_name (str, optional) – The catalog name, by default None, which uses the active catalog.

  • namespace_name (str, optional) – The namespace name, by default None, which uses the active namespace.

  • dry_run (bool, optional) – If True, returns a Spark DataFrame of rows that would be deleted without performing the actual deletion. This allows the user to inspect or count the rows before committing the delete. Default is False.

Returns:

int or ps.DataFrame – If dry_run=False, returns the number of rows deleted (int). If dry_run=True, returns a Spark DataFrame of rows that would be deleted.

Examples

Preview rows that would be deleted (dry run):

>>> sdf = ev.write.delete_from(
...     table_name="primary_timeseries",
...     filters=["location_id = 'usgs-01234567'"],
...     dry_run=True,
... )
>>> sdf.show()
>>> print(f"Rows to delete: {sdf.count()}")

Delete rows and get the count:

>>> count = ev.write.delete_from(
...     table_name="primary_timeseries",
...     filters=["location_id = 'usgs-01234567'"],
... )
>>> print(f"Deleted {count} rows.")

Delete using a TableFilter object:

>>> from teehr.models.filters import TableFilter
>>> from teehr import Operators as ops
>>> count = ev.write.delete_from(
...     table_name="primary_timeseries",
...     filters=TableFilter(
...         column="location_id",
...         operator=ops.eq,
...         value="usgs-01234567"
...     ),
... )
static to_cache(source_data: ps.DataFrame | pd.DataFrame | gpd.GeoDataFrame, cache_filepath: str | Path, write_schema: Schema, write_mode: str = 'overwrite')[source]#

Write the DataFrame to a parquet file for caching.

Parameters:
  • source_data (ps.DataFrame | pd.DataFrame | gpd.GeoDataFrame) – The Spark, Pandas, or GeoPandas DataFrame to cache.

  • cache_filepath (str | Path) – The path to write the cached parquet file to.

  • write_schema (ArrowSchema) – The pyarrow schema to use when writing the parquet file.

  • write_mode (str, optional) – The write mode passed to PySpark’s DataFrame.write.mode when the source is a Spark DataFrame. Default is “overwrite”.

to_warehouse(source_data: ps.DataFrame | pd.DataFrame | str, table_name: str, write_mode: str = 'append', uniqueness_fields: List[str] | None = None, catalog_name: str | None = None, namespace_name: str | None = None)[source]#

Write the DataFrame to the specified target in the catalog.

Parameters:
  • source_data (ps.DataFrame | pd.DataFrame | str) – The Spark or Pandas DataFrame to write, or the name of a temporary view.

  • table_name (str) – The target table name in the catalog.

  • write_mode (str, optional) – The mode to use when writing the DataFrame. Options:

    • "insert": Insert all rows directly without duplicate checking. Faster than "append" but may create duplicates.

    • "append": Insert new rows; skip rows matching uniqueness_fields (uses MERGE INTO).

    • "upsert": Insert new rows; update existing rows matching uniqueness_fields.

    • "overwrite": Replace all data in the table. Preserves the table structure and history (prior snapshots remain available via time travel).

    • "create_or_replace": Drop and recreate the table. Loses history and allows the schema to change.

    Default is "append".

  • uniqueness_fields (List[str], optional) – List of fields that uniquely identify a record, by default None, which means the uniqueness_fields are taken from the table class. Only used for "append" and "upsert" write modes.

  • catalog_name (str, optional) – The catalog name to write to, by default None, which uses the active catalog.

  • namespace_name (str, optional) – The namespace name to write to, by default None, which uses the active namespace.
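
Examples

The "append" and "upsert" modes above both key on uniqueness_fields; the difference is what happens to rows whose keys already exist. A sketch of those semantics in plain pandas (not the Iceberg MERGE INTO the method actually runs; the columns and keys below are illustrative):

```python
import pandas as pd

# Illustrative existing warehouse rows and incoming rows.
existing = pd.DataFrame({
    "location_id": ["usgs-01234567"],
    "value_time": ["2024-01-01"],
    "value": [1.0],
})
incoming = pd.DataFrame({
    "location_id": ["usgs-01234567", "usgs-07654321"],
    "value_time": ["2024-01-01", "2024-01-01"],
    "value": [9.9, 2.0],
})
keys = ["location_id", "value_time"]  # uniqueness_fields

# "append": keep existing rows unchanged; insert only incoming rows
# whose key combination is not already present.
new_only = incoming.merge(existing[keys], on=keys, how="left", indicator=True)
new_only = new_only[new_only["_merge"] == "left_only"].drop(columns="_merge")
appended = pd.concat([existing, new_only], ignore_index=True)

# "upsert": insert new rows, and let incoming rows replace existing
# rows that share the same key combination.
upserted = (
    pd.concat([existing, incoming], ignore_index=True)
    .drop_duplicates(subset=keys, keep="last")
    .reset_index(drop=True)
)
```

Under "append" the duplicate-keyed incoming row (value 9.9) is skipped and the existing value 1.0 survives; under "upsert" it wins. "insert" mode would simply concatenate everything, which is why it is faster but can create duplicates.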