[BUG] row_condition #10957

Open
victorgrcp opened this issue Feb 20, 2025 · 1 comment
Describe the bug
Hi! I have a problem when trying to use row_condition. No matter how I write the condition, I always get an error.

I'm using GX 1.3.3 on Databricks 14.3 LTS (Apache Spark 3.5.0, Scala 2.12) with Python 3.10.12.

The data asset is a simple PySpark DataFrame:

df = spark.createDataFrame(
    [("accepted", "-1", 1), ("rejected", "-1", 2), ("new", "-1", 3), ("dismissed", "-1", 4), ("canceled", "1", 5)],
    ["status_code", "offer_id", "integration_id"],
)

Parameters:

import os
os.environ["GX_ANALYTICS_ENABLED"] = "false"
from json import loads
import great_expectations as ge
import importlib

catalog = "dbw_gt_data_uat_lake_catalog"
schema  = "js_gold"
table_name = "gld_fact_jobseeker_offer"
column = "status_code"
table_key_list = ["integration_id"] # List of PK columns
expectation_name = "ExpectColumnValuesToBeInSet"
args = {"column": "status_code", "value_set": ["accepted", "canceled", "new", "rejected", "dismissed"]}
condition_parser = "spark"
row_condition = 'col("integration_id")==1'
args.update({"row_condition": row_condition, "condition_parser": condition_parser})
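
For reference, the arguments above amount to constructing the expectation class directly; the my_import helper below only resolves the same class via importlib (a sketch using the values defined above, nothing new added):

gx_expectation = ge.expectations.ExpectColumnValuesToBeInSet(
    column="status_code",
    value_set=["accepted", "canceled", "new", "rejected", "dismissed"],
    row_condition='col("integration_id")==1',
    condition_parser="spark",
)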

The functions I'm using to create the context and test the expectation:

def my_import(name: str, arguments: dict) -> object:
    """Import a specific Expectation class from the GX expectations module."""
    module = importlib.import_module("great_expectations.expectations")
    return getattr(module, name)(**arguments)


def test_expectation(df=None):
    context = ge.get_context()
    data_name = f"{catalog}.{schema}.{table_name}"
    data_source = context.data_sources.add_spark(name=data_name)
    data_asset = data_source.add_dataframe_asset(name=table_name)
    
    if df is None:
        batch_parameters = {
            "dataframe": 
                spark.read.table(f"{catalog}.{schema}.{table_name}").limit(100)
            }
    else:
        batch_parameters = {"dataframe": df}
    
    expectation_suite = context.suites.add(ge.core.expectation_suite.ExpectationSuite(name=f"s-{data_name}"))
    
    # Get the expectation class
    gx_expectation = my_import(expectation_name, args)
    expectation_suite.add_expectation(gx_expectation)
        
    batch_definition = data_asset.add_batch_definition_whole_dataframe(
        name="batch_definition"
    )
    validation_definition = context.validation_definitions.add(
            ge.core.validation_definition.ValidationDefinition(
                name=f"vd-{data_name}",
                data=batch_definition,
                suite=expectation_suite,
            )
    )
    checkpoint = context.checkpoints.add(
        ge.Checkpoint(
            name=f"cp-{data_name}",
            validation_definitions=[validation_definition],
            result_format={
                "result_format": "COMPLETE",
                "unexpected_index_column_names": table_key_list,
                "partial_unexpected_count": 0,
                "exclude_unexpected_values": False,
                "include_unexpected_rows": True,
                "return_unexpected_index_query": False,
            },
        )
    )
    return checkpoint.run(batch_parameters=batch_parameters)

test_expectation(df)

The exception message is:

"exception_message": "[UNRESOLVED_ROUTINE] Cannot resolve function col on search path [system.builtin, system.session, dbw_gt_data_uat_lake_catalog.default]. SQLSTATE: 42883; line 1 pos 0\n\nJVM stack..."

@victorgrcp (Author) commented:

When using condition_parser="great_expectations", as the documentation says, I get:

"exception_message": "[CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "status_code". It's probably because of illegal references like df1.select(df2.col(\"a\")). SQLSTATE: 42704\n\nJVM stacktrace:\norg.apache.spark.sql.AnalysisException\n\tat org.apache.spark.sql.errors.QueryCompilationErrors$.cannotResolveDataFrameColumn(QueryCompilationErrors.scala:4823)\n\tat org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveDataFrameColumn(ColumnResolutionHelper.scala:598)\n\tat...
