validate configs before loading into cache (#170)
thinhha authored May 5, 2022
1 parent cf40b38 commit e859d87
Showing 7 changed files with 115 additions and 21 deletions.
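
In short, this commit makes DqConfigsCache parse every rule binding, row filter, and rule into its dataclass (DqRuleBinding, DqRowFilter, DqRule) before upserting it into the cache, so a malformed config fails fast at load time with a readable ValueError instead of being cached as-is. A minimal sketch of that validate-before-cache pattern (illustrative names only, not the project's exact API):

    def load_collection(records, parse_record, cache_table):
        # Validate first: parse every record into its dataclass and abort with
        # context as soon as one record is malformed.
        for record in records:
            try:
                parse_record(record)
            except Exception as e:
                raise ValueError(f"Failed to parse config with error:\n{e}\n")
        # Cache second: only reached once every record has parsed cleanly.
        cache_table.upsert_all(records, pk="id", alter=True)

dq_configs_cache.py below applies this same shape separately to rule bindings, row filters, and rules.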
18 changes: 18 additions & 0 deletions clouddq/classes/dq_configs_cache.py
@@ -174,6 +174,12 @@ def load_all_rule_bindings_collection(self, rule_binding_collection: dict) -> None:
         )
         rule_bindings_rows = unnest_object_to_list(rule_binding_collection)
         for record in rule_bindings_rows:
+            try:
+                dq_rule_binding.DqRuleBinding.from_dict(
+                    rule_binding_id=record["id"], kwargs=record, validate_uri=False
+                )
+            except Exception as e:
+                raise ValueError(f"Failed to parse Rule Binding with error:\n{e}\n")
             if "entity_uri" not in record:
                 record.update({"entity_uri": None})
         self._cache_db["rule_bindings"].upsert_all(
@@ -200,6 +206,13 @@ def load_all_row_filters_collection(self, row_filters_collection: dict) -> None:
         logger.debug(
             f"Loading 'row_filters' configs into cache:\n{pformat(row_filters_collection.keys())}"
         )
+        for row_filter_id, row_filter_record in row_filters_collection.items():
+            try:
+                dq_row_filter.DqRowFilter.from_dict(
+                    row_filter_id=row_filter_id, kwargs=row_filter_record
+                )
+            except Exception as e:
+                raise ValueError(f"Failed to parse Row Filter with error:\n{e}\n")
         self._cache_db["row_filters"].upsert_all(
             unnest_object_to_list(row_filters_collection), pk="id", alter=True
         )
@@ -208,6 +221,11 @@ def load_all_rules_collection(self, rules_collection: dict) -> None:
         logger.debug(
             f"Loading 'rules' configs into cache:\n{pformat(rules_collection.keys())}"
         )
+        for rules_id, rules_record in rules_collection.items():
+            try:
+                dq_rule.DqRule.from_dict(rule_id=rules_id, kwargs=rules_record)
+            except Exception as e:
+                raise ValueError(f"Failed to parse Rule with error:\n{e}\n")
         self._cache_db["rules"].upsert_all(
             unnest_object_to_list(rules_collection), pk="id", alter=True
         )
8 changes: 6 additions & 2 deletions clouddq/classes/dq_entity_uri.py
@@ -60,7 +60,10 @@ def get_configs(self: EntityUri, configs_key: str) -> typing.Any:

     @classmethod
     def from_uri(
-        cls: EntityUri, uri_string: str, default_configs: dict | None = None
+        cls: EntityUri,
+        uri_string: str,
+        default_configs: dict | None = None,
+        validate_uri: bool = True,
     ) -> EntityUri:
         if "://" not in uri_string:
             raise ValueError(
@@ -79,7 +82,8 @@ def from_uri(
             uri_configs_string=uri_configs_string,
             default_configs=default_scheme_configs,
         )
-        entity_uri.validate()
+        if validate_uri:
+            entity_uri.validate()
         return entity_uri

     def to_dict(self: EntityUri) -> dict:
5 changes: 4 additions & 1 deletion clouddq/classes/dq_rule_binding.py
@@ -55,6 +55,7 @@ def from_dict(
         rule_binding_id: str,
         kwargs: dict,
         default_configs: dict | None = None,
+        validate_uri: bool = True,
     ) -> DqRuleBinding:
         """
@@ -76,7 +77,9 @@ def from_dict(
         entity_uri = None
         if "entity_uri" in entity_config:
             parsed_entity_uri = EntityUri.from_uri(
-                entity_config["entity_uri"], default_configs=default_configs
+                entity_config["entity_uri"],
+                default_configs=default_configs,
+                validate_uri=validate_uri,
             )
             entity_id = parsed_entity_uri.get_entity_id()
             entity_uri = parsed_entity_uri
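
Taken together, the three files above thread the new validate_uri flag from the cache loader through DqRuleBinding.from_dict into EntityUri.from_uri: the entity URI is still parsed, but EntityUri.validate() only runs when the flag is left at its default of True. The cache-loading path passes validate_uri=False, presumably so that config parsing can be checked eagerly without also forcing full URI validation at that stage. A sketch of the two call shapes (mirroring the calls shown above, nothing new):

    # Cache-load path: parse the rule binding, defer entity_uri validation.
    DqRuleBinding.from_dict(rule_binding_id=record["id"], kwargs=record, validate_uri=False)

    # Default path: parse and validate the entity_uri, as before this commit.
    DqRuleBinding.from_dict(rule_binding_id=record["id"], kwargs=record)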
30 changes: 16 additions & 14 deletions clouddq/lib.py
@@ -13,6 +13,8 @@
 # limitations under the License.

 """todo: add lib docstring."""
+from __future__ import annotations
+
 from pathlib import Path
 from pprint import pformat

@@ -35,7 +37,7 @@
 logger = logging.getLogger(__name__)


-def load_configs(configs_path: Path, configs_type: DqConfigType) -> typing.Dict:
+def load_configs(configs_path: Path, configs_type: DqConfigType) -> dict:

     if configs_path.is_file():
         yaml_files = [configs_path]
@@ -60,23 +62,23 @@ def load_configs(configs_path: Path, configs_type: DqConfigType) -> typing.Dict:
     return all_configs


-def load_rule_bindings_config(configs_path: Path) -> typing.Dict:
+def load_rule_bindings_config(configs_path: Path) -> dict:
     return load_configs(configs_path, DqConfigType.RULE_BINDINGS)


 def load_rule_dimensions_config(configs_path: Path) -> list:
     return load_configs(configs_path, DqConfigType.RULE_DIMENSIONS)


-def load_entities_config(configs_path: Path) -> typing.Dict:
+def load_entities_config(configs_path: Path) -> dict:
     return load_configs(configs_path, DqConfigType.ENTITIES)


-def load_rules_config(configs_path: Path) -> typing.Dict:
+def load_rules_config(configs_path: Path) -> dict:
     return load_configs(configs_path, DqConfigType.RULES)


-def load_row_filters_config(configs_path: Path) -> typing.Dict:
+def load_row_filters_config(configs_path: Path) -> dict:
     return load_configs(configs_path, DqConfigType.ROW_FILTERS)


@@ -93,15 +95,15 @@ def load_metadata_registry_default_configs(

 def create_rule_binding_view_model(
     rule_binding_id: str,
-    rule_binding_configs: typing.Dict,
+    rule_binding_configs: dict,
     dq_summary_table_name: str,
     environment: str,
     configs_cache: DqConfigsCache,
     dq_summary_table_exists: bool = False,
-    metadata: typing.Optional[typing.Dict] = None,
+    metadata: dict | None = None,
     debug: bool = False,
     progress_watermark: bool = True,
-    default_configs: typing.Optional[typing.Dict] = None,
+    default_configs: dict | None = None,
 ) -> str:
     template = load_jinja_template(
         template_path=Path("dbt", "macros", "create_rule_binding_view.sql")
@@ -162,22 +164,22 @@ def write_sql_string_as_dbt_model(

 def prepare_configs_from_rule_binding_id(
     rule_binding_id: str,
-    rule_binding_configs: typing.Dict,
+    rule_binding_configs: dict,
     dq_summary_table_name: str,
-    environment: typing.Optional[str],
+    environment: str | None,
     configs_cache: DqConfigsCache,
     dq_summary_table_exists: bool = False,
-    metadata: typing.Optional[typing.Dict] = None,
+    metadata: dict | None = None,
     progress_watermark: bool = True,
-    default_configs: typing.Optional[typing.Dict] = None,
-) -> typing.Dict:
+    default_configs: dict | None = None,
+) -> dict:
     rule_binding = DqRuleBinding.from_dict(
         rule_binding_id, rule_binding_configs, default_configs
     )
     resolved_rule_binding_configs = rule_binding.resolve_all_configs_to_dict(
         configs_cache=configs_cache,
     )
-    configs: typing.Dict[typing.Any, typing.Any] = {
+    configs: dict[typing.Any, typing.Any] = {
         "configs": dict(resolved_rule_binding_configs)
     }
     if environment:
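
The lib.py changes are typing cleanup rather than behaviour changes: typing.Dict and typing.Optional[...] annotations become the built-in dict and X | None forms, and the new from __future__ import annotations line is what makes that syntax legal on interpreters older than Python 3.10, because annotations are then stored as strings instead of being evaluated (PEP 563). A small illustration of the mechanism (assumed example, not repository code):

    from __future__ import annotations  # annotations become lazy strings, never evaluated


    def load(configs_path: str, defaults: dict | None = None) -> dict:
        # Without the future import, evaluating "dict | None" in this signature
        # raises TypeError at definition time on Python 3.8 and 3.9.
        return dict(defaults or {})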
9 changes: 5 additions & 4 deletions clouddq/utils.py
@@ -15,6 +15,7 @@
 """todo: add utils docstring."""
 from inspect import getsourcefile
 from pathlib import Path
+from pprint import pformat

 import contextlib
 import hashlib
@@ -139,8 +140,8 @@ def get_from_dict_and_assert(
     )
     if assertion and not assertion(value):
         raise ValueError(
-            f"Assertion failed on value {value}.\n"
-            f"Config ID: {config_id}, kwargs: {kwargs}.\n"
+            f"Assertion failed on value '{value}'.\n"
+            f"Config ID: {config_id}, Config arguments:\n{pformat(kwargs)}.\n"
             f"Error: {error_msg}"
         )
     return value
@@ -160,8 +161,8 @@ def get_keys_from_dict_and_assert_oneof(
     )
     if assertion and not assertion(value):
         raise ValueError(
-            f"Assertion failed on value {value}.\n"
-            f"Config ID: {config_id}, kwargs: {kwargs}.\n"
+            f"Assertion failed on value '{value}'.\n"
+            f"Config ID: {config_id}, Config arguments:\n{pformat(kwargs)}.\n"
             f"Error: {error_msg}"
         )
     return value
34 changes: 34 additions & 0 deletions tests/resources/configs_invalid.yml
@@ -0,0 +1,34 @@
+# Copyright 2021 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+metadata_registry_defaults:
+  dataplex:
+    projects: <my-gcp-project-id>
+    locations: <my-gcp-dataplex-region-id>
+    lakes: <my-gcp-dataplex-lake-id>
+    zones: <my-gcp-dataplex-zone-id>
+row_filters:
+  NONE:
+    filter_sql_expr: 'True'
+rule_bindings:
+  4209174c-2568-49d2-8d6d-7abeae4435fc:
+    entity_uri: dataplex://projects/<my-gcp-project-id>/locations/<my-gcp-dataplex-region-id>/lakes/<my-gcp-dataplex-lake-id>/zones/<my-gcp-dataplex-zone-id>/entities/contact_details
+    column_id: VALUE
+    row_filter_id: NONE
+    rule_ids: NOT_NULL_SIMPLE
+    metadata:
+      brand: one
+rules:
+  NOT_NULL_SIMPLE:
+    rule_type: NOT_NULL
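
The fixture above is deliberately malformed: rule_ids is given as a bare string (NOT_NULL_SIMPLE) where the rule-binding schema expects a list, which is exactly the parse failure the new unit test below asserts on ("must have defined value 'rule_ids' of type 'list'."). A valid binding would presumably spell it as a one-item YAML list:

    rule_ids:
      - NOT_NULL_SIMPLE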
32 changes: 32 additions & 0 deletions tests/unit/test_cli_unit.py
@@ -123,6 +123,38 @@ def test_cli_dry_run_100_rules(
         finally:
             shutil.rmtree(temp_dir)

+    def test_cli_dry_run_invalid_configs_fail(
+        self,
+        runner,
+        tmp_path,
+        gcp_project_id,
+        gcp_bq_region,
+        gcp_bq_dataset
+    ):
+        try:
+            temp_dir = Path(tmp_path).joinpath("clouddq_test_cli_dry_run_invalid_configs_fail")
+            temp_dir.mkdir(parents=True)
+            configs_invalid = Path("tests").joinpath("resources", "configs_invalid.yml").absolute()
+            with working_directory(temp_dir):
+                args = [
+                    "ALL",
+                    f"{configs_invalid}",
+                    f"--gcp_project_id={gcp_project_id}",
+                    f"--gcp_bq_dataset_id={gcp_bq_dataset}",
+                    f"--gcp_region_id={gcp_bq_region}",
+                    "--dry_run",
+                    "--debug",
+                    "--skip_sql_validation"
+                ]
+                result = runner.invoke(main, args)
+                logger.info(result.output)
+                assert result.exit_code == 1
+                error_message = (
+                    "must have defined value 'rule_ids' of type 'list'."
+                )
+                assert error_message in result.output
+        finally:
+            shutil.rmtree(temp_dir)

 if __name__ == "__main__":
     raise SystemExit(pytest.main([__file__, '-vv', '-rP', '-n', 'auto']))
