
Commit

more delinting
Didion, John (NIH/NHGRI) [F] committed Mar 23, 2017
1 parent dccd8ea commit d545faf
Showing 4 changed files with 521 additions and 235 deletions.
113 changes: 77 additions & 36 deletions atropos/commands/multicore.py
@@ -1,14 +1,12 @@
"""Classes and methods to support parallelization of operations.
"""
from collections import defaultdict
import inspect
import logging
from multiprocessing import Process, Queue, Value, cpu_count
from multiprocessing import Process, Value
import os
from queue import Empty, Full
import time
from atropos import AtroposError
from atropos.util import run_interruptible

RETRY_INTERVAL = 5
"""Max time to wait between retrying operations."""
@@ -83,19 +81,36 @@ class PendingQueue(object):
Args:
max_size: Maximum queue size; None == infinite.
"""
def _init(self, max_size=None):
def __init__(self, max_size=None):
self.queue = {}
self.max_size = max_size
self.min_priority = None

def push(self, priority, value):
"""Add an item to the queue with priority.
Args:
priority: An integer that determines the placement of `value` in
the queue. Must be unique.
value: The value to queue.
Raises:
Full if queue is full.
"""
if self.full:
raise Full()
if priority in self.queue:
raise ValueError("Duplicate priority value: {}".format(priority))
self.queue[priority] = value
if self.min_priority is None or priority < self.min_priority:
self.min_priority = priority

def pop(self):
"""Remove and return the item in the queue with lowest priority.
Raises:
Empty if the queue is empty.
"""
if self.empty:
raise Empty()
value = self.queue.pop(self.min_priority)
@@ -107,13 +122,20 @@ def pop(self):

@property
def full(self):
"""Whether the queue is full.
"""
return self.max_size and len(self.queue) >= self.max_size

@property
def empty(self):
"""Whether the queue is empty.
"""
return len(self.queue) == 0
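
A minimal usage sketch of the PendingQueue API above (not part of the diff; it assumes pop() recomputes min_priority after each removal, which is collapsed in this hunk):

    # pq is a hypothetical instance created for illustration.
    pq = PendingQueue(max_size=3)
    pq.push(2, "second")
    pq.push(1, "first")
    assert not pq.empty
    assert pq.pop() == "first"   # the lowest priority item comes out first
    assert pq.pop() == "second"
    assert pq.empty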

class ParallelPipelineMixin(object):
"""Mixin that implements the `start`, `finish`, and `process_batch` methods
of :class:`Pipeline`.
"""
def start(self, **kwargs):
super().start(**kwargs)
self.seen_batches = set()
@@ -124,8 +146,10 @@ def process_batch(self, batch):

def finish(self, summary, worker=None):
super().finish(summary, worker=worker)
logging.getLogger().debug("{} finished; processed {} batches, {} reads".format(
worker.name, len(self.seen_batches), sum(self.record_counts.values())))
logging.getLogger().debug(
"%s finished; processed %d batches, %d reads",
worker.name, len(self.seen_batches),
sum(self.record_counts.values()))
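
The recurring change in this commit replaces eagerly formatted log messages with lazy %-style arguments; a small illustration (not from the diff) of the difference:

    import logging

    name, batches = "worker-1", 42  # example values
    # Eager: the message string is built even if DEBUG records are discarded.
    logging.getLogger().debug("{} processed {} batches".format(name, batches))
    # Lazy: formatting happens only if the record is actually emitted.
    logging.getLogger().debug("%s processed %d batches", name, batches)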

class WorkerProcess(Process):
"""Parent class for worker processes that execute Pipelines.
@@ -147,11 +171,13 @@ def __init__(self, index, input_queue, pipeline, summary_queue, timeout):

def run(self):
logging.getLogger().debug(
"{} running under pid {}".format(self.name, os.getpid()))
"%s running under pid %d", self.name, os.getpid())

summary = {}

def iter_batches():
"""Deque and yield batches.
"""
while True:
batch = dequeue(
self.input_queue,
@@ -160,9 +186,13 @@ def iter_batches():
yield batch

def enqueue_summary():
"""Enqueue a summary dict.
"""
enqueue(
self.summary_queue, (self.index, self.pipeline.seen_batches, summary),
wait_message="{} waiting to queue summary {{}}".format(self.name),
self.summary_queue,
(self.index, self.pipeline.seen_batches, summary),
wait_message="{} waiting to queue summary {{}}".format(
self.name),
timeout=self.timeout
)

@@ -174,53 +204,56 @@ def enqueue_summary():
if batch is None:
break
logging.getLogger().debug(
"{} processing batch of size {}".format(
self.name, batch[0]['size']))
"%s processing batch of size %d",
self.name, batch[0]['size'])
self.pipeline.process_batch(batch)
finally:
self.pipeline.finish(summary, worker=self)

logging.getLogger().debug("{} finished normally".format(self.name))
except Exception as e:
logging.getLogger().debug("%s finished normally", self.name)
except Exception as err:
logging.getLogger().error(
"Unexpected error in {}".format(self.name), exc_info=True)
summary['error'] = e
"Unexpected error in %s", self.name, exc_info=True)
summary['error'] = err

logging.getLogger().debug("{} sending summary".format(self.name))
logging.getLogger().debug("%s sending summary", self.name)
enqueue_summary()
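
Each worker pushes a single (index, seen_batches, summary) tuple before exiting, so the parent process can drain the summary queue along these lines (illustrative, not part of the diff; summary_queue and num_workers are assumed to exist in the caller):

    for _ in range(num_workers):          # num_workers: hypothetical count
        index, seen_batches, summary = summary_queue.get()
        if 'error' in summary:            # set by the except branch above
            raise summary['error']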

def launch_workers(n, args=(), offset=0, worker_class=WorkerProcess):
def launch_workers(num_workers, args=(), offset=0, worker_class=WorkerProcess):
"""Launch `n` workers. Each worker is initialized with an incremental
index starting with `offset`, followed by `args`.
"""
logging.getLogger().info("Starting {} worker processes".format(n))
logging.getLogger().info("Starting %d worker processes", num_workers)
# create workers
workers = [worker_class(i+offset, *args) for i in range(n)]
workers = [worker_class(i+offset, *args) for i in range(num_workers)]
# start workers
for worker in workers:
worker.start()
return workers
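
A hypothetical call (not part of the commit); the args tuple mirrors WorkerProcess's constructor after the index, i.e. (input_queue, pipeline, summary_queue, timeout):

    # input_queue, pipeline and summary_queue are assumed to exist in the caller.
    workers = launch_workers(
        4, args=(input_queue, pipeline, summary_queue, 60))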

def ensure_processes(processes, message="One or more process exited: {}", alive=True):
def ensure_processes(
processes, message="One or more process exited: {}", alive=True):
"""Raise an exception if all processes do not have the expected status
(alive or dead).
"""
is_alive = [process.is_alive() for worker in processes]
is_alive = [worker.is_alive() for worker in processes]
if alive != all(is_alive):
raise MulticoreError(message.format(",".join(
str(i) for i in range(len(is_alive)) if not is_alive[i])))
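
For example (illustrative only), a caller could assert that all launched workers are still running, with the indexes of any dead ones interpolated into the message:

    # workers: a hypothetical list returned by launch_workers above.
    ensure_processes(workers, "Workers exited early: {}", alive=True)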

def wait_on(
condition, wait_message="Waiting {}", timeout=None, fail_callback=None,
wait=None, timeout_callback=None):
condition, *args, wait_message="Waiting {}", timeout=None,
fail_callback=None, wait=None, timeout_callback=None):
"""Wait on a condition to be non-False.
Args:
condition: Function that returns either False or a non-False value.
args: Args with which to call `condition`.
wait_message: The message to log while `condition` is False.
timeout: Number of seconds after which the log messages escalate from
DEBUG to ERROR.
fail_callback: Function that is called each time `condition` returns False.
fail_callback: Function that is called each time `condition` returns
False.
wait: Either a boolean or a wait function to execute. If True,
`time.sleep(RETRY_INTERVAL)` is used as the wait function.
timeout_callback: Either a function that is called each time, or an
@@ -234,7 +267,7 @@ def wait_on(
wait = lambda: time.sleep(wait_time)
wait_start = None
while True:
result = condition()
result = condition(*args)
if result is not False:
return result
if fail_callback:
@@ -244,7 +277,8 @@ def wait_on(
wait_start = now
else:
waiting = now - wait_start
msg = wait_message.format("for {} seconds".format(round(waiting, 1)))
msg = wait_message.format(
"for {} seconds".format(round(waiting, 1)))
if timeout is not None and waiting >= timeout:
logging.getLogger().error(msg)
if timeout_callback:
@@ -263,7 +297,8 @@ def wait_on_process(process, timeout, terminate=False):
Args:
process: The process on which to wait.
timeout: Number of seconds to wait for process to terminate.
terminate: Whether to force the process to terminate after `timeout` seconds.
terminate: Whether to force the process to terminate after `timeout`
seconds.
"""
timeout_callback = lambda: process.terminate() if terminate else None
return wait_on(
@@ -285,13 +320,15 @@ def enqueue(
block_timeout: Number of seconds to wait after each `queue.put` attempt.
kwargs: Additional arguments to `wait_on`.
"""
def condition():
def condition(item):
"""Returns True if enqueing was successful.
"""
try:
queue.put(item, block=True, timeout=block_timeout)
return True
except Full:
return False
wait_on(condition, wait_message=wait_message, **kwargs)
wait_on(condition, item, wait_message=wait_message, **kwargs)
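
Both enqueue and dequeue are thin wrappers around wait_on; a stand-alone call (not from the diff, with a hypothetical work_queue) might look like:

    def queue_has_room():
        # Any non-False return value ends the wait and is returned by wait_on.
        return (not work_queue.full()) and "ok"

    wait_on(
        queue_has_room,
        wait_message="Waiting for queue space {}",
        timeout=30,   # escalate log messages to ERROR after 30 seconds
        wait=True)    # per the docstring, sleep RETRY_INTERVAL between attempts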

def enqueue_all(iterable, queue, timeout, fail_callback):
"""Enqueue all items in `iterable`, using `wait_on` to wait while `queue`
@@ -307,15 +344,17 @@ def enqueue_all(iterable, queue, timeout, fail_callback):
The number of items queued.
"""
num_items = 0
def condition(item):
"""Returns True if enqueing was successful.
"""
try:
queue.put(item, block=True, timeout=RETRY_INTERVAL)
return True
except Full:
return False
for item in iterable:
def condition():
try:
queue.put(item, block=True, timeout=RETRY_INTERVAL)
return True
except Full:
return False
wait_on(
condition,
condition, item,
wait_message="Main process waiting to queue item {}",
timeout=timeout,
fail_callback=fail_callback)
@@ -328,6 +367,8 @@ def dequeue(
"""Dequeue an item, using `wait_on` to wait while `queue` is empty.
"""
def condition():
"""Returns an item from the queue, or False if the queue is empty.
"""
try:
return queue.get(block=True, timeout=block_timeout)
except Empty:
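
Putting the visible pieces together, the producer side of this module might look roughly like the sketch below (illustrative only; read_batches() and pipeline are hypothetical, and the workers consume batches via dequeue inside WorkerProcess.run):

    from multiprocessing import Queue

    input_queue = Queue(maxsize=100)   # sizes and timeouts are arbitrary here
    summary_queue = Queue()
    workers = launch_workers(
        4, args=(input_queue, pipeline, summary_queue, 60))
    # Feed batches; enqueue_all retries via wait_on while the queue is full.
    enqueue_all(
        read_batches(), input_queue, timeout=60,
        fail_callback=lambda: ensure_processes(workers))
    # One None sentinel per worker tells WorkerProcess.run to stop.
    enqueue_all(
        [None] * len(workers), input_queue, timeout=60,
        fail_callback=lambda: ensure_processes(workers))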