Validate#

class Validate(ev=None)[source]#

Class for validating data.

Methods

schema

Validate the DataFrame against the provided schema.

schema_and_data

Validate the DataFrame against the table schema.

sdf_filters

Validate and format filter(s) against an existing DataFrame.

table_filters

Validate table filter(s) by reading the table schema.

static schema(df: ps.DataFrame | pd.DataFrame, table_schema: SparkDataFrameSchema | PandasDataFrameSchema) ps.DataFrame | pd.DataFrame[source]#

Validate the DataFrame against the provided schema.

This only checks data types, fields, and nullability using the pandera schema; it does not enforce foreign key relationships.

Parameters:
  • df (ps.DataFrame | pd.DataFrame) – The Spark or Pandas DataFrame to validate.

  • table_schema (SparkDataFrameSchema | PandasDataFrameSchema) – The schema to validate against.

Returns:

ps.DataFrame | pd.DataFrame – The validated Spark or Pandas DataFrame.

Examples

Validate a PySpark DataFrame against the primary timeseries schema:

>>> from teehr.models.pandera_dataframe_schemas import primary_timeseries_schema
>>> validated_sdf = ev.validate.schema(
...     df=raw_sdf,
...     table_schema=primary_timeseries_schema()
... )

For Pandas DataFrames:

>>> validated_pdf = ev.validate.schema(
...     df=raw_pdf,
...     table_schema=primary_timeseries_schema(type="pandas")
... )

schema_and_data(sdf: DataFrame, table_schema: DataFrameSchema, foreign_keys: List[Dict[str, str]], strict: bool = True, add_missing_columns: bool = False, drop_duplicates: bool = True, uniqueness_fields: List[str] | None = None) DataFrame[source]#

Validate the DataFrame against the table schema.

This checks data types, fields, and nullability using the pandera schema, while also enforcing foreign key relationships, optionally dropping duplicates, and optionally adding or removing columns to match the table schema.

Parameters:
  • sdf (ps.DataFrame) – The Spark DataFrame to enforce the schema on.

  • table_schema (SparkDataFrameSchema) – The schema to enforce.

  • foreign_keys (List[Dict[str, str]]) – The foreign key relationships to enforce.

  • strict (bool, optional) – Whether to strictly enforce the schema by including only the columns in the schema. The default is True.

  • add_missing_columns (bool, optional) – Whether to add missing columns from the schema with null values. The default is False.

  • drop_duplicates (bool, optional) – Whether to drop duplicate rows based on the uniqueness_fields. The default is True.

  • uniqueness_fields (List[str], optional) – The fields that uniquely identify a record. Required if drop_duplicates is True. The default is None.

Returns:

ps.DataFrame – The Spark DataFrame with the enforced schema.
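
Examples

A sketch of validating a raw timeseries DataFrame; the schema import follows the schema example above, while raw_sdf, the empty foreign-key list, and the uniqueness fields shown are illustrative assumptions (real foreign keys and uniqueness fields are table-specific):

>>> from teehr.models.pandera_dataframe_schemas import primary_timeseries_schema
>>> validated_sdf = ev.validate.schema_and_data(
...     sdf=raw_sdf,
...     table_schema=primary_timeseries_schema(),
...     foreign_keys=[],  # table-specific foreign-key mappings (assumed empty here)
...     drop_duplicates=True,
...     uniqueness_fields=["location_id", "value_time"]  # assumed fields
... )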

sdf_filters(sdf: DataFrame, filters: str | dict | TableFilter | List[str | dict | TableFilter], validate: bool = True) List[str][source]#

Validate and format filter(s) against an existing DataFrame.

This method validates filters against the schema and columns of an in-memory DataFrame, allowing filtering on calculated fields that don’t exist in the warehouse table.

Parameters:
  • sdf (ps.DataFrame) – The Spark DataFrame to validate filters against.

  • filters (str | dict | TableFilter | List[str | dict | TableFilter]) – The filters to validate.

  • validate (bool, optional) – Whether to validate the filter field types against the schema. The default is True.

Returns:

List[str] – List of validated filter strings ready to apply with sdf.filter().
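
Examples

A sketch showing string and dict filter forms and applying the result with sdf.filter(); the column names and the dict keys used here are assumptions for illustration:

>>> filter_strings = ev.validate.sdf_filters(
...     sdf=joined_sdf,
...     filters=[
...         "primary_value > 0.0",  # assumed calculated/column name
...         {"column": "configuration_name", "operator": "=", "value": "obs"}
...     ]
... )
>>> for f in filter_strings:
...     joined_sdf = joined_sdf.filter(f)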

table_filters(table_name: str, filters: str | dict | TableFilter | List[str | dict | TableFilter], validate: bool = True) List[str][source]#

Validate table filter(s) by reading the table schema.

Parameters:
  • table_name (str) – The name of the table to validate filters for.

  • filters (str | dict | TableFilter | List[str | dict | TableFilter]) – The filters to validate.

  • validate (bool, optional) – Whether to validate the filter field types against the table schema. The default is True.

Returns:

List[str] – List of validated filter strings ready to apply with sdf.filter().
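
Examples

A sketch of validating a filter against a named table; the table name, column name, and dict keys are assumptions for illustration:

>>> filter_strings = ev.validate.table_filters(
...     table_name="primary_timeseries",  # assumed table name
...     filters={"column": "variable_name", "operator": "=", "value": "streamflow"}
... )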