Skip to content

Commit

Permalink
add create_temp_table support (#129)
Browse files Browse the repository at this point in the history
* #112: add create_temp_table support

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Update great_expectations_provider/operators/great_expectations.py

Co-authored-by: Wei Lee <[email protected]>

* Update great_expectations_provider/operators/great_expectations.py

Co-authored-by: Wei Lee <[email protected]>

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Co-authored-by: Pankaj Singh <[email protected]>
Co-authored-by: Wei Lee <[email protected]>
  • Loading branch information
4 people authored Feb 2, 2024
1 parent b0f9880 commit 635314e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 0 deletions.
6 changes: 6 additions & 0 deletions great_expectations_provider/operators/great_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,11 +381,14 @@ def build_snowflake_connection_config_from_hook(self) -> Dict[str, str]:
def build_configured_sql_datasource_config_from_conn_id(
self,
) -> Datasource:
create_temp_table = self.conn.extra_dejson.get("create_temp_table", True)

datasource_config = {
"name": f"{self.conn.conn_id}_configured_sql_datasource",
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": create_temp_table,
**self.make_connection_configuration(),
},
"data_connectors": {
Expand Down Expand Up @@ -416,11 +419,14 @@ def build_configured_sql_datasource_batch_request(self):
def build_runtime_sql_datasource_config_from_conn_id(
self,
) -> Datasource:
create_temp_table = self.conn.extra_dejson.get("create_temp_table", True)

datasource_config = {
"name": f"{self.conn.conn_id}_runtime_sql_datasource",
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": create_temp_table,
**self.make_connection_configuration(),
},
"data_connectors": {
Expand Down
62 changes: 62 additions & 0 deletions tests/operators/test_great_expectations.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ def constructed_sql_runtime_datasource():
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
"connection_string": "sqlite:///host",
},
"data_connectors": {
Expand All @@ -201,6 +202,7 @@ def constructed_sql_configured_datasource():
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
"connection_string": "sqlite:///host",
},
"data_connectors": {
Expand All @@ -223,6 +225,7 @@ def constructed_sql_configured_datasource():
@pytest.fixture()
def mock_airflow_conn():
conn = mock.Mock(conn_id="sqlite_conn", schema="my_schema", host="host", conn_type="sqlite")
conn.extra_dejson.get.return_value = True
return conn


Expand Down Expand Up @@ -1076,6 +1079,7 @@ def test_great_expectations_operator__build_configured_sql_datasource_config_fro
"execution_engine": {
"module_name": "great_expectations.execution_engine",
"class_name": "SqlAlchemyExecutionEngine",
"create_temp_table": True,
**test_conn_conf,
},
"data_connectors": {
Expand Down Expand Up @@ -1119,6 +1123,64 @@ def test_great_expectations_operator__build_configured_sql_datasource_config_fro
assert constructed_datasource.config == datasource_config


def test_great_expectations_operator__build_configured_sql_datasource_config_from_conn_id_uses_extra_create_temp_table(
constructed_sql_configured_datasource,
):
constructed_sql_configured_datasource["execution_engine"]["create_temp_table"] = False

operator = GreatExpectationsOperator(
task_id="task_id",
data_context_config=in_memory_data_context_config,
data_asset_name="default_schema.my_sqlite_table",
conn_id="sqlite_conn",
expectation_suite_name="suite",
schema="my_schema",
)
operator.conn = Connection(
conn_id="sqlite_conn",
conn_type="sqlite",
host="host",
login="user",
password="password",
schema="schema",
extra={"create_temp_table": False},
)

constructed_datasource = operator.build_configured_sql_datasource_config_from_conn_id()

assert isinstance(constructed_datasource, Datasource)
assert constructed_datasource.config == constructed_sql_configured_datasource


def test_great_expectations_operator__build_runtime_sql_datasource_config_from_conn_id_uses_extra_create_temp_table(
constructed_sql_runtime_datasource,
):
constructed_sql_runtime_datasource["execution_engine"]["create_temp_table"] = False

operator = GreatExpectationsOperator(
task_id="task_id",
data_context_config=in_memory_data_context_config,
data_asset_name="default_schema.my_sqlite_table",
conn_id="sqlite_conn",
expectation_suite_name="suite",
schema="my_schema",
)
operator.conn = Connection(
conn_id="sqlite_conn",
conn_type="sqlite",
host="host",
login="user",
password="password",
schema="schema",
extra={"create_temp_table": False},
)

constructed_datasource = operator.build_runtime_sql_datasource_config_from_conn_id()

assert isinstance(constructed_datasource, Datasource)
assert constructed_datasource.config == constructed_sql_runtime_datasource


def test_great_expectations_operator__make_connection_string_raise_error():
operator = GreatExpectationsOperator(
task_id="task_id",
Expand Down

0 comments on commit 635314e

Please sign in to comment.