77
88# Local imports
99from aircan .dependencies .google_cloud .bigquery_handler_v2 import bq_import_csv
10+ from aircan .dependencies .utils import aircan_status_update
1011
1112# Third-party library imports
1213from airflow import DAG
4748)
4849
4950def task_import_resource_to_bq (** context ):
51+ ckan_api_key = context ['params' ].get ('ckan_config' , {}).get ('api_key' )
52+ ckan_site_url = context ['params' ].get ('ckan_config' , {}).get ('site_url' )
5053 logging .info ('Invoking import resource to bigquery' )
5154 logging .info ("resource: {}" .format (context ['params' ].get ('resource' , {})))
5255
@@ -68,7 +71,29 @@ def task_import_resource_to_bq(**context):
6871 logging .info ('Importing %s to BQ %s' % (gc_file_url , bq_table_id ))
6972 ckan_conf = context ['params' ].get ('ckan_config' , {})
7073 ckan_conf ['resource_id' ] = context ['params' ].get ('resource' , {}).get ('ckan_resource_id' )
71- bq_import_csv (bq_table_id , gc_file_url , schema , ckan_conf )
74+ dag_run_id = context ['dag_run' ].run_id
75+ res_id = ckan_conf .get ('resource_id' )
76+ try :
77+ bq_import_csv (bq_table_id , gc_file_url , schema , ckan_conf )
78+ status_dict = {
79+ 'dag_run_id' : dag_run_id ,
80+ 'resource_id' : res_id ,
81+ 'state' : 'complete' ,
82+ 'message' : 'Data ingestion completed successfully for "{res_id}".' .format (
83+ res_id = res_id ),
84+ 'clear_logs' : True
85+ }
86+ aircan_status_update (ckan_site_url , ckan_api_key , status_dict )
87+ except Exception as e :
88+ status_dict = {
89+ 'dag_run_id' : dag_run_id ,
90+ 'resource_id' : res_id ,
91+ 'state' : 'failed' ,
92+ 'message' : str (e ),
93+ 'clear_logs' : True
94+ }
95+ aircan_status_update (ckan_site_url , ckan_api_key , status_dict )
96+ raise Exception (str (e ))
7297
7398import_resource_to_bq_task = PythonOperator (
7499 task_id = 'import_resource_to_bq_v2' ,
0 commit comments