Skip to content

Commit

Permalink
fix bug with bq client (#227)
Browse files (browse the repository at this point in the history)
* remove optional project_id argument from BigQuery client

* remove refs from tests and lint

* remove refs

* fix bq integration test

* fix impersonation workflow and bq client dataset check
  • Loading branch information
thinhha authored Feb 8, 2023
1 parent 4d0888c commit 1a5717d
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 13 deletions.
25 changes: 12 additions & 13 deletions clouddq/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ def main( # noqa: C901
  gcp_project_id=gcp_project_id,
  gcp_bq_dataset_id=gcp_bq_dataset_id,
  gcp_region_id=gcp_region_id,
+ gcp_impersonation_credentials=gcp_impersonation_credentials,
  bigquery_client=bigquery_client,
  )

Expand All @@ -316,18 +317,17 @@ def main( # noqa: C901
  )
  dq_summary_table_exists = False
  dq_summary_table_ref = bigquery_client.table_from_string(dq_summary_table_name)
- dq_summary_project_id = dq_summary_table_ref.project
- dq_summary_dataset = dq_summary_table_ref.dataset_id
- logger.info(
- f"Using dq_summary_dataset: {dq_summary_project_id}.{dq_summary_dataset}"
+ dq_summary_dataset = (
+ f"{dq_summary_table_ref.project}.{dq_summary_table_ref.dataset_id}"
  )
+ logger.info(f"Using dq_summary_dataset: {dq_summary_dataset}")
  dq_summary_table_exists = bigquery_client.is_table_exists(
  table=dq_summary_table_name
  )
  if not bigquery_client.is_dataset_exists(dataset=dq_summary_dataset):
  raise AssertionError(
  "Invalid argument to --gcp_bq_dataset_id: "
- f"Dataset {dq_summary_project_id}.{dq_summary_dataset} does not exist. "
+ f"Dataset {dq_summary_dataset} does not exist. "
  )
  dq_summary_dataset_region = bigquery_client.get_dataset_region(
  dataset=dq_summary_dataset,
Expand All @@ -336,7 +336,7 @@ def main( # noqa: C901
  raise AssertionError(
  f"GCP region in --gcp_region_id '{gcp_region_id}' "
  f"must be the same as dq_summary_dataset "
- f"'{dq_summary_project_id}.{dq_summary_dataset}' region: "
+ f"'{dq_summary_dataset}' region: "
  f"'{dq_summary_dataset_region}'."
  )
  dq_summary_missing_required_fields = (
Expand All @@ -356,26 +356,25 @@ def main( # noqa: C901
  target_table_ref = bigquery_client.table_from_string(
  target_bigquery_summary_table
  )
- target_project_id = target_table_ref.project
- target_dataset_id = target_table_ref.dataset_id
+ target_dataset = f"{target_table_ref.project}.{target_table_ref.dataset_id}"
  logger.debug(
  f"BigQuery dataset used in --target_bigquery_summary_table: "
- f"{target_project_id}.{target_dataset_id}"
+ f"{target_dataset}"
  )
- if not bigquery_client.is_dataset_exists(dataset=target_dataset_id):
+ if not bigquery_client.is_dataset_exists(dataset=target_dataset):
  raise AssertionError(
  "Invalid argument to --target_bigquery_summary_table: "
  f"{target_bigquery_summary_table}. "
- f"Dataset {target_project_id}.{target_dataset_id} does not exist. "
+ f"Dataset {target_dataset} does not exist. "
  )
  target_dataset_region = bigquery_client.get_dataset_region(
- dataset=target_dataset_id
+ dataset=target_dataset
  )
  if gcp_region_id and target_dataset_region != gcp_region_id:
  raise AssertionError(
  f"GCP region in --gcp_region_id '{gcp_region_id}' "
  f"must be the same as --target_bigquery_summary_table "
- f"'{target_project_id}.{target_dataset_id}' region "
+ f"'{target_dataset}' region "
  f"'{target_dataset_region}'."
  )
  if target_dataset_region != dq_summary_dataset_region:
Expand Down
2 changes: 2 additions & 0 deletions clouddq/runners/dbt/dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def get_dbt_profiles_dir_and_environment_target(self,
  gcp_bq_dataset_id: str,
  gcp_region_id: Optional[str] = None,
  bigquery_client: Optional[BigQueryClient] = None,
+ gcp_impersonation_credentials: Optional[str] = None,
  ) -> typing.Tuple:
  self._resolve_connection_configs(
  gcp_project_id=gcp_project_id,
Expand All @@ -135,6 +136,7 @@ def get_dbt_profiles_dir_and_environment_target(self,
  bigquery_client=bigquery_client,
  environment_target=self.environment_target,
  num_threads=self.num_threads,
+ gcp_impersonation_credentials=gcp_impersonation_credentials,
  )

  return (
Expand Down

0 comments on commit 1a5717d

Please sign in to comment.