Remove HardLimitingWorker by @SchrosCat2013, fixing #6424 (#7194)

Open · wants to merge 2 commits into base: master
redash/monitor.py (2 changes: 1 addition & 1 deletion)
@@ -59,7 +59,7 @@ def get_status():


def rq_job_ids():
-   queues = Queue.all(connection=redis_connection)
+   queues = Queue.all(connection=rq_redis_connection)

    started_jobs = [StartedJobRegistry(queue=q).get_job_ids() for q in queues]
    queued_jobs = [q.job_ids for q in queues]
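
For orientation, here is a minimal sketch of how the patched rq_job_ids() reads with the one-line change applied. Only the lines above come from the diff; the imports and the module that exposes rq_redis_connection are assumptions, and the rest of the function is not shown in this hunk.

# Sketch only: import locations are assumed, not part of this diff.
from rq import Queue
from rq.registry import StartedJobRegistry

from redash import rq_redis_connection  # assumed home of the RQ-specific Redis connection


def rq_job_ids():
    # After the fix, queues are enumerated on the RQ-specific connection,
    # not the general-purpose redis_connection.
    queues = Queue.all(connection=rq_redis_connection)

    started_jobs = [StartedJobRegistry(queue=q).get_job_ids() for q in queues]
    queued_jobs = [q.job_ids for q in queues]
    # ... the remainder of the function is outside the lines shown in this hunk.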
redash/tasks/worker.py (127 changes: 1 addition & 126 deletions)
@@ -1,13 +1,8 @@
import errno
import os
import signal
import sys

from rq import Queue as BaseQueue
from rq.job import Job as BaseJob
from rq.job import JobStatus
from rq.timeouts import HorseMonitorTimeoutException
from rq.utils import utcnow
from rq.worker import (
    HerokuWorker,  # HerokuWorker implements graceful shutdown on SIGTERM
    Worker,
@@ -71,127 +66,7 @@ def execute_job(self, job, queue):
statsd_client.incr("rq.jobs.failed.{}".format(queue.name))


class HardLimitingWorker(BaseWorker):
    """
    RQ's work horses enforce time limits by setting a timed alarm and stopping jobs
    when they reach their time limits. However, the work horse may be entirely blocked
    and may not respond to the alarm interrupt. Since respecting timeouts is critical
    in Redash (if we don't respect them, workers may be infinitely stuck and as a result,
    service may be denied for other queries), we enforce two time limits:
    1. A soft time limit, enforced by the work horse
    2. A hard time limit, enforced by the parent worker

    The HardLimitingWorker class changes the default monitoring behavior of the default
    RQ Worker by checking if the work horse is still busy with the job, even after
    it should have timed out (+ a grace period of 15s). If it does, it kills the work horse.
    """

    grace_period = 15
    queue_class = RedashQueue
    job_class = CancellableJob

    def stop_executing_job(self, job):
        os.kill(self.horse_pid, signal.SIGINT)
        self.log.warning("Job %s has been cancelled.", job.id)

    def soft_limit_exceeded(self, job):
        job_has_time_limit = job.timeout != -1

        if job_has_time_limit:
            seconds_under_monitor = (utcnow() - self.monitor_started).seconds
            return seconds_under_monitor > job.timeout + self.grace_period
        else:
            return False

    def enforce_hard_limit(self, job):
        self.log.warning(
            "Job %s exceeded timeout of %ds (+%ds grace period) but work horse did not terminate it. "
            "Killing the work horse.",
            job.id,
            job.timeout,
            self.grace_period,
        )
        self.kill_horse()

    def monitor_work_horse(self, job: "Job", queue: "Queue"):
        """The worker will monitor the work horse and make sure that it
        either executes successfully or the status of the job is set to
        failed

        Args:
            job (Job): _description_
            queue (Queue): _description_
        """
        self.monitor_started = utcnow()
        retpid = ret_val = rusage = None
        job.started_at = utcnow()
        while True:
            try:
                with self.death_penalty_class(self.job_monitoring_interval, HorseMonitorTimeoutException):
                    retpid, ret_val, rusage = self.wait_for_horse()
                break
            except HorseMonitorTimeoutException:
                # Horse has not exited yet and is still running.
                # Send a heartbeat to keep the worker alive.
                self.set_current_job_working_time((utcnow() - job.started_at).total_seconds())

                job.refresh()
                # Kill the job from this side if something is really wrong (interpreter lock/etc).
                if job.timeout != -1 and self.current_job_working_time > (job.timeout + 60):  # type: ignore
                    self.heartbeat(self.job_monitoring_interval + 60)
                    self.kill_horse()
                    self.wait_for_horse()
                    break

                self.maintain_heartbeats(job)

                if job.is_cancelled:
                    self.stop_executing_job(job)

                if self.soft_limit_exceeded(job):
                    self.enforce_hard_limit(job)

            except OSError as e:
                # In case we encountered an OSError due to EINTR (which is
                # caused by a SIGINT or SIGTERM signal during
                # os.waitpid()), we simply ignore it and enter the next
                # iteration of the loop, waiting for the child to end. In
                # any other case, this is some other unexpected OS error,
                # which we don't want to catch, so we re-raise those ones.
                if e.errno != errno.EINTR:
                    raise
                # Send a heartbeat to keep the worker alive.
                self.heartbeat()

        self.set_current_job_working_time(0)
        self._horse_pid = 0  # Set horse PID to 0, horse has finished working
        if ret_val == os.EX_OK:  # The process exited normally.
            return

        job_status = job.get_status()

        if job_status is None:  # Job completed and its ttl has expired
            return
        elif self._stopped_job_id == job.id:
            # Work-horse killed deliberately
            self.log.warning("Job stopped by user, moving job to FailedJobRegistry")
            if job.stopped_callback:
                job.execute_stopped_callback(self.death_penalty_class)
            self.handle_job_failure(job, queue=queue, exc_string="Job stopped by user, work-horse terminated.")
        elif job_status not in [JobStatus.FINISHED, JobStatus.FAILED]:
            if not job.ended_at:
                job.ended_at = utcnow()

            # Unhandled failure: move the job to the failed queue
            signal_msg = f" (signal {os.WTERMSIG(ret_val)})" if ret_val and os.WIFSIGNALED(ret_val) else ""
            exc_string = f"Work-horse terminated unexpectedly; waitpid returned {ret_val}{signal_msg}; "
            self.log.warning("Moving job to FailedJobRegistry (%s)", exc_string)

            self.handle_work_horse_killed(job, retpid, ret_val, rusage)
            self.handle_job_failure(job, queue=queue, exc_string=exc_string)


-class RedashWorker(StatsdRecordingWorker, HardLimitingWorker):
+class RedashWorker(StatsdRecordingWorker):
    queue_class = RedashQueue
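
With HardLimitingWorker removed, RedashWorker relies on the work-horse monitoring built into the upstream RQ Worker that it inherits. A minimal usage sketch follows; the connection import and the queue names are assumptions for illustration, not taken from this diff.

# Sketch only: queue names and connection import are assumed.
from redash import rq_redis_connection
from redash.tasks.worker import RedashWorker

worker = RedashWorker(
    ["queries", "periodic", "emails"],  # example queue names
    connection=rq_redis_connection,
)
# Timeout handling and work-horse supervision now come from the stock RQ Worker
# implementation rather than from the removed HardLimitingWorker.
worker.work()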

