
Commit 77c3857: Initial commit

0 parents, commit 77c3857
28 files changed: +1131 lines, -0 lines

.gitignore (+135 lines)

@@ -0,0 +1,135 @@

# Created by https://www.gitignore.io/api/linux,python,visualstudiocode
# Edit at https://www.gitignore.io/?templates=linux,python,visualstudiocode

### Linux ###
*~

# temporary files which can be created if a process still has a handle open of a deleted file
.fuse_hidden*

# KDE directory preferences
.directory

# Linux trash folder which might appear on any partition or disk
.Trash-*

# .nfs files are created when an open file is removed but is still being accessed
.nfs*

### Python ###
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class

# C extensions
*.so

# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST

# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec

# Installer logs
pip-log.txt
pip-delete-this-directory.txt

# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/

# Translations
*.mo
*.pot

# Scrapy stuff:
.scrapy

# Sphinx documentation
docs/_build/

# PyBuilder
target/

# pyenv
.python-version

# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock

# celery beat schedule file
celerybeat-schedule

# SageMath parsed files
*.sage.py

# Spyder project settings
.spyderproject
.spyproject

# Rope project settings
.ropeproject

# Mr Developer
.mr.developer.cfg
.project
.pydevproject

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
dmypy.json

# Pyre type checker
.pyre/

### VisualStudioCode ###
.vscode/*
!.vscode/tasks.json
!.vscode/launch.json
!.vscode/extensions.json

### VisualStudioCode Patch ###
# Ignore all local history of files
.history

# End of https://www.gitignore.io/api/linux,python,visualstudiocode

README.md (+27 lines)

@@ -0,0 +1,27 @@

# Ergo - Task Offloader

## Installation

- Clone the repository.
- Link the `src/` directory as a new plugin inside `<AIRFLOW_HOME>/plugins`.

## Usage

- **IMPORTANT!** Add a dummy DAG to your DAGs folder (`<AIRFLOW_HOME>/dags`) to load the required Ergo DAGs. You can use [this script](sample/dags/dag_ergo.py), reproduced below.
- Enable the two DAGs, `ergo_task_queuer` and `ergo_job_collector`, in the Airflow UI.
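For reference, the linked script is only a pair of imports; this is the content of `sample/dags/dag_ergo.py` as added in this commit:

```python
# Dummy file to load required dags from ergo
from ergo.dags.dag_task_queuer import dag as sqs_queue_dag
from ergo.dags.dag_job_collector import dag as job_collector_dag
```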
## Configuration

Sample:

```ini
[ergo]
request_queue_url = $REQUEST_SQS_QUEUE
result_queue_url = $RESULT_SQS_QUEUE
```

Explanation:

- `request_queue_url` - SQS queue URL used for Ergo task requests, i.e. Airflow is the producer.
- `result_queue_url` - SQS queue URL used for Ergo task results, i.e. Airflow is the consumer.
- `max_task_requests` - Maximum number of Ergo requests to batch before sending to SQS (default: 10).
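These keys live in the `[ergo]` section of `airflow.cfg` and are read through Airflow's configuration API. A minimal sketch of the lookup, mirroring `src/config.py` in this commit (it assumes the plugin directory is linked under the name `ergo`):

```python
from airflow.configuration import conf

SECTION_NAME = "ergo"

# Optional key: falls back to 10 when it is absent from airflow.cfg.
max_requests = conf.getint(SECTION_NAME, "max_task_requests", fallback=10)

# Required keys: no fallback is given, so a missing key raises a configuration error.
request_queue_url = conf.get(SECTION_NAME, "request_queue_url")
result_queue_url = conf.get(SECTION_NAME, "result_queue_url")
```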

sample/dags/dag_ergo.py (+3 lines)

@@ -0,0 +1,3 @@

# Dummy file to load required dags from ergo
from ergo.dags.dag_task_queuer import dag as sqs_queue_dag
from ergo.dags.dag_job_collector import dag as job_collector_dag

src/__init__.py (+16 lines)

@@ -0,0 +1,16 @@

from airflow.utils.state import State

SECTION_NAME = "ergo"

class JobResultStatus(object):
    NONE = 0
    SUCCESS = 200

    @staticmethod
    def task_state(code):
        if code == JobResultStatus.SUCCESS:
            return State.SUCCESS
        elif code == JobResultStatus.NONE:
            return State.QUEUED
        else:
            return State.FAILED
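A quick illustration of the mapping above (a minimal sketch; it assumes the plugin directory is importable as `ergo`, as described in the README):

```python
from airflow.utils.state import State
from ergo import JobResultStatus

# 200 from the worker maps to a successful task instance,
# 0 means no result yet, and any other code is treated as a failure.
assert JobResultStatus.task_state(JobResultStatus.SUCCESS) == State.SUCCESS
assert JobResultStatus.task_state(JobResultStatus.NONE) == State.QUEUED
assert JobResultStatus.task_state(500) == State.FAILED
```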

src/alembic.ini (+85 lines)

@@ -0,0 +1,85 @@

# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = migrations

# template used to generate migration files
# file_template = %%(rev)s_%%(slug)s

# timezone to use when rendering the date
# within the migration file as well as the filename.
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
revision_environment = true

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; this defaults
# to scripts/versions. When using multiple version
# directories, initial revisions must be specified with --version-path
# version_locations = %(here)s/bar %(here)s/bat scripts/versions

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

sqlalchemy.url = mysql+pymysql://root:password@localhost:3306/chronos


[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks=black
# black.type=console_scripts
# black.entrypoint=black
# black.options=-l 79

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

src/config.py (+9 lines)

@@ -0,0 +1,9 @@

from airflow.configuration import conf

from ergo import SECTION_NAME


class Config(object):
    max_requests = conf.getint(SECTION_NAME, "max_task_requests", fallback=10)
    sqs_request_queue_url = conf.get(SECTION_NAME, "request_queue_url")
    sqs_result_queue_url = conf.get(SECTION_NAME, "result_queue_url")

src/dags/__init__.py

Whitespace-only changes.

src/dags/dag_job_collector.py (+44 lines)

@@ -0,0 +1,44 @@

from datetime import timedelta

from airflow import DAG
from airflow.contrib.sensors.aws_sqs_sensor import SQSSensor
from airflow.utils import timezone
from airflow.utils.dates import days_ago

from ergo.config import Config
from ergo.operators.sqs.result_from_messages import \
    JobResultFromMessagesOperator

TASK_ID_SQS_COLLECTOR = "collect_sqs_messages"

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 10,
    'retry_delay': timedelta(minutes=2),
    'start_date': days_ago(1),
}

sqs_queue_url = Config.sqs_result_queue_url

with DAG(
    'ergo_job_collector',
    default_args=default_args,
    is_paused_upon_creation=False,
    schedule_interval=timedelta(seconds=10),
    catchup=False,
    max_active_runs=1
) as dag:
    sqs_collector = SQSSensor(
        task_id=TASK_ID_SQS_COLLECTOR,
        sqs_queue=sqs_queue_url,
        max_messages=10,
        wait_time_seconds=10
    )

    result_transformer = JobResultFromMessagesOperator(
        task_id='process_job_result',
        sqs_sensor_task_id=TASK_ID_SQS_COLLECTOR
    )

    sqs_collector >> result_transformer

src/dags/dag_task_queuer.py (+47 lines)

@@ -0,0 +1,47 @@

from datetime import timedelta

from airflow import DAG
from airflow.configuration import conf
from airflow.utils import timezone
from airflow.utils.dates import days_ago

from ergo.config import Config
from ergo.operators.sqs.sqs_task_pusher import SqsTaskPusherOperator
from ergo.sensors.task_requests_batcher import TaskRequestBatchSensor

XCOM_REQUEST_TASK_KEY = "request.tasks"
TASK_ID_REQUEST_SENSOR = "collect_requests"

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 10,
    'retry_delay': timedelta(minutes=2),
    'start_date': days_ago(1),
}

max_requests = Config.max_requests
sqs_queue_url = Config.sqs_request_queue_url

with DAG(
    'ergo_task_queuer',
    default_args=default_args,
    is_paused_upon_creation=False,
    schedule_interval=timedelta(seconds=10),
    catchup=False,
    max_active_runs=1
) as dag:
    collector = TaskRequestBatchSensor(
        task_id=TASK_ID_REQUEST_SENSOR,
        max_requests=max_requests,
        xcom_tasks_key=XCOM_REQUEST_TASK_KEY
    )

    pusher = SqsTaskPusherOperator(
        task_id="push_tasks",
        task_id_collector=TASK_ID_REQUEST_SENSOR,
        xcom_tasks_key=XCOM_REQUEST_TASK_KEY,
        sqs_queue_url=sqs_queue_url
    )

    collector >> pusher

src/exceptions.py (+10 lines)

@@ -0,0 +1,10 @@

from airflow.exceptions import AirflowFailException


class ErgoFailedResultException(AirflowFailException):
    def __init__(self, status_code, error_msg):
        self.status_code = status_code
        self.error_msg = error_msg

    def __str__(self):
        return f'{self.status_code}: {self.error_msg}'
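A small usage sketch of the exception above (the status code and message are hypothetical, chosen only to show the formatting):

```python
from ergo.exceptions import ErgoFailedResultException

# AirflowFailException fails the task without further retries;
# the subclass carries the worker's status code and error message.
err = ErgoFailedResultException(500, "worker crashed")
print(err)  # prints "500: worker crashed"
```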

src/migrations/README (+1 line)

@@ -0,0 +1 @@

Generic single-database configuration.
