|
| 1 | + |
| 2 | +import logging |
| 3 | +import time |
| 4 | +import json |
| 5 | +import ast |
| 6 | +from datetime import date, datetime |
| 7 | + |
| 8 | +# Local imports |
| 9 | +from aircan.dependencies.google_cloud.bigquery_handler import bq_import_csv |
| 10 | + |
| 11 | +# Third-party library imports |
| 12 | +from airflow import DAG |
| 13 | +from airflow.exceptions import AirflowException |
| 14 | + |
| 15 | +from airflow.models import Variable |
| 16 | +from airflow.operators.python_operator import PythonOperator |
| 17 | +from airflow.utils.dates import days_ago |
| 18 | + |
| 19 | + |
# Default task arguments.  ``params`` carries a sample payload that shows the
# shape callers are expected to supply when triggering the DAG; real runs
# override these values through the trigger configuration.
args = {
    'start_date': days_ago(0),
    'params': {
        "resource": {
            "path": "path/to/my.csv",
            "format": "CSV",
            "ckan_resource_id": "res-id-123",
            "schema": {"fields": "['field1', 'field2']"},
        },
        "ckan_config": {
            "api_key": "API_KEY",
            "site_url": "URL",
        },
        "big_query": {
            "bq_project_id": "bigquery_project_id",
            "bq_dataset_id": "bigquery_dataset_id",
        },
        # NOTE: evaluated once when the DAG file is parsed, not per run.
        "output_bucket": str(date.today()),
    },
}
| 42 | + |
# Manually-triggered DAG: ``schedule_interval=None`` means it never runs on a
# schedule and only executes when triggered externally (UI / REST API).
dag = DAG(
    dag_id='ckan_api_import_to_bq_v2',
    schedule_interval=None,
    default_args=args,
)
| 48 | + |
| 49 | +def task_import_resource_to_bq(**context): |
| 50 | + logging.info('Invoking import resource to bigquery') |
| 51 | + logging.info("resource: {}".format(context['params'].get('resource', {}))) |
| 52 | + |
| 53 | + gc_file_url = context['params'].get('big_query', {}).get('gcs_uri') |
| 54 | + bq_project_id = context['params'].get('big_query', {}).get('bq_project_id') |
| 55 | + bq_dataset_id = context['params'].get('big_query', {}).get('bq_dataset_id') |
| 56 | + bq_table_name = context['params'].get('big_query', {}).get('bq_table_name') |
| 57 | + logging.info("bq_table_name: {}".format(bq_table_name)) |
| 58 | + |
| 59 | + raw_schema = context['params'].get('resource', {}).get('schema') |
| 60 | + eval_schema = json.loads(raw_schema) |
| 61 | + schema = eval_schema.get('fields') |
| 62 | + logging.info("SCHEMA: {}".format(schema)) |
| 63 | + |
| 64 | + # sample bq_table_id: "bigquerytest-271707.nhs_test.dag_test" |
| 65 | + bq_table_id = '%s.%s.%s' % (bq_project_id, bq_dataset_id, bq_table_name) |
| 66 | + logging.info('Importing %s to BQ %s' % (gc_file_url, bq_table_id)) |
| 67 | + ckan_conf = context['params'].get('ckan_config', {}) |
| 68 | + ckan_conf['resource_id'] = context['params'].get('resource', {}).get('ckan_resource_id') |
| 69 | + bq_import_csv(bq_table_id, gc_file_url, schema, ckan_conf) |
| 70 | + |
# Single task in this DAG: run the BigQuery import with the full template
# context so the callable can read the trigger-supplied ``params``.
import_resource_to_bq_task = PythonOperator(
    task_id='import_resource_to_bq',
    python_callable=task_import_resource_to_bq,
    provide_context=True,
    dag=dag,
)
0 commit comments