Skip to content

Commit e9f14b7

Browse files
committed
Update bq handler
1 parent 3a2c848 commit e9f14b7

File tree

3 files changed

+152
-20
lines changed

3 files changed

+152
-20
lines changed
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
2+
import logging
3+
import time
4+
import json
5+
import ast
6+
from datetime import date, datetime
7+
8+
# Local imports
9+
from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv
10+
11+
# Third-party library imports
12+
from airflow import DAG
13+
from airflow.exceptions import AirflowException
14+
15+
from airflow.models import Variable
16+
from airflow.operators.python_operator import PythonOperator
17+
from airflow.utils.dates import days_ago
18+
19+
20+
# Default arguments for every run of this DAG. The ``params`` payload shown
# here is only a sample shape — a real run is triggered through the Airflow
# API and supplies its own ``resource`` / ``ckan_config`` / ``big_query``
# values, which the task reads via ``context['params']``.
args = {
    'start_date': days_ago(0),
    'params': {
        "resource": {
            "path": "path/to/my.csv",
            "format": "CSV",
            "ckan_resource_id": "res-id-123",
            "schema": {
                "fields": "['field1', 'field2']"
            }
        },
        "ckan_config": {
            "api_key": "API_KEY",
            "site_url": "URL",
        },
        "big_query": {
            "bq_project_id": "bigquery_project_id",
            "bq_dataset_id": "bigquery_dataset_id"
        },
        "output_bucket": str(date.today())
    }
}

# No schedule_interval: this DAG only runs when triggered externally
# (e.g. by a CKAN API call), never on a timer.
dag = DAG(
    dag_id='ckan_api_import_to_bq',
    default_args=args,
    schedule_interval=None
)
48+
49+
def task_import_resource_to_bq(**context):
    """Load a CKAN resource's CSV file from GCS into a BigQuery table.

    Reads everything from ``context['params']``:
      - ``big_query``: ``gcs_uri``, ``bq_project_id``, ``bq_dataset_id``,
        ``bq_table_name`` — used to build the fully-qualified table id.
      - ``resource``: ``schema`` (JSON string whose ``fields`` key is the
        BigQuery schema) and ``ckan_resource_id``.
      - ``ckan_config``: passed through to ``bq_import_csv`` with the
        resource id attached.

    Raises whatever ``json.loads`` / ``ast.literal_eval`` raise on a
    malformed schema, and propagates errors from ``bq_import_csv``.
    """
    logging.info('Invoking import resource to bigquery')
    logging.info("resource: {}".format(context['params'].get('resource', {})))

    # Hoist the repeated big_query lookup instead of re-fetching it per key.
    big_query = context['params'].get('big_query', {})
    gc_file_url = big_query.get('gcs_uri')
    bq_project_id = big_query.get('bq_project_id')
    bq_dataset_id = big_query.get('bq_dataset_id')
    bq_table_name = big_query.get('bq_table_name')
    logging.info("bq_table_name: {}".format(bq_table_name))

    raw_schema = context['params'].get('resource', {}).get('schema')
    eval_schema = json.loads(raw_schema)
    # BUG FIX: when the schema is valid JSON, json.loads already returns a
    # dict, and calling ast.literal_eval on a non-string raises ValueError.
    # Only fall back to literal_eval when the decoded value is still a string
    # (i.e. a Python-repr payload that was double-encoded).
    if isinstance(eval_schema, str):
        eval_schema = ast.literal_eval(eval_schema)
    schema = eval_schema.get('fields')
    logging.info("SCHEMA: {}".format(schema))

    # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test"
    bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name)
    logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
    ckan_conf = context['params'].get('ckan_config', {})
    ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
    bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
71+
72+
# The DAG's single task: receives the trigger payload via the task context
# (provide_context=True) and hands it to the import callable above.
import_resource_to_bq_task = PythonOperator(
    task_id='import_resource_to_bq',
    python_callable=task_import_resource_to_bq,
    provide_context=True,
    dag=dag,
)

aircan/dags/api_ckan_import_to_bq.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@
66
from datetime import date, datetime
77

88
# Local imports
9-
from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv
9+
from aircan.dependencies.google_cloud.bigquery_handler_v2 import bq_import_csv
10+
from aircan.dependencies.utils import aircan_status_update_nhs as aircan_status_update
1011

1112
# Third-party library imports
1213
from airflow import DAG
@@ -15,7 +16,7 @@
1516
from airflow.models import Variable
1617
from airflow.operators.python_operator import PythonOperator
1718
from airflow.utils.dates import days_ago
18-
19+
import traceback
1920

2021
args = {
2122
'start_date': days_ago(0),
@@ -41,12 +42,14 @@
4142
}
4243

4344
dag = DAG(
44-
dag_id='ckan_api_import_to_bq',
45+
dag_id='ckan_api_import_to_bq_v2',
4546
default_args=args,
4647
schedule_interval=None
4748
)
4849

4950
def task_import_resource_to_bq(**context):
51+
ckan_api_key = context['params'].get('ckan_config', {}).get('api_key')
52+
ckan_site_url = context['params'].get('ckan_config', {}).get('site_url')
5053
logging.info('Invoking import resource to bigquery')
5154
logging.info("resource: {}".format(context['params'].get('resource', {})))
5255

@@ -58,7 +61,8 @@ def task_import_resource_to_bq(**context):
5861

5962
raw_schema = context['params'].get('resource', {}).get('schema')
6063
eval_schema = json.loads(raw_schema)
61-
eval_schema = ast.literal_eval(eval_schema)
64+
if isinstance(eval_schema, str):
65+
eval_schema = ast.literal_eval(eval_schema)
6266
schema = eval_schema.get('fields')
6367
logging.info("SCHEMA: {}".format(schema))
6468

@@ -67,11 +71,23 @@ def task_import_resource_to_bq(**context):
6771
logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id))
6872
ckan_conf = context['params'].get('ckan_config', {})
6973
ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id')
74+
dag_run_id = context['run_id']
75+
res_id = ckan_conf.get('resource_id')
76+
ckan_conf['dag_run_id'] = dag_run_id
7077
bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf)
78+
status_dict = {
79+
'dag_run_id': dag_run_id,
80+
'resource_id': res_id,
81+
'state': 'complete',
82+
'message': 'Data ingestion completed successfully for "{res_id}".'.format(
83+
res_id=res_id),
84+
'clear_logs': True
85+
}
86+
aircan_status_update(ckan_site_url, ckan_api_key, status_dict)
7187

7288
import_resource_to_bq_task = PythonOperator(
73-
task_id='import_resource_to_bq',
89+
task_id='import_resource_to_bq_v2',
7490
provide_context=True,
7591
python_callable=task_import_resource_to_bq,
7692
dag=dag,
77-
)
93+
)

aircan/dependencies/google_cloud/bigquery_handler_v2.py

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,28 +23,67 @@ def bq_import_csv(table_id, gcs_path, table_schema, ckan_conf):
2323
job_config.source_format = bigquery.SourceFormat.CSV
2424
# overwrite a Table
2525
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
26-
# set 'True' for schema autodetect but turning it off since we define schema in explicitly when publishing data using datapub
27-
# job_config.autodetect = True
2826
load_job = client.load_table_from_uri(
2927
gcs_path, table_id, job_config=job_config
3028
)
3129

3230
load_job.result() # Waits for table load to complete.
3331
destination_table = client.get_table(table_id)
3432
except Exception as e:
35-
job_config = bigquery.LoadJobConfig()
36-
37-
job_config.skip_leading_rows = 1
38-
job_config.source_format = bigquery.SourceFormat.CSV
39-
# overwrite a Table
40-
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
41-
# set 'True' for schema autodetect but turning it off since we define schema in explicitly when publishing data using datapub
42-
# job_config.autodetect = True
43-
load_job = client.load_table_from_uri(
44-
gcs_path, table_id, job_config=job_config
33+
logging.info(e)
34+
# Use a list to build the string components efficiently.
35+
error_lines = []
36+
error_lines.append(
37+
"BigQuery Load Job Failed with a BadRequest."
4538
)
46-
load_job.result() # Waits for table load to complete.
47-
destination_table = client.get_table(table_id)
39+
error_lines.append(f"Original API message: {e}")
40+
41+
# The key part: Iterate through the e.errors list and append to our list.
42+
if load_job.errors:
43+
error_lines.append("\n--- Detailed Error Breakdown ---")
44+
logging.info(load_job.errors)
45+
for i, error in enumerate(load_job.errors):
46+
# Format each error dictionary into a readable line.
47+
line = (
48+
f"Error {i+1}: "
49+
f"Reason: {error.get('reason', 'N/A')}, "
50+
f"Location: {error.get('location', 'N/A')}, "
51+
f"Message: {error.get('message', 'N/A')}"
52+
)
53+
error_lines.append(line)
54+
else:
55+
error_lines.append("No detailed errors were provided in the exception.")
56+
57+
# Join the list of lines into a single string with newlines.
58+
error_report_string = "\n".join(error_lines)
59+
logging.info(error_report_string)
60+
status_dict = {
61+
'res_id': ckan_conf.get('resource_id'),
62+
'state': 'failed',
63+
'message': error_report_string,
64+
'dag_run_id': ckan_conf.get('dag_run_id')
65+
}
66+
aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
67+
raise AirflowCKANException('Data ingestion has failed.', str(e))
68+
#status_dict = {
69+
# 'res_id': ckan_conf.get('resource_id'),
70+
# 'state': 'progress',
71+
# 'message': 'Data ingestion using provided schema failed, trying to autodetect schema.',
72+
# 'dag_run_id': ckan_conf.get('dag_run_id')
73+
#}
74+
#aircan_status_update(ckan_conf.get('site_url'), ckan_conf.get('api_key'), status_dict)
75+
#job_config = bigquery.LoadJobConfig()
76+
#job_config.autodetect = True
77+
78+
#job_config.skip_leading_rows = 1
79+
#job_config.source_format = bigquery.SourceFormat.CSV
80+
## overwrite a Table
81+
#job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE
82+
#load_job = client.load_table_from_uri(
83+
# gcs_path, table_id, job_config=job_config
84+
#)
85+
#load_job.result() # Waits for table load to complete.
86+
#destination_table = client.get_table(table_id)
4887
status_dict = {
4988
'res_id': ckan_conf.get('resource_id'),
5089
'state': 'progress',

0 commit comments

Comments
 (0)