From 507e39737d0c48fbb84a0ca3c542e91057a0b061 Mon Sep 17 00:00:00 2001
From: Melinda Morang
Date: Wed, 30 Aug 2023 15:08:47 -0700
Subject: [PATCH] Use a shared function to launch and manage parallel processes

---
 helpers.py                      | 83 ++++++++++++++++++++++++++++++
 parallel_calculate_locations.py | 79 +++++-----------------------
 parallel_odcm.py                | 89 ++++++++------------------------
 parallel_route_pairs.py         | 91 ++++++++-------------------------
 unittests/test_parallel_odcm.py |  4 +-
 5 files changed, 138 insertions(+), 208 deletions(-)

diff --git a/helpers.py b/helpers.py
index 537c609..fa20196 100644
--- a/helpers.py
+++ b/helpers.py
@@ -21,6 +21,7 @@
 import traceback
 import logging
 import subprocess
+from concurrent import futures
 import arcpy
 
 arc_license = arcpy.ProductInfo()
@@ -661,6 +662,88 @@ def run_gp_tool(log_to_use, tool, tool_args=None, tool_kwargs=None):
     return result
 
 
+def run_parallel_processes(
+    logger, function_to_call, static_args, chunks, total_jobs, max_processes,
+    msg_intro_verb, msg_process_str
+):
+    """Launch and manage parallel processes and return a list of process results.
+
+    Args:
+        logger (logging.Logger): Logger to use for the parallelized process
+        function_to_call (function): Function called in each parallelized job
+        static_args (list): List of values used as static arguments to the function_to_call
+        chunks (iterator): Iterator with values that will be passed one at a time to the function_to_call, with each
+            value being one parallelized chunk.
+        total_jobs (int): Total number of jobs that will be run. Used in messaging.
+        max_processes (int): Maximum number of parallel processes allowed.
+        msg_intro_verb (str): Text to include in the intro message f"{msg_intro_verb} in parallel..."
+        msg_process_str (str): Text to include in messages representing whatever is being parallelized.
+
+    Returns:
+        list: List of returned values from the parallel processes.
+    """
+    logger.info(f"{msg_intro_verb} in parallel ({total_jobs} chunks)...")
+    completed_jobs = 0  # Track the number of jobs completed so far to use in logging
+    job_results = []
+    # Use the concurrent.futures ProcessPoolExecutor to spin up parallel processes that call the function
+    with futures.ProcessPoolExecutor(max_workers=max_processes) as executor:
+        # Each parallel process calls the designated function with the designated static inputs and a unique chunk
+        jobs = {executor.submit(
+            function_to_call, chunk, *static_args): chunk for chunk in chunks}
+        # As each job is completed, add some logging information and store the results to post-process later
+        for future in futures.as_completed(jobs):
+            try:
+                # Retrieve the results returned by the process
+                result = future.result()
+            except Exception:  # pylint: disable=broad-except
+                # If we couldn't retrieve the result, some terrible error happened and the job errored.
+                # Note: For processes that do network analysis workflows, this does not mean solve failed.
+                # It means some unexpected error was thrown. The most likely
+                # causes are:
+                # a) If you're calling a service, the service was temporarily down.
+                # b) You had a temporary file read/write or resource issue on your machine.
+                # c) If you're actively updating the code, you introduced an error.
+                # To make the tool more robust against temporary glitches, retry submitting the job up to the number
+                # of times designated in MAX_RETRIES. If the job is still erroring after that many
+                # retries, fail the entire tool run.
+                errs = traceback.format_exc().splitlines()
+                failed_range = jobs[future]
+                logger.debug((
+                    f"Failed to get results for {msg_process_str} chunk {failed_range} from the parallel process. "
+                    f"Will retry up to {MAX_RETRIES} times. Errors: {errs}"
+                ))
+                job_failed = True
+                num_retries = 0
+                while job_failed and num_retries < MAX_RETRIES:
+                    num_retries += 1
+                    try:
+                        future = executor.submit(function_to_call, failed_range, *static_args)
+                        result = future.result()
+                        job_failed = False
+                        logger.debug(f"{msg_process_str} chunk {failed_range} succeeded after {num_retries} retries.")
+                    except Exception:  # pylint: disable=broad-except
+                        # Update exception info to the latest error
+                        errs = traceback.format_exc().splitlines()
+                if job_failed:
+                    # The job errored and did not succeed after retries. Fail the tool run because something
+                    # terrible is happening.
+                    logger.debug(
+                        f"{msg_process_str} chunk {failed_range} continued to error after {num_retries} retries.")
+                    logger.error(f"Failed to get {msg_process_str} result from parallel processing.")
+                    errs = traceback.format_exc().splitlines()
+                    for err in errs:
+                        logger.error(err)
+                    raise
+
+            # If we got this far, the job completed successfully and we retrieved results.
+            completed_jobs += 1
+            logger.info(
+                f"Finished {msg_process_str} {completed_jobs} of {total_jobs}.")
+            job_results.append(result)
+
+    return job_results
+
+
 class PrecalculateLocationsMixin:  # pylint:disable = too-few-public-methods
     """Used to precalculate network locations either directly or calling the parallelized version."""
 
diff --git a/parallel_calculate_locations.py b/parallel_calculate_locations.py
index 772e956..a4742db 100644
--- a/parallel_calculate_locations.py
+++ b/parallel_calculate_locations.py
@@ -31,7 +31,6 @@
 limitations under the License.
 """
 # pylint: disable=logging-fstring-interpolation
-from concurrent import futures
 import os
 import uuid
 import logging
@@ -142,12 +141,12 @@ def calculate_locations(self, oid_range):
         self.job_result["oidRange"] = tuple(oid_range)
 
 
-def calculate_locations_for_chunk(calc_locs_settings, chunk):
+def calculate_locations_for_chunk(chunk, calc_locs_settings):
     """Calculate locations for a range of OIDs in the input dataset.
 
     Args:
-        calc_locs_settings (dict): Dictionary of kwargs for the LocationCalculator class.
         chunk (list): OID range to calculate locations for. Specified as a list of [start range, end range],
             inclusive.
+        calc_locs_settings (dict): Dictionary of kwargs for the LocationCalculator class.
 
     Returns:
         dict: Dictionary of job results for the chunk
@@ -217,70 +216,16 @@ def __init__(  # pylint: disable=too-many-locals, too-many-arguments
 
     def calc_locs_in_parallel(self):
         """Calculate locations in parallel."""
-        total_jobs = len(self.ranges)
-        LOGGER.info(f"Beginning parallel Calculate Locations ({total_jobs} chunks)")
-        completed_jobs = 0  # Track the number of jobs completed so far to use in logging
-        # Use the concurrent.futures ProcessPoolExecutor to spin up parallel processes that calculate chunks of
-        # locations
-        with futures.ProcessPoolExecutor(max_workers=self.max_processes) as executor:
-            # Each parallel process calls the calculate_locations_for_chunk() function for the given range of OIDs
-            # in the input dataset
-            jobs = {executor.submit(
-                calculate_locations_for_chunk, self.calc_locs_inputs, chunk
-            ): chunk for chunk in self.ranges}
-            # As each job is completed, add some logging information and store the results to post-process later
-            for future in futures.as_completed(jobs):
-                try:
-                    # The job returns a results dictionary. Retrieve it.
-                    result = future.result()
-                except Exception:  # pylint: disable=broad-except
-                    # If we couldn't retrieve the result, some terrible error happened and the job errored.
-                    # Note: This does not mean solve failed. It means some unexpected error was thrown. The most likely
-                    # causes are:
-                    # a) If you're calling a service, the service was temporarily down.
-                    # b) You had a temporary file read/write or resource issue on your machine.
-                    # c) If you're actively updating the code, you introduced an error.
-                    # To make the tool more robust against temporary glitches, retry submitting the job up to the number
-                    # of times designated in helpers.MAX_RETRIES. If the job is still erroring after that many retries,
-                    # fail the entire tool run.
-                    errs = traceback.format_exc().splitlines()
-                    failed_range = jobs[future]
-                    LOGGER.debug((
-                        f"Failed to get results for Calculate Locations chunk {failed_range} from the parallel process."
-                        f" Will retry up to {helpers.MAX_RETRIES} times. Errors: {errs}"
-                    ))
-                    job_failed = True
-                    num_retries = 0
-                    while job_failed and num_retries < helpers.MAX_RETRIES:
-                        num_retries += 1
-                        try:
-                            future = executor.submit(calculate_locations_for_chunk, self.calc_locs_inputs, failed_range)
-                            result = future.result()
-                            job_failed = False
-                            LOGGER.debug(
-                                f"Calculate Locations chunk {failed_range} succeeded after {num_retries} retries.")
-                        except Exception:  # pylint: disable=broad-except
-                            # Update exception info to the latest error
-                            errs = traceback.format_exc().splitlines()
-                    if job_failed:
-                        # The job errored and did not succeed after retries. Fail the tool run because something
-                        # terrible is happening.
-                        LOGGER.debug(
-                            f"Calculate Locations chunk {failed_range} continued to error after {num_retries} retries.")
-                        LOGGER.error("Failed to get Calculate Locations result from parallel processing.")
-                        errs = traceback.format_exc().splitlines()
-                        for err in errs:
-                            LOGGER.error(err)
-                        raise
-
-                # If we got this far, the job completed successfully and we retrieved results.
-                completed_jobs += 1
-                LOGGER.info(
-                    f"Finished Calculate Locations chunk {completed_jobs} of {total_jobs}.")
-
-                # Parse the results dictionary and store components for post-processing.
-                # Store the ranges as dictionary keys to facilitate sorting.
-                self.temp_out_fcs[result["oidRange"]] = result["outputFC"]
+        # Calculate locations in parallel
+        job_results = helpers.run_parallel_processes(
+            LOGGER, calculate_locations_for_chunk, [self.calc_locs_inputs], self.ranges,
+            len(self.ranges), self.max_processes,
+            "Calculating locations", "Calculate Locations"
+        )
+        for result in job_results:
+            # Parse the results dictionary and store components for post-processing.
+            # Store the ranges as dictionary keys to facilitate sorting.
+            self.temp_out_fcs[result["oidRange"]] = result["outputFC"]
 
         # Rejoin the chunked feature classes into one.
         LOGGER.info("Rejoining chunked data...")
diff --git a/parallel_odcm.py b/parallel_odcm.py
index 3356d54..cb00a7e 100644
--- a/parallel_odcm.py
+++ b/parallel_odcm.py
@@ -24,7 +24,6 @@
 limitations under the License.
 """
 # pylint: disable=logging-fstring-interpolation
-from concurrent import futures
 import os
 import uuid
 import logging
@@ -623,14 +622,14 @@ def _determine_if_travel_mode_time_based(self):
         self.optimized_field_name = "Total_Distance"
 
 
-def solve_od_cost_matrix(inputs, chunk):
+def solve_od_cost_matrix(chunk, inputs):
     """Solve an OD Cost Matrix analysis for the given inputs for the given chunk of ObjectIDs.
 
     Args:
-        inputs (dict): Dictionary of keyword inputs suitable for initializing the ODCostMatrix class
         chunk (list): Represents the ObjectID ranges to select from the origins and destinations when solving the OD
             Cost Matrix. For example, [[1, 1000], [4001, 5000]] means use origin OIDs 1-1000 and destination OIDs
            4001-5000.
+        inputs (dict): Dictionary of keyword inputs suitable for initializing the ODCostMatrix class
 
     Returns:
         dict: Dictionary of results from the ODCostMatrix class
@@ -786,72 +785,24 @@ def solve_od_in_parallel(self):
         self.optimized_cost_field = self._validate_od_settings()
 
         # Compute OD cost matrix in parallel
-        LOGGER.info(f"Beginning parallelized OD Cost Matrix solves ({self.total_jobs} chunks)")
-        completed_jobs = 0  # Track the number of jobs completed so far to use in logging
-        # Use the concurrent.futures ProcessPoolExecutor to spin up parallel processes that solve the OD cost matrices
-        with futures.ProcessPoolExecutor(max_workers=self.max_processes) as executor:
-            # Each parallel process calls the solve_od_cost_matrix() function with the od_inputs dictionary for the
-            # given origin and destination OID ranges.
-            jobs = {executor.submit(solve_od_cost_matrix, self.od_inputs, range): range for range in self.ranges}
-            # As each job is completed, add some logging information and store the results to post-process later
-            for future in futures.as_completed(jobs):
-                try:
-                    # The OD cost matrix job returns a results dictionary. Retrieve it.
-                    result = future.result()
-                except Exception:  # pylint: disable=broad-except
-                    # If we couldn't retrieve the result, some terrible error happened and the job errored.
-                    # Note: This does not mean solve failed. It means some unexpected error was thrown. The most likely
-                    # causes are:
-                    # a) If you're calling a service, the service was temporarily down.
-                    # b) You had a temporary file read/write or resource issue on your machine.
-                    # c) If you're actively updating the code, you introduced an error.
-                    # To make the tool more robust against temporary glitches, retry submitting the job up to the number
-                    # of times designated in helpers.MAX_RETRIES. If the job is still erroring after that many retries,
-                    # fail the entire tool run.
-                    errs = traceback.format_exc().splitlines()
-                    failed_range = jobs[future]
-                    LOGGER.debug((
-                        f"Failed to get results for OD chunk {failed_range} from the parallel process. Will retry up "
-                        f"to {helpers.MAX_RETRIES} times. Errors: {errs}"
-                    ))
-                    job_failed = True
-                    num_retries = 0
-                    while job_failed and num_retries < helpers.MAX_RETRIES:
-                        num_retries += 1
-                        try:
-                            future = executor.submit(solve_od_cost_matrix, self.od_inputs, failed_range)
-                            result = future.result()
-                            job_failed = False
-                            LOGGER.debug(f"OD chunk {failed_range} succeeded after {num_retries} retries.")
-                        except Exception:  # pylint: disable=broad-except
-                            # Update exception info to the latest error
-                            errs = traceback.format_exc().splitlines()
-                    if job_failed:
-                        # The job errored and did not succeed after retries. Fail the tool run because something
-                        # terrible is happening.
-                        LOGGER.debug(f"OD chunk {failed_range} continued to error after {num_retries} retries.")
-                        LOGGER.error("Failed to get OD Cost Matrix result from parallel processing.")
-                        errs = traceback.format_exc().splitlines()
-                        for err in errs:
-                            LOGGER.error(err)
-                        raise
-
-                # If we got this far, the job completed successfully and we retrieved results.
-                completed_jobs += 1
-                LOGGER.info(
-                    f"Finished OD Cost Matrix calculation {completed_jobs} of {self.total_jobs}.")
-
-                # Parse the results dictionary and store components for post-processing.
-                if result["solveSucceeded"]:
-                    self.od_line_files.append(result["outputLines"])
-                else:
-                    # Typically, a solve fails because no destinations were found for any of the origins in the chunk,
-                    # and this is a perfectly legitimate failure. It is not an error. However, they may be other, less
-                    # likely, reasons for solve failure. Write solve messages to the main GP message thread in debug
-                    # mode only in case the user is having problems. The user can also check the individual OD log
-                    # files.
-                    LOGGER.debug(f"Solve failed for job id {result['jobId']}.")
-                    LOGGER.debug(result["solveMessages"])
+        job_results = helpers.run_parallel_processes(
+            LOGGER, solve_od_cost_matrix, [self.od_inputs], self.ranges,
+            self.total_jobs, self.max_processes,
+            "Solving OD Cost Matrix", "OD Cost Matrix"
+        )
+
+        # Parse the results and store components for post-processing.
+        for result in job_results:
+            if result["solveSucceeded"]:
+                self.od_line_files.append(result["outputLines"])
+            else:
+                # Typically, a solve fails because no destinations were found for any of the origins in the chunk,
+                # and this is a perfectly legitimate failure. It is not an error. However, there may be other, less
+                # likely, reasons for solve failure. Write solve messages to the main GP message thread in debug
+                # mode only in case the user is having problems. The user can also check the individual OD log
+                # files.
+                LOGGER.debug(f"Solve failed for job id {result['jobId']}.")
+                LOGGER.debug(result["solveMessages"])
 
         # Post-process outputs
         if self.od_line_files:
diff --git a/parallel_route_pairs.py b/parallel_route_pairs.py
index f946e39..88a9bcb 100644
--- a/parallel_route_pairs.py
+++ b/parallel_route_pairs.py
@@ -27,7 +27,6 @@
 limitations under the License.
 """
 # pylint: disable=logging-fstring-interpolation
-from concurrent import futures
 import os
 import logging
 import shutil
@@ -518,13 +517,13 @@ def _export_to_feature_class(self, chunk_definition):
         self.job_result["outputRoutes"] = output_routes
 
 
-def solve_route(inputs, chunk):
+def solve_route(chunk, inputs):
     """Solve a Route analysis for the given inputs for the given chunk of preassigned OD pairs.
 
     Args:
-        inputs (dict): Dictionary of keyword inputs suitable for initializing the Route class
         chunk (list): For one-to-one, the ObjectID range to select from the input origins. For many-to-many, a list
             of [chunk starting row number, chunk size].
+        inputs (dict): Dictionary of keyword inputs suitable for initializing the Route class
 
     Returns:
         dict: Dictionary of results from the Route class
@@ -724,73 +723,25 @@ def solve_route_in_parallel(self):
         # Check if the input origins and destinations have any fields we should use in the route analysis
         self._populate_input_data_transfer_fields()
 
-        # Compute Route in parallel
-        LOGGER.info(f"Beginning parallelized Route solves ({self.total_jobs} chunks)")
-        completed_jobs = 0  # Track the number of jobs completed so far to use in logging
-        # Use the concurrent.futures ProcessPoolExecutor to spin up parallel processes that solve the routes
-        with futures.ProcessPoolExecutor(max_workers=self.max_processes) as executor:
-            # Each parallel process calls the solve_route() function with the rt_inputs dictionary for the
-            # given origin ranges and their assigned destinations.
-            jobs = {executor.submit(solve_route, self.rt_inputs, range): range for range in self.chunks}
-            # As each job is completed, add some logging information and store the results to post-process later
-            for future in futures.as_completed(jobs):
-                try:
-                    # The Route job returns a results dictionary. Retrieve it.
-                    result = future.result()
-                except Exception:  # pylint: disable=broad-except
-                    # If we couldn't retrieve the result, some terrible error happened and the job errored.
-                    # Note: This does not mean solve failed. It means some unexpected error was thrown. The most likely
-                    # causes are:
-                    # a) If you're calling a service, the service was temporarily down.
-                    # b) You had a temporary file read/write or resource issue on your machine.
-                    # c) If you're actively updating the code, you introduced an error.
-                    # To make the tool more robust against temporary glitches, retry submitting the job up to the number
-                    # of times designated in helpers.MAX_RETRIES. If the job is still erroring after that many retries,
-                    # fail the entire tool run.
-                    errs = traceback.format_exc().splitlines()
-                    failed_range = jobs[future]
-                    LOGGER.debug((
-                        f"Failed to get results for Route chunk {failed_range} from the parallel process. Will retry "
-                        f"up to {helpers.MAX_RETRIES} times. Errors: {errs}"
-                    ))
-                    job_failed = True
-                    num_retries = 0
-                    while job_failed and num_retries < helpers.MAX_RETRIES:
-                        num_retries += 1
-                        try:
-                            future = executor.submit(solve_route, self.rt_inputs, failed_range)
-                            result = future.result()
-                            job_failed = False
-                            LOGGER.debug(f"Route chunk {failed_range} succeeded after {num_retries} retries.")
-                        except Exception:  # pylint: disable=broad-except
-                            # Update exception info to the latest error
-                            errs = traceback.format_exc().splitlines()
-                    if job_failed:
-                        # The job errored and did not succeed after retries. Fail the tool run because something
-                        # terrible is happening.
-                        LOGGER.debug(f"Route chunk {failed_range} continued to error after {num_retries} retries.")
-                        LOGGER.error("Failed to get Route result from parallel processing.")
-                        errs = traceback.format_exc().splitlines()
-                        for err in errs:
-                            LOGGER.error(err)
-                        raise
-
-                # If we got this far, the job completed successfully and we retrieved results.
-                completed_jobs += 1
-                LOGGER.info(
-                    f"Finished Route calculation {completed_jobs} of {self.total_jobs}.")
-
-                # Parse the results dictionary and store components for post-processing.
-                if result["solveSucceeded"]:
-                    self.route_fcs.append(result["outputRoutes"])
-                else:
-                    # Typically, a solve fails because no destinations were found for any of the origins in the chunk,
-                    # and this is a perfectly legitimate failure. It is not an error. However, they may be other, less
-                    # likely, reasons for solve failure. Write solve messages to the main GP message thread in debug
-                    # mode only in case the user is having problems. The user can also check the individual OD log
-                    # files.
-                    LOGGER.debug(f"Solve failed for job id {result['jobId']}.")
-                    LOGGER.debug(result["solveMessages"])
+        # Compute Routes in parallel
+        job_results = helpers.run_parallel_processes(
+            LOGGER, solve_route, [self.rt_inputs], self.chunks,
+            self.total_jobs, self.max_processes,
+            "Solving Routes", "Route"
+        )
+
+        # Parse the results and store components for post-processing.
+        for result in job_results:
+            if result["solveSucceeded"]:
+                self.route_fcs.append(result["outputRoutes"])
+            else:
+                # Typically, a solve fails because no destinations were found for any of the origins in the chunk,
+                # and this is a perfectly legitimate failure. It is not an error. However, there may be other, less
+                # likely, reasons for solve failure. Write solve messages to the main GP message thread in debug
+                # mode only in case the user is having problems. The user can also check the individual OD log
+                # files.
+                LOGGER.debug(f"Solve failed for job id {result['jobId']}.")
+                LOGGER.debug(result["solveMessages"])
 
         # Post-process outputs
         if self.route_fcs:
diff --git a/unittests/test_parallel_odcm.py b/unittests/test_parallel_odcm.py
index 0362173..dc7f34f 100644
--- a/unittests/test_parallel_odcm.py
+++ b/unittests/test_parallel_odcm.py
@@ -1,6 +1,6 @@
 """Unit tests for the parallel_.py module.
 
-Copyright 2022 Esri
+Copyright 2023 Esri
 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at
@@ -293,7 +293,7 @@ def test_ODCostMatrix_solve_service_csv(self):
 
     def test_solve_od_cost_matrix(self):
         """Test the solve_od_cost_matrix function."""
-        result = parallel_odcm.solve_od_cost_matrix(self.od_args, [[1, 2], [8, 12]])
+        result = parallel_odcm.solve_od_cost_matrix([[1, 2], [8, 12]], self.od_args)
         # Check results
         self.assertIsInstance(result, dict)
         self.assertTrue(os.path.exists(result["logFile"]), "Log file does not exist.")
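
Reviewer note (not part of the patch): for readers unfamiliar with the concurrent.futures pattern that the new helpers.run_parallel_processes() centralizes, the sketch below shows the same submit/retry flow in a minimal, standalone form. All names here (process_chunk, run_parallel, the example chunks and offset argument) are hypothetical stand-ins rather than code from this repository; the real helper additionally handles logger messaging and returns the per-chunk result dictionaries that the callers post-process.

import traceback
from concurrent import futures

MAX_RETRIES = 3  # stand-in for the MAX_RETRIES constant defined in helpers.py


def process_chunk(chunk, offset):
    """Toy stand-in for a chunked worker such as solve_od_cost_matrix(chunk, inputs)."""
    start, end = chunk
    return {"chunk": chunk, "total": sum(range(start, end + 1)) + offset}


def run_parallel(function_to_call, static_args, chunks, max_processes):
    """Submit one job per chunk, retry failed jobs up to MAX_RETRIES, and return all results."""
    job_results = []
    with futures.ProcessPoolExecutor(max_workers=max_processes) as executor:
        # Map each future back to its chunk so a failed chunk can be resubmitted
        jobs = {executor.submit(function_to_call, chunk, *static_args): chunk for chunk in chunks}
        for future in futures.as_completed(jobs):
            chunk = jobs[future]
            num_retries = 0
            while True:
                try:
                    job_results.append(future.result())
                    break
                except Exception:  # pylint: disable=broad-except
                    # Retry transient failures; give up and re-raise after MAX_RETRIES attempts
                    if num_retries >= MAX_RETRIES:
                        print(traceback.format_exc())
                        raise
                    num_retries += 1
                    future = executor.submit(function_to_call, chunk, *static_args)
    return job_results


if __name__ == "__main__":
    results = run_parallel(process_chunk, [100], [(1, 10), (11, 20), (21, 30)], max_processes=2)
    for chunk_result in results:
        print(chunk_result)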