Cleaner shutdown (#41)
This makes some attempts to clean up during shutdown.

The main piece is the atexit function in the run_pipeline function, which simply closes any still-open run/node_run as failed when shutting down.
The atexit handler in run() will shut down any still-running task processes.
It also attempts to kill the run_process when the run_pipeline process is shut down via Ctrl+C. This should trigger the atexit function in run() so that at least the children are killed. It does not yet attempt to do the same for any other signal.
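For illustration, here is a minimal, self-contained sketch of that pattern (toy names, not the mara API): an atexit handler in the parent terminates any still-running children, with a PID guard so that a forked child which inherited the handler skips the parent's cleanup.

import atexit
import os
import time
from multiprocessing import Process

def worker():
    time.sleep(60)  # stand-in for a long-running task

if __name__ == '__main__':
    parent_pid = os.getpid()
    children = []

    def cleanup():
        # guard against running in a forked child that inherited this handler:
        # only the original parent should terminate the workers
        if os.getpid() != parent_pid:
            return
        for child in children:
            if child.is_alive():
                child.terminate()  # SIGTERM, giving the child a chance to shut down

    atexit.register(cleanup)

    for _ in range(2):
        child = Process(target=worker)
        child.start()
        children.append(child)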
Also included is a fix to actually check all ancestors of a task when checking whether any of them has already failed. The effect is that we no longer schedule tasks from an already queued sub pipeline (like a parallel task) when the parent of that sub pipeline has failed.
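The fixed check amounts to walking the parent chain; a sketch with a toy Node class (mirroring the field names in the diff below, not the actual mara classes):

class Node:
    def __init__(self, parent=None, force_run_all_children=False):
        self.parent = parent
        self.force_run_all_children = force_run_all_children

def has_failed_ancestor(node: Node, failed_pipelines: set) -> bool:
    """True if any ancestor pipeline has failed without force_run_all_children set."""
    parent = node.parent
    while parent:
        if parent in failed_pipelines and not parent.force_run_all_children:
            return True
        parent = parent.parent
    return False

# example: a task inside a sub pipeline whose grandparent pipeline failed
root = Node()
sub = Node(parent=root)
task = Node(parent=sub)
assert has_failed_ancestor(task, failed_pipelines={root})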

With this in place I could successfully add a signal handler (not included yet, needs some more testing) which kills the running processes and closes the runs. It also handles Ctrl+C in Flask better; at least I didn't see leftover processes anymore.
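That handler is not part of this commit, so the following is only a guess at its shape: converting SIGINT/SIGTERM into a normal SystemExit makes the atexit functions above run, whereas the default SIGTERM behavior would end the process without running them.

import signal
import sys

def _exit_gracefully(signum, frame):
    # raising SystemExit unwinds the stack and lets the registered
    # atexit handlers (kill children, close the run) do their work
    sys.exit(1)

signal.signal(signal.SIGINT, _exit_gracefully)
signal.signal(signal.SIGTERM, _exit_gracefully)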

partly covers: #40
jankatins authored Jun 9, 2020
1 parent 4140456 commit b72e6cb
Showing 2 changed files with 70 additions and 15 deletions.
data_integration/execution.py (58 additions & 12 deletions)
@@ -10,6 +10,7 @@
 import os
 import sys
 import signal
+import atexit
 import time
 import traceback
 from multiprocessing import queues
@@ -109,6 +110,24 @@ def with_all_upstreams(nodes: {pipelines.Node}):
     failed_pipelines: {pipelines.Pipeline} = set()  # pipelines with failed tasks
     running_task_processes: {pipelines.Task: TaskProcess} = {}
 
+    # make sure any running tasks are killed when this executor process is shut down
+    executor_pid = os.getpid()
+
+    def ensure_task_processes_killed():
+        # as we fork, the TaskProcess also runs this function -> ignore it there
+        if os.getpid() != executor_pid: return
+        try:
+            for tp in list(running_task_processes.values()):  # type: TaskProcess
+                if tp.is_alive():
+                    # give it a chance to gracefully shut down
+                    tp.terminate()
+            statistics_process.kill()
+        except BaseException as e:
+            print(f"Exception during TaskProcess cleanup: {repr(e)}", file=sys.stderr, flush=True)
+        return
+
+    atexit.register(ensure_task_processes_killed)
+
     def dequeue() -> pipelines.Node:
         """
         Finds the next task in the queue
@@ -122,10 +141,22 @@ def dequeue() -> pipelines.Node:
                          or (not node.parent in running_pipelines)
                          or (running_pipelines[node.parent][1] < node.parent.max_number_of_parallel_tasks))):
                 node_queue.remove(node)
-                if node.parent in failed_pipelines and not node.parent.force_run_all_children:
+                processed_as_parent_failed = False
+                parent = node.parent
+                while parent:
                     # if the parent pipeline failed (and no overwrite), don't launch new nodes
-                    processed_nodes.add(node)
-                else:
+                    # this needs to go up to the ultimate parent, as we can have cases where we already
+                    # queued a sub pipeline and now the parent pipeline failed but the task's parent pipeline
+                    # (the sub pipeline) is not failed.
+                    # If a task from a parent pipeline fails, even with force_run_all_children on the
+                    # sub pipeline, the sub pipeline would stop. Only if the failed parent pipeline also has
+                    # force_run_all_children would the task get scheduled.
+                    if parent in failed_pipelines and not parent.force_run_all_children:
+                        processed_nodes.add(node)
+                        processed_as_parent_failed = True
+                        break
+                    else: parent = parent.parent
+                if not processed_as_parent_failed:
                     return node
 
     def track_finished_pipelines():
@@ -285,6 +316,18 @@ def track_finished_pipelines():
 
     runlogger = run_log.RunLogger()
 
+    # make sure that we close this run (if still open) as failed when this Python process exits.
+    # On SIGKILL we will still leave behind open runs...
+    # This needs to be registered after we forked off the run_process, as that one should not inherit the atexit function.
+    def ensure_closed_run_on_abort():
+        try:
+            run_log.close_open_run_after_error(runlogger.run_id)
+        except BaseException as e:
+            print(f"Exception during 'close_open_run_after_error()': {repr(e)}", file=sys.stderr, flush=True)
+        return
+
+    atexit.register(ensure_closed_run_on_abort)
+
     def _notify_all(event):
         try:
             runlogger.handle_event(event)
@@ -313,10 +356,9 @@ def _notify_all(event):
             # Catching GeneratorExit needs to end in a return!
             return
         except:
-            def _create_exception_output_event(msg: str = ''):
-                if msg:
-                    msg = msg + '\n'
-                return pipeline_events.Output(node_path=pipeline.path(), message=msg + traceback.format_exc(),
+            def _create_exception_output_event(msg: str = None):
+                return pipeline_events.Output(node_path=pipeline.path(),
+                                              message=(msg + '\n' if msg else '') + traceback.format_exc(),
                                               format=logger.Format.ITALICS, is_error=True)
 
             output_event = _create_exception_output_event()
@@ -327,23 +369,27 @@ def _create_exception_output_event(msg: str = ''):
                 # we are already in the generic exception handler, so we cannot do anything
                 # if we still fail, as we have to get to the final close_open_run_after_error()
                 # and 'return'...
-                msg = "Could not notify about final output event"
-                exception_events.append(_create_exception_output_event(msg))
+                exception_events.append(_create_exception_output_event("Could not notify about final output event"))
             yield output_event
             try:
                 run_log.close_open_run_after_error(runlogger.run_id)
             except BaseException as e:
-                msg = "Exception during 'close_open_run_after_error()'"
-                exception_events.append(_create_exception_output_event(msg))
+                exception_events.append(_create_exception_output_event("Exception during 'close_open_run_after_error()'"))
 
             # At least try to notify the UI
             for e in exception_events:
                 print(f"{repr(e)}", file=sys.stderr)
                 yield e
                 events.notify_configured_event_handlers(e)
 
+            # try to terminate the run_process, which itself will also clean up in an atexit handler
+            try:
+                run_process.terminate()
+            except:
+                pass
             return
+        if not run_process.is_alive():
+            # If we are here, it might be that the executor died without sending the necessary run-finished events
+            ensure_closed_run_on_abort()
+            break
         time.sleep(0.001)

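An aside on the '# Catching GeneratorExit needs to end in a return!' comment above: run_pipeline is a generator, and after GeneratorExit has been thrown into it (e.g. when the consuming HTTP connection goes away), it must return instead of yielding again. A standalone illustration of that rule:

def events():
    try:
        while True:
            yield 'event'
    except GeneratorExit:
        print('consumer went away, cleaning up')
        return  # yielding here instead would raise RuntimeError("generator ignored GeneratorExit")

gen = events()
next(gen)
gen.close()  # throws GeneratorExit at the suspended yield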
data_integration/logging/run_log.py (12 additions & 3 deletions)
@@ -70,20 +70,29 @@ def close_open_run_after_error(run_id: int):
     """Closes all open run and node_run for this run_id as failed"""
     if run_id is None:
         return
-    print(f'Run aborted, cleaning up (run_id = {run_id})')
     _close_run = f'''
 UPDATE data_integration_run
 SET end_time = now(), succeeded = FALSE
 WHERE run_id = {"%s"} and end_time IS NULL
+RETURNING run_id
 '''
     _close_node_run = f'''
 UPDATE data_integration_node_run
 SET end_time = now(), succeeded = FALSE
 WHERE run_id = {"%s"} and end_time IS NULL
+RETURNING run_id
 '''
     with mara_db.postgresql.postgres_cursor_context('mara') as cursor:  # type: psycopg2.extensions.cursor
-        cursor.execute(_close_node_run, (run_id,))
-        cursor.execute(_close_run, (run_id,))
+        _closed_any = False
+        for code in [_close_node_run, _close_run]:
+            try:
+                cursor.execute(code, (run_id,))
+                if cursor.fetchall():
+                    _closed_any = True
+            except:
+                pass
+        if _closed_any:
+            print(f'Cleaned up open runs/node_runs (run_id = {run_id})')
 
 
 class RunLogger(events.EventHandler):
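The RETURNING run_id clause is what enables the "did we actually close anything?" check: psycopg2 hands the returned rows back through cursor.fetchall(), so an UPDATE that matched no rows yields an empty list. A minimal sketch of the pattern (connection parameters are placeholders; assumes psycopg2):

import psycopg2

run_id = 42  # hypothetical run to clean up

with psycopg2.connect('dbname=mara') as connection:
    with connection.cursor() as cursor:
        cursor.execute('''
            UPDATE data_integration_run
            SET end_time = now(), succeeded = FALSE
            WHERE run_id = %s AND end_time IS NULL
            RETURNING run_id
        ''', (run_id,))
        closed_runs = cursor.fetchall()  # one row per run that was still open
        if closed_runs:
            print(f'closed {len(closed_runs)} open run(s)')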
