Merge pull request #42 from Esri/refactor-parallelizer
Use a shared function to launch and manage parallel processes
mmorang authored Aug 30, 2023
2 parents 7ee9414 + 507e397 commit f6f016f
Showing 5 changed files with 138 additions and 208 deletions.
helpers.py: 83 changes (83 additions, 0 deletions)
@@ -21,6 +21,7 @@
import traceback
import logging
import subprocess
from concurrent import futures
import arcpy

arc_license = arcpy.ProductInfo()
@@ -661,6 +662,88 @@ def run_gp_tool(log_to_use, tool, tool_args=None, tool_kwargs=None):
return result


def run_parallel_processes(
logger, function_to_call, static_args, chunks, total_jobs, max_processes,
msg_intro_verb, msg_process_str
):
"""Launch and manage parallel processes and return a list of process results.
Args:
logger (logging.Logger): Logger to use for the parallelized process
function_to_call (function): Function called in each parallelized job
static_args (list): List of values used as static arguments to the function_to_call
chunks (iterator): Iterator with values that will be passed one at a time to the function_to_call, with each
value being one parallelized chunk.
total_jobs (int): Total number of jobs that will be run. Used in messaging.
max_processes (int): Maximum number of parallel processes allowed.
msg_intro_verb (str): Text to include in the intro message f"{msg_intro_verb} in parallel..."
msg_process_str (str): Text to include in messages representing whatever is being parallelized.
Returns:
list: List of returned values from the parallel processes.
"""
logger.info(f"{msg_intro_verb} in parallel ({total_jobs} chunks)...")
completed_jobs = 0 # Track the number of jobs completed so far to use in logging
job_results = []
# Use the concurrent.futures ProcessPoolExecutor to spin up parallel processes that call the function
with futures.ProcessPoolExecutor(max_workers=max_processes) as executor:
# Each parallel process calls the designated function with the designated static inputs and a unique chunk
jobs = {executor.submit(
function_to_call, chunk, *static_args): chunk for chunk in chunks}
# As each job is completed, add some logging information and store the results to post-process later
for future in futures.as_completed(jobs):
try:
# Retrieve the results returned by the process
result = future.result()
except Exception: # pylint: disable=broad-except
# If we couldn't retrieve the result, some terrible error happened and the job errored.
# Note: For processes that do network analysis workflows, this does not mean solve failed.
# It means some unexpected error was thrown. The most likely
# causes are:
# a) If you're calling a service, the service was temporarily down.
# b) You had a temporary file read/write or resource issue on your machine.
# c) If you're actively updating the code, you introduced an error.
# To make the tool more robust against temporary glitches, retry submitting the job up to the number
# of times designated in MAX_RETRIES. If the job is still erroring after that many
# retries, fail the entire tool run.
errs = traceback.format_exc().splitlines()
failed_range = jobs[future]
logger.debug((
f"Failed to get results for {msg_process_str} chunk {failed_range} from the parallel process. "
f"Will retry up to {MAX_RETRIES} times. Errors: {errs}"
))
job_failed = True
num_retries = 0
while job_failed and num_retries < MAX_RETRIES:
num_retries += 1
try:
future = executor.submit(function_to_call, failed_range, *static_args)
result = future.result()
job_failed = False
logger.debug(f"{msg_process_str} chunk {failed_range} succeeded after {num_retries} retries.")
except Exception: # pylint: disable=broad-except
# Update exception info to the latest error
errs = traceback.format_exc().splitlines()
if job_failed:
# The job errored and did not succeed after retries. Fail the tool run because something
# terrible is happening.
logger.debug(
f"{msg_process_str} chunk {failed_range} continued to error after {num_retries} retries.")
logger.error(f"Failed to get {msg_process_str} result from parallel processing.")
errs = traceback.format_exc().splitlines()
for err in errs:
logger.error(err)
raise

# If we got this far, the job completed successfully and we retrieved results.
completed_jobs += 1
logger.info(
f"Finished {msg_process_str} {completed_jobs} of {total_jobs}.")
job_results.append(result)

return job_results


class PrecalculateLocationsMixin: # pylint:disable = too-few-public-methods
"""Used to precalculate network locations either directly or calling the parallelized version."""

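For orientation, here is a minimal usage sketch of the new helper (not part of the diff). The worker process_chunk, the settings dictionary, and the chunk values are hypothetical placeholders; the argument order mirrors the executor.submit(function_to_call, chunk, *static_args) call above, and the real callers appear in the two modules below.

import logging
from helpers import run_parallel_processes  # the shared function added above

def process_chunk(chunk, settings):
    """Hypothetical worker: receives one chunk first, then the static argument(s)."""
    start, end = chunk
    return {"oidRange": (start, end), "mode": settings["mode"]}

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("example")
    ranges = [[1, 1000], [1001, 2000], [2001, 3000]]  # one chunk per parallel job
    results = run_parallel_processes(
        logger,                 # logging.Logger used for progress and retry messages
        process_chunk,          # function called once per chunk in a separate process
        [{"mode": "example"}],  # static args appended after the chunk in each call
        ranges,                 # iterable of chunks
        len(ranges),            # total number of jobs (used only for messaging)
        4,                      # maximum number of parallel processes
        "Processing chunks",    # verb phrase for the intro log message
        "chunk processing",     # label used in per-job log messages
    )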
parallel_calculate_locations.py: 79 changes (12 additions, 67 deletions)
@@ -31,7 +31,6 @@
limitations under the License.
"""
# pylint: disable=logging-fstring-interpolation
from concurrent import futures
import os
import uuid
import logging
@@ -142,12 +141,12 @@ def calculate_locations(self, oid_range):
self.job_result["oidRange"] = tuple(oid_range)


def calculate_locations_for_chunk(calc_locs_settings, chunk):
def calculate_locations_for_chunk(chunk, calc_locs_settings):
"""Calculate locations for a range of OIDs in the input dataset.
Args:
calc_locs_settings (dict): Dictionary of kwargs for the LocationCalculator class.
chunk (list): OID range to calculate locations for. Specified as a list of [start range, end range], inclusive.
calc_locs_settings (dict): Dictionary of kwargs for the LocationCalculator class.
Returns:
dict: Dictionary of job results for the chunk
@@ -217,70 +216,16 @@ def __init__( # pylint: disable=too-many-locals, too-many-arguments

def calc_locs_in_parallel(self):
"""Calculate locations in parallel."""
total_jobs = len(self.ranges)
LOGGER.info(f"Beginning parallel Calculate Locations ({total_jobs} chunks)")
completed_jobs = 0 # Track the number of jobs completed so far to use in logging
# Use the concurrent.futures ProcessPoolExecutor to spin up parallel processes that calculate chunks of
# locations
with futures.ProcessPoolExecutor(max_workers=self.max_processes) as executor:
# Each parallel process calls the calculate_locations_for_chunk() function for the given range of OIDs
# in the input dataset
jobs = {executor.submit(
calculate_locations_for_chunk, self.calc_locs_inputs, chunk
): chunk for chunk in self.ranges}
# As each job is completed, add some logging information and store the results to post-process later
for future in futures.as_completed(jobs):
try:
# The job returns a results dictionary. Retrieve it.
result = future.result()
except Exception: # pylint: disable=broad-except
# If we couldn't retrieve the result, some terrible error happened and the job errored.
# Note: This does not mean solve failed. It means some unexpected error was thrown. The most likely
# causes are:
# a) If you're calling a service, the service was temporarily down.
# b) You had a temporary file read/write or resource issue on your machine.
# c) If you're actively updating the code, you introduced an error.
# To make the tool more robust against temporary glitches, retry submitting the job up to the number
# of times designated in helpers.MAX_RETRIES. If the job is still erroring after that many retries,
# fail the entire tool run.
errs = traceback.format_exc().splitlines()
failed_range = jobs[future]
LOGGER.debug((
f"Failed to get results for Calculate Locations chunk {failed_range} from the parallel process."
f" Will retry up to {helpers.MAX_RETRIES} times. Errors: {errs}"
))
job_failed = True
num_retries = 0
while job_failed and num_retries < helpers.MAX_RETRIES:
num_retries += 1
try:
future = executor.submit(calculate_locations_for_chunk, self.calc_locs_inputs, failed_range)
result = future.result()
job_failed = False
LOGGER.debug(
f"Calculate Locations chunk {failed_range} succeeded after {num_retries} retries.")
except Exception: # pylint: disable=broad-except
# Update exception info to the latest error
errs = traceback.format_exc().splitlines()
if job_failed:
# The job errored and did not succeed after retries. Fail the tool run because something
# terrible is happening.
LOGGER.debug(
f"Calculate Locations chunk {failed_range} continued to error after {num_retries} retries.")
LOGGER.error("Failed to get Calculate Locations result from parallel processing.")
errs = traceback.format_exc().splitlines()
for err in errs:
LOGGER.error(err)
raise

# If we got this far, the job completed successfully and we retrieved results.
completed_jobs += 1
LOGGER.info(
f"Finished Calculate Locations chunk {completed_jobs} of {total_jobs}.")

# Parse the results dictionary and store components for post-processing.
# Store the ranges as dictionary keys to facilitate sorting.
self.temp_out_fcs[result["oidRange"]] = result["outputFC"]
# Calculate locations in parallel
job_results = helpers.run_parallel_processes(
LOGGER, calculate_locations_for_chunk, [self.calc_locs_inputs], self.ranges,
len(self.ranges), self.max_processes,
"Calculating locations", "Calculate Locations"
)
for result in job_results:
# Parse the results dictionary and store components for post-processing.
# Store the ranges as dictionary keys to facilitate sorting.
self.temp_out_fcs[result["oidRange"]] = result["outputFC"]

# Rejoin the chunked feature classes into one.
LOGGER.info("Rejoining chunked data...")
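Each chunk handed to calculate_locations_for_chunk is an inclusive [start, end] OID range. As a hypothetical illustration only (the tool computes self.ranges elsewhere in the class, outside this diff), such ranges can be built like this:

def build_oid_ranges(max_oid, chunk_size):
    """Split OIDs 1..max_oid into inclusive [start, end] chunks (illustrative helper)."""
    return [[start, min(start + chunk_size - 1, max_oid)]
            for start in range(1, max_oid + 1, chunk_size)]

print(build_oid_ranges(2500, 1000))  # [[1, 1000], [1001, 2000], [2001, 2500]]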
parallel_odcm.py: 89 changes (20 additions, 69 deletions)
@@ -24,7 +24,6 @@
limitations under the License.
"""
# pylint: disable=logging-fstring-interpolation
from concurrent import futures
import os
import uuid
import logging
@@ -623,14 +622,14 @@ def _determine_if_travel_mode_time_based(self):
self.optimized_field_name = "Total_Distance"


def solve_od_cost_matrix(inputs, chunk):
def solve_od_cost_matrix(chunk, inputs):
"""Solve an OD Cost Matrix analysis for the given inputs for the given chunk of ObjectIDs.
Args:
inputs (dict): Dictionary of keyword inputs suitable for initializing the ODCostMatrix class
chunk (list): Represents the ObjectID ranges to select from the origins and destinations when solving the OD
Cost Matrix. For example, [[1, 1000], [4001, 5000]] means use origin OIDs 1-1000 and destination OIDs
4001-5000.
inputs (dict): Dictionary of keyword inputs suitable for initializing the ODCostMatrix class
Returns:
dict: Dictionary of results from the ODCostMatrix class
@@ -786,72 +785,24 @@ def solve_od_in_parallel(self):
self.optimized_cost_field = self._validate_od_settings()

# Compute OD cost matrix in parallel
LOGGER.info(f"Beginning parallelized OD Cost Matrix solves ({self.total_jobs} chunks)")
completed_jobs = 0 # Track the number of jobs completed so far to use in logging
# Use the concurrent.futures ProcessPoolExecutor to spin up parallel processes that solve the OD cost matrices
with futures.ProcessPoolExecutor(max_workers=self.max_processes) as executor:
# Each parallel process calls the solve_od_cost_matrix() function with the od_inputs dictionary for the
# given origin and destination OID ranges.
jobs = {executor.submit(solve_od_cost_matrix, self.od_inputs, range): range for range in self.ranges}
# As each job is completed, add some logging information and store the results to post-process later
for future in futures.as_completed(jobs):
try:
# The OD cost matrix job returns a results dictionary. Retrieve it.
result = future.result()
except Exception: # pylint: disable=broad-except
# If we couldn't retrieve the result, some terrible error happened and the job errored.
# Note: This does not mean solve failed. It means some unexpected error was thrown. The most likely
# causes are:
# a) If you're calling a service, the service was temporarily down.
# b) You had a temporary file read/write or resource issue on your machine.
# c) If you're actively updating the code, you introduced an error.
# To make the tool more robust against temporary glitches, retry submitting the job up to the number
# of times designated in helpers.MAX_RETRIES. If the job is still erroring after that many retries,
# fail the entire tool run.
errs = traceback.format_exc().splitlines()
failed_range = jobs[future]
LOGGER.debug((
f"Failed to get results for OD chunk {failed_range} from the parallel process. Will retry up "
f"to {helpers.MAX_RETRIES} times. Errors: {errs}"
))
job_failed = True
num_retries = 0
while job_failed and num_retries < helpers.MAX_RETRIES:
num_retries += 1
try:
future = executor.submit(solve_od_cost_matrix, self.od_inputs, failed_range)
result = future.result()
job_failed = False
LOGGER.debug(f"OD chunk {failed_range} succeeded after {num_retries} retries.")
except Exception: # pylint: disable=broad-except
# Update exception info to the latest error
errs = traceback.format_exc().splitlines()
if job_failed:
# The job errored and did not succeed after retries. Fail the tool run because something
# terrible is happening.
LOGGER.debug(f"OD chunk {failed_range} continued to error after {num_retries} retries.")
LOGGER.error("Failed to get OD Cost Matrix result from parallel processing.")
errs = traceback.format_exc().splitlines()
for err in errs:
LOGGER.error(err)
raise

# If we got this far, the job completed successfully and we retrieved results.
completed_jobs += 1
LOGGER.info(
f"Finished OD Cost Matrix calculation {completed_jobs} of {self.total_jobs}.")

# Parse the results dictionary and store components for post-processing.
if result["solveSucceeded"]:
self.od_line_files.append(result["outputLines"])
else:
# Typically, a solve fails because no destinations were found for any of the origins in the chunk,
# and this is a perfectly legitimate failure. It is not an error. However, there may be other, less
# likely, reasons for solve failure. Write solve messages to the main GP message thread in debug
# mode only in case the user is having problems. The user can also check the individual OD log
# files.
LOGGER.debug(f"Solve failed for job id {result['jobId']}.")
LOGGER.debug(result["solveMessages"])
job_results = helpers.run_parallel_processes(
LOGGER, solve_od_cost_matrix, [self.od_inputs], self.ranges,
self.total_jobs, self.max_processes,
"Solving OD Cost Matrix", "OD Cost Matrix"
)

# Parse the results and store components for post-processing.
for result in job_results:
if result["solveSucceeded"]:
self.od_line_files.append(result["outputLines"])
else:
# Typically, a solve fails because no destinations were found for any of the origins in the chunk,
# and this is a perfectly legitimate failure. It is not an error. However, there may be other, less
# likely, reasons for solve failure. Write solve messages to the main GP message thread in debug
# mode only in case the user is having problems. The user can also check the individual OD log
# files.
LOGGER.debug(f"Solve failed for job id {result['jobId']}.")
LOGGER.debug(result["solveMessages"])

# Post-process outputs
if self.od_line_files: