Skip to content

Commit

Permalink
Make Celery worker the root process (#1560)
Browse files Browse the repository at this point in the history
* make celery worker main process in container
* enable hot reloading option for debugging
* restore all Turbinia logging
  • Loading branch information
hacktobeer authored Oct 4, 2024
1 parent 252410c commit 77b4068
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 83 deletions.
8 changes: 4 additions & 4 deletions docker/worker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,6 @@ ENV PATH="/home/turbinia/.venv/bin:$PATH" \

# Copy the source directory to the container
COPY --chown=turbinia:turbinia . /home/turbinia/
COPY --chown=turbinia:turbinia docker/worker/start.sh /home/turbinia/start.sh
RUN chmod +rwx /home/turbinia/start.sh

# Install Turbinia package -- will skip dependencies if installed
RUN poetry install --no-interaction --no-ansi -E worker
Expand All @@ -124,9 +122,11 @@ ARG TURBINIA_DEBUG
ARG TURBINIA_HOTRELOAD
ARG TURBINIA_DEBUG_PORT
ENV TURBINIA_DEBUG ${TURBINIA_DEBUG:-0}
ENV TURBINIA_HOTRELOAD ${TURBINIA_HOTRELOAD:-0}
ENV TURBINIA_DEBUG_PORT ${TURBINIA_DEBUG_PORT:-10000}

CMD ["/home/turbinia/start.sh"]
CMD ["celery","--app","turbinia.app","worker","-l","INFO","--without-gossip","-E"]
# Use below CMD instead to enable hot reloading of celery when code changes.
# CMD ["watchmedo","auto-restart","--directory=./","--pattern=*.py","--recursive","--","celery","--app","turbinia.app","worker","-l","INFO","--without-gossip","-E"]

# Expose Prometheus endpoint.
EXPOSE 9200/tcp
44 changes: 0 additions & 44 deletions docker/worker/start.sh

This file was deleted.

52 changes: 52 additions & 0 deletions turbinia/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Celery worker app.
Configures and Starts the Turbinia Celery worker.
"""
import celery
import logging
import os

from turbinia import debug
from turbinia import celeryconfig
from turbinia import config
from turbinia import task_utils
from turbinia.config import logger

config.LoadConfig()

config.TURBINIA_COMMAND = 'celeryworker'


@celery.signals.setup_logging.connect
def on_setup_logging(**kwargs):
"""Setup Turbinia logging as Celery overrides all handlers."""
logger.setup()
turbinia_log = logging.getLogger('turbinia')
turbinia_log.setLevel('INFO')
if os.getenv('TURBINIA_DEBUG', '') == '1':
turbinia_log.setLevel('DEBUG')


app = celery.Celery(
'turbinia', broker=config.CELERY_BROKER, backend=config.CELERY_BACKEND)
app.config_from_object(celeryconfig)
app.task(task_utils.task_runner, name='task_runner')

debug.initialize_debugmode_if_requested()

if __name__ == '__main__':
app.start()
35 changes: 35 additions & 0 deletions turbinia/celeryconfig.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# -*- coding: utf-8 -*-
# Copyright 2017 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Celery confguration.
Sets configuration for the Turbinia Celery components.
"""
from turbinia import config

config.LoadConfig()

accept_content = ['json']
broker_connection_retry_on_startup = True
# Store Celery task results metadata
result_backend = config.CELERY_BACKEND
task_default_queue = config.INSTANCE_ID
# Re-queue task if Celery worker abruptly exists
task_reject_on_worker_lost = True
task_track_started = True
worker_cancel_long_running_tasks_on_connection_loss = True
worker_concurrency = 1
worker_prefetch_multiplier = 1
# Avoid task duplication
worker_deduplicate_successful_tasks = True
16 changes: 2 additions & 14 deletions turbinia/tcelery.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from amqp.exceptions import ChannelError

from turbinia import config
from turbinia import celeryconfig
from turbinia.message import TurbiniaMessageBase

log = logging.getLogger(__name__)
Expand All @@ -49,20 +50,7 @@ def setup(self):
config.LoadConfig()
self.app = celery.Celery(
'turbinia', broker=config.CELERY_BROKER, backend=config.CELERY_BACKEND)
self.app.conf.update(
accept_content=['json'],
broker_connection_retry_on_startup=True,
# Store Celery task results metadata
result_backend=config.CELERY_BACKEND,
task_default_queue=config.INSTANCE_ID,
# Re-queue task if Celery worker abruptly exists
task_reject_on_worker_lost=True,
task_track_started=True,
worker_cancel_long_running_tasks_on_connection_loss=True,
worker_concurrency=1,
worker_prefetch_multiplier=1,
# Avoid task duplication
worker_deduplicate_successful_tasks=True)
self.app.config_from_object(celeryconfig)


class TurbiniaKombu(TurbiniaMessageBase):
Expand Down
9 changes: 3 additions & 6 deletions turbinia/turbiniactl.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,9 @@ def process_args(args):
# the config until after we parse the args so that we can use those arguments
# to point to config paths.
elif args.command == 'celeryworker':
initialize_debugmode_if_requested()
# pylint: disable=import-outside-toplevel
from turbinia.worker import TurbiniaCeleryWorker
worker = TurbiniaCeleryWorker(
jobs_denylist=args.jobs_denylist, jobs_allowlist=args.jobs_allowlist)
worker.start()
log.error(
'''Deprecated: Please start Turbinia celery worker
directly with `celery -A turbinia.app worker -l DEBUG -E`''')
elif args.command == 'server':
initialize_debugmode_if_requested()
# pylint: disable=import-outside-toplevel
Expand Down
15 changes: 0 additions & 15 deletions turbinia/turbiniactl_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,21 +39,6 @@ def testInvalidCommand(self):
self.assertRaises((argparse.ArgumentError, SystemExit),
turbiniactl.process_args, [args.command])

@mock.patch('turbinia.worker.TurbiniaCeleryWorker')
def testCeleryWorkerCommand(self, mock_worker):
"""Test CeleryWorker command."""
args = argparse.Namespace(command='celeryworker')
turbiniactl.process_args([args.command])
mock_worker.assert_called_once_with(jobs_denylist=[], jobs_allowlist=[])

@mock.patch('turbinia.config.ParseDependencies')
@mock.patch('turbinia.worker.TurbiniaCeleryWorker.start')
def testCeleryWorkerCommandStart(self, mock_worker, _):
"""Test CeleryWorker start."""
args = argparse.Namespace(command='celeryworker')
turbiniactl.process_args([args.command])
mock_worker.assert_called_once_with()

@mock.patch('turbinia.server.TurbiniaServer')
def testServerCommand(self, mock_server):
"""Test Server command."""
Expand Down

0 comments on commit 77b4068

Please sign in to comment.