Skip to content

Commit e1fcdd2

Browse files
committed
doc: added example ergo task
1 parent 46a82bf commit e1fcdd2

File tree

2 files changed

+48
-0
lines changed

2 files changed

+48
-0
lines changed

README.md

+1
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
- **IMPORTANT!** Add a dummy DAG in your DAGs folder (<AIRFLOW_HOME>/dags) to load required Ergo DAGs. You can use [this script](sample/dags/dag_ergo.py).
1111
- Enable two DAGS - `ergo_task_queuer` and `ergo_job_collector` in the Airflow UI.
12+
- An example DAG using Ergo to offload tasks can be [found here](sample/dags/example.py).
1213

1314
## Configuration
1415

sample/dags/example.py

+47
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from datetime import datetime, timedelta
2+
from random import choice
3+
4+
from airflow import DAG
5+
from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor
6+
from airflow.operators.bash_operator import BashOperator
7+
from airflow.operators.dummy_operator import DummyOperator
8+
from airflow.operators.ergo import ErgoTaskProducerOperator
9+
from airflow.sensors.ergo import ErgoJobResultSensor
10+
from airflow.utils.dates import days_ago
11+
12+
default_args = {
13+
'owner': 'airflow',
14+
'depends_on_past': False,
15+
'retries': 3,
16+
'retry_delay': timedelta(seconds=30),
17+
'start_date': days_ago(1),
18+
}
19+
20+
SAMPLE_TASK_IDS = ['noArg', 'oneArg', 'instance_noArg', 'spring_noArg']
21+
SAMPLE_TASK_DATA = [{}, {'value': 2}, {'val': 'sa'}]
22+
23+
24+
def random_task_decider():
25+
return choice(SAMPLE_TASK_IDS), choice(SAMPLE_TASK_DATA)
26+
27+
28+
with DAG(
29+
'example_sqs',
30+
default_args=default_args,
31+
schedule_interval=timedelta(minutes=1)
32+
) as dag:
33+
start_task = DummyOperator(task_id="start")
34+
stop_task = DummyOperator(task_id="stop")
35+
36+
push_task_to_sqs = ErgoTaskProducerOperator(
37+
task_id='example_task_pusher',
38+
ergo_task_callable=random_task_decider
39+
)
40+
41+
wait_job_result = ErgoJobResultSensor(
42+
task_id='example_job_sensor',
43+
pusher_task_id='example_task_pusher'
44+
)
45+
46+
47+
start_task >> push_task_to_sqs >> wait_job_result >> stop_task

0 commit comments

Comments
 (0)