
Commit 160f014

Create MultiTriggerDag Operator (#6)

* Added a patch-module helper to allow patching of loaded modules (i.e. modules loaded via imp.load_source).
* Added a multi-trigger DAG example.
* Reorganized the example DAGs.
* Use pylama for testing.

1 parent c7e9af6 commit 160f014
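The patch-module helper itself is not among the diffs shown below. A minimal sketch of what such a helper enables, patching attributes on a module that was loaded by file path rather than by a normal import, with hypothetical names and paths (this is not the helper's actual code):

import imp
import mock

# Load a plugin module straight from its source file, the way Airflow's
# plugin manager does; a plain `import` cannot reach it by module name.
plugin = imp.load_source(
    'multi_trigger_dag', 'plugins/custom/multi_trigger_dag.py')

# Patch an attribute on the loaded module object for the duration of a test.
with mock.patch.object(plugin, 'DagBag') as dagbag_mock:
    dagbag_mock.return_value.get_dag.return_value = None
    # ... exercise code that uses plugin.DagBag here ...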

File tree

18 files changed: +398 -62 lines changed

.env

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+IMAGE_NAME=wongwill86/air-tasks:latest

README.md

Lines changed: 3 additions & 1 deletion
@@ -5,9 +5,11 @@ DooD support and AWS ECR Credential Helper
 
 NOTES:
 Chunkflow: make sure AWS_ACCESS_KEY_ID, etc... are set in environment variables!
+export PYTHONDONTWRITEBYTECODE=1
 docker-compose -f docker/docker-compose.test.yml -p ci build
-docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut ptw
+docker-compose -f docker/docker-compose.test.yml -p ci run --rm sut ptw -- --pylama
 
+export
 
 When deploying docker/docker-compose-CeleryExecutor.yml remember to deploy secrets!
 ( or put in blank for no web auth )
File renamed without changes.

dags/many.py renamed to dags/examples/interleaved.py

Lines changed: 5 additions & 7 deletions
@@ -1,7 +1,6 @@
 from datetime import datetime, timedelta
 from airflow import DAG
 from airflow.operators.bash_operator import BashOperator
-from airflow.operators.docker_operator import DockerOperator
 
 
 default_args = {
@@ -13,7 +12,8 @@
     'retry_delay': timedelta(seconds=2),
     'retry_exponential_backoff': True,
 }
-dag = DAG("many_ws", default_args=default_args, schedule_interval=None)
+dag = DAG(
+    "example_interleaved", default_args=default_args, schedule_interval=None)
 
 
 def create_print_date(dag, count_print_date):
@@ -31,11 +31,9 @@ def create_print_hello(dag, count_print_hello):
 
 
 def create_docker_print(dag, count_docker_print):
-    return DockerOperator(
-        task_id='watershed_print_' + str(count_docker_print),
-        image='watershed',
-        command='echo "watershed printing!"',
-        network_mode='bridge',
+    return BashOperator(
+        task_id='bash_print_' + str(count_docker_print),
+        bash_command='echo "watershed printing!"',
         dag=dag)
 
 

dags/examples/multi_trigger.py

Lines changed: 54 additions & 0 deletions
@@ -0,0 +1,54 @@
+from airflow import DAG
+from datetime import datetime, timedelta
+from airflow.operators.custom_plugin import MultiTriggerDagRunOperator
+from airflow.operators.bash_operator import BashOperator
+
+SCHEDULE_DAG_ID = 'example_multi_trigger_scheduler'
+TARGET_DAG_ID = 'example_multi_trigger_target'
+
+default_args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': datetime(2017, 5, 1),
+    'catchup_by_default': False,
+    'retries': 1,
+    'retry_delay': timedelta(seconds=2),
+    'retry_exponential_backoff': True,
+}
+
+# ####################### SCHEDULER #################################
+scheduler_dag = DAG(
+    dag_id=SCHEDULE_DAG_ID,
+    default_args=default_args,
+    schedule_interval=None
+)
+
+
+def param_generator():
+    iterable = xrange(0, 100)
+    for i in iterable:
+        yield i
+
+
+operator = MultiTriggerDagRunOperator(
+    task_id='trigger_%s' % TARGET_DAG_ID,
+    trigger_dag_id=TARGET_DAG_ID,
+    params_list=param_generator(),
+    default_args=default_args,
+    dag=scheduler_dag)
+
+# ####################### TARGET DAG #################################
+
+target_dag = DAG(
+    dag_id=TARGET_DAG_ID,
+    default_args=default_args,
+    schedule_interval=None
+)
+
+start = BashOperator(
+    task_id='bash_task',
+    bash_command='sleep 1; echo "Hello from message #' +
+                 '{{ dag_run.conf if dag_run else "NO MESSAGE" }}"',
+    default_args=default_args,
+    dag=target_dag
+)
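param_generator above yields plain ints, which arrive in the target DAG as dag_run.conf. A hypothetical variant (not part of this commit) that yields dicts instead lets the target's templates pick out fields by key:

def dict_param_generator():
    # Each yielded dict becomes one target run's dag_run.conf, so a template
    # like '{{ dag_run.conf["chunk"] if dag_run else "NO MESSAGE" }}' can
    # reference a field by name.
    for i in xrange(0, 100):
        yield {'chunk': i}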

dags/simple.py

Lines changed: 0 additions & 49 deletions
This file was deleted.

docker/Dockerfile.test

Lines changed: 1 addition & 1 deletion
@@ -3,6 +3,6 @@ FROM $image_name
 ARG IMAGE_NAME
 USER root
 COPY docker/scripts/entrypoint-test.sh /entrypoint-test.sh
-RUN pip install pytest pytest-watch pytest-env flake8
+RUN pip install pytest pytest-watch pytest-env pylama mock
 USER airflow
 ENTRYPOINT ["/entrypoint-test.sh"]

docker/config/airflow.cfg

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@ max_active_runs_per_dag = 16
 # Whether to load the examples that ship with Airflow. It's good to
 # get started, but you probably want to set this to False in a production
 # environment
-load_examples = True
+load_examples = False
 
 # Where your Airflow plugins are stored
 plugins_folder = /usr/local/airflow/plugins

docker/docker-compose.test.yml

Lines changed: 1 addition & 1 deletion
@@ -18,4 +18,4 @@ services:
       - AWS_SECRET_ACCESS_KEY
       - AWS_DEFAULT_REGION
     command:
-      - pytest && flake8 .
+      - pytest --pylama

plugins/custom/multi_trigger_dag.py

Lines changed: 77 additions & 0 deletions
@@ -0,0 +1,77 @@
+from airflow.plugins_manager import AirflowPlugin
+from datetime import datetime
+import logging
+import types
+import collections
+
+from airflow.models import BaseOperator
+from airflow.models import DagBag
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.state import State
+from airflow import settings
+
+
+class MultiTriggerDagRunOperator(BaseOperator):
+    """
+    Triggers multiple DAG runs for a specified ``dag_id``.
+
+    Draws inspiration from:
+        airflow.operators.dagrun_operator.TriggerDagRunOperator
+
+    :param trigger_dag_id: the dag_id to trigger
+    :type trigger_dag_id: str
+    :param params_list: list of dicts for DAG level parameters that are made
+        accessible in templates, namespaced under params for each dag run.
+    :type params_list: Iterable<dict> or types.GeneratorType
+    """
+
+    @apply_defaults
+    def __init__(
+            self,
+            trigger_dag_id,
+            params_list,
+            *args, **kwargs):
+        super(MultiTriggerDagRunOperator, self).__init__(*args, **kwargs)
+        self.trigger_dag_id = trigger_dag_id
+        self.params_list = params_list
+        if hasattr(self.params_list, '__len__'):
+            assert len(self.params_list) > 0
+        else:
+            assert (isinstance(params_list, collections.Iterable) or
+                    isinstance(params_list, types.GeneratorType))
+
+    def execute(self, context):
+        session = settings.Session()
+        dbag = DagBag(settings.DAGS_FOLDER)
+        trigger_dag = dbag.get_dag(self.trigger_dag_id)
+
+        assert trigger_dag is not None
+
+        trigger_id = 0
+        # Commit in batches of ten rather than once per DagRun.
+        for params in self.params_list:
+            dr = trigger_dag.create_dagrun(run_id='trig_%s_%d_%s' %
+                                           (self.trigger_dag_id, trigger_id,
+                                            datetime.now().isoformat()),
+                                           state=State.RUNNING,
+                                           conf=params,
+                                           external_trigger=True)
+            logging.info("Creating DagRun {}".format(dr))
+            session.add(dr)
+            trigger_id = trigger_id + 1
+            if trigger_id % 10 == 0:
+                session.commit()
+        session.commit()
+        session.close()
+
+
+class CustomPlugin(AirflowPlugin):
+    name = "custom_plugin"
+    operators = [MultiTriggerDagRunOperator]
+    hooks = []
+    executors = []
+    macros = []
+    admin_views = []
+    flask_blueprints = []
+    menu_links = []
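For reference, a minimal usage sketch of the operator; the DAG and task names here are illustrative and assume the plugin is picked up from the plugins_folder configured above:

from datetime import datetime

from airflow import DAG
from airflow.operators.custom_plugin import MultiTriggerDagRunOperator

fanout_dag = DAG(
    'fanout_example', start_date=datetime(2017, 5, 1), schedule_interval=None)

# Creates one DagRun of 'worker_dag' per dict in params_list; each dict
# becomes that run's dag_run.conf.
fanout = MultiTriggerDagRunOperator(
    task_id='fanout_workers',
    trigger_dag_id='worker_dag',
    params_list=[{'shard': i} for i in range(4)],
    dag=fanout_dag)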
