Skip to content

Commit

Permalink
Allow user to specify the checkpoints folder
Browse files Browse the repository at this point in the history
  • Loading branch information
roquelopez committed Apr 26, 2024
1 parent dafa7b2 commit aa640e5
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 50 deletions.
39 changes: 25 additions & 14 deletions alpha_automl/automl_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class BaseAutoML():

def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bound_run=5, task=None,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO, save_checkpoint=False):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate a BaseAutoML object.
Expand All @@ -39,6 +39,8 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
Expand All @@ -60,14 +62,13 @@ def __init__(self, time_bound=15, metric=None, split_strategy='holdout', time_bo
self.X = None
self.y = None
self.leaderboard = None
self.automl_manager = AutoMLManager(self.output_folder, time_bound, time_bound_run, task, num_cpus, verbose)
self.automl_manager = AutoMLManager(self.output_folder, checkpoints_folder, time_bound, time_bound_run, task, num_cpus, verbose)
self._start_method = get_start_method(start_mode)
set_start_method(self._start_method, force=True)
check_input_for_multiprocessing(self._start_method, self.scorer._score_func, 'metric')
check_input_for_multiprocessing(self._start_method, self.splitter, 'split strategy')
self.label_encoder = None
self.task_type = task
self.save_checkpoint = save_checkpoint

def fit(self, X, y):
"""
Expand All @@ -78,7 +79,7 @@ def fit(self, X, y):
"""
self.X = X
self.y = y
automl_hyperparams = {'new_primitives': self.new_primitives, 'save_checkpoint': self.save_checkpoint}
automl_hyperparams = {'new_primitives': self.new_primitives}
pipelines = []
start_time = datetime.datetime.utcnow()

Expand Down Expand Up @@ -298,7 +299,7 @@ class ClassifierBaseAutoML(BaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5, task=None,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO, save_checkpoint=False):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate a ClassifierBaseAutoML object.
Expand All @@ -313,13 +314,15 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose, save_checkpoint)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

self.label_encoder = LabelEncoder()

Expand Down Expand Up @@ -355,7 +358,7 @@ class AutoMLClassifier(ClassifierBaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO, save_checkpoint=False):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLClassifier object.
Expand All @@ -369,21 +372,23 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'CLASSIFICATION'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose, save_checkpoint)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)


class AutoMLRegressor(BaseAutoML):

def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO, save_checkpoint=False):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLRegressor object.
Expand All @@ -397,20 +402,22 @@ def __init__(self, time_bound=15, metric='mean_absolute_error', split_strategy='
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for splitting_strategy.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'REGRESSION'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose, save_checkpoint)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)


class AutoMLTimeSeries(BaseAutoML):
def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='timeseries', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO, date_column=None, target_column=None):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO, date_column=None):
"""
Create/instantiate an AutoMLTimeSeries object.
Expand All @@ -424,6 +431,8 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
:param metric_kwargs: Additional arguments for metric.
:param split_strategy_kwargs: Additional arguments for TimeSeriesSplit, E.g. n_splits and test_size(int).
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
Expand All @@ -434,7 +443,7 @@ def __init__(self, time_bound=15, metric='mean_squared_error', split_strategy='t
self.target_column = target_column

super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

def _column_parser(self, X):
cols = list(X.columns.values)
Expand All @@ -453,7 +462,7 @@ class AutoMLSemiSupervisedClassifier(ClassifierBaseAutoML):

def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdout', time_bound_run=5,
score_sorting='auto', metric_kwargs=None, split_strategy_kwargs=None, output_folder=None,
num_cpus=None, start_mode='auto', verbose=logging.INFO):
checkpoints_folder=None, num_cpus=None, start_mode='auto', verbose=logging.INFO):
"""
Create/instantiate an AutoMLSemiSupervisedClassifier object.
Expand All @@ -468,14 +477,16 @@ def __init__(self, time_bound=15, metric='accuracy_score', split_strategy='holdo
:param split_strategy_kwargs: Additional arguments for splitting_strategy. In SemiSupervised case, `n_splits`
and `test_size`(test proportion from 0 to 1) can be passed to the splitter.
:param output_folder: Path to the output directory. If it is None, create a temp folder automatically.
:param checkpoints_folder: Path to the directory to load and save the checkpoints. If it is None,
it will use the default checkpoints and save the new checkpoints in output_folder.
:param num_cpus: Number of CPUs to be used.
:param start_mode: The mode to start the multiprocessing library. It could be `auto`, `fork` or `spawn`.
:param verbose: The logs level.
"""

task = 'SEMISUPERVISED'
super().__init__(time_bound, metric, split_strategy, time_bound_run, task, score_sorting, metric_kwargs,
split_strategy_kwargs, output_folder, num_cpus, start_mode, verbose)
split_strategy_kwargs, output_folder, checkpoints_folder, num_cpus, start_mode, verbose)

if split_strategy_kwargs is None:
split_strategy_kwargs = {'test_size': 0.25}
Expand Down
7 changes: 4 additions & 3 deletions alpha_automl/automl_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@

class AutoMLManager():

def __init__(self, output_folder, time_bound, time_bound_run, task, num_cpus, verbose):
def __init__(self, output_folder, checkpoints_folder, time_bound, time_bound_run, task, num_cpus, verbose):
self.output_folder = output_folder
self.checkpoints_folder = checkpoints_folder
self.time_bound = time_bound * 60
self.time_bound_run = time_bound_run * 60
self.task = task
Expand Down Expand Up @@ -61,8 +62,8 @@ def _search_pipelines(self, automl_hyperparams):
need_rescoring = False

pipelines = search_pipelines_proc(X, y, self.scoring, internal_splitting_strategy, self.task,
self.time_bound, automl_hyperparams, metadata,
self.output_folder, self.verbose)
self.time_bound, automl_hyperparams, metadata, self.output_folder,
self.checkpoints_folder, self.verbose)

found_pipelines = 0

Expand Down
49 changes: 20 additions & 29 deletions alpha_automl/pipeline_search/agent_lab.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,8 @@

logger = logging.getLogger(__name__)

PATH_TO_CHECKPOINT = "/Users/rlopez/D3M/rllib/ppo_model"
PATH_TO_RESULT_JSON = "/Users/rlopez/D3M/rllib/result.json"
#PATH_TO_CHECKPOINT = "rllib/ppo_model"
#PATH_TO_RESULT_JSON = "rllib/result.json"


def pipeline_search_rllib(game, time_bound, save_checkpoint=False):
def pipeline_search_rllib(game, time_bound, checkpoint_load_folder, checkpoint_save_folder):
"""
Search for pipelines using Rllib
"""
Expand All @@ -31,18 +26,17 @@ def pipeline_search_rllib(game, time_bound, save_checkpoint=False):
logger.debug("[RlLib] Ready")

# load checkpoint or create a new one
algo = load_rllib_checkpoint(game, num_rollout_workers=7)
algo = load_rllib_checkpoint(game, checkpoint_load_folder, num_rollout_workers=7)
logger.debug("[RlLib] Create Algo object done")

# train model
train_rllib_model(algo, time_bound, save_checkpoint=save_checkpoint)
if save_checkpoint:
save_rllib_checkpoint(algo)
train_rllib_model(algo, time_bound, checkpoint_load_folder, checkpoint_save_folder)
save_rllib_checkpoint(algo, checkpoint_save_folder)
logger.debug("[RlLib] Done")
ray.shutdown()


def load_rllib_checkpoint(game, num_rollout_workers):
def load_rllib_checkpoint(game, checkpoint_load_folder, num_rollout_workers):
config = (
get_trainable_cls("PPO")
.get_default_config()
Expand All @@ -69,26 +63,27 @@ def load_rllib_checkpoint(game, num_rollout_workers):
logger.debug("[RlLib] Create Config done")

# Checking if the list is empty or not
if [f for f in os.listdir(PATH_TO_CHECKPOINT) if not f.startswith(".")] == []:
if [f for f in os.listdir(checkpoint_load_folder) if not f.startswith(".")] == []:
logger.debug("[RlLib] Cannot read RlLib checkpoint, create a new one.")
return config.build()
else:
algo = config.build()
weights = load_rllib_policy_weights()
weights = load_rllib_policy_weights(checkpoint_load_folder)

algo.set_weights(weights)
# Restore the old (checkpointed) state.
# algo.restore(PATH_TO_CHECKPOINT)
# checkpoint_info = get_checkpoint_info(PATH_TO_CHECKPOINT)
# Restore the old state.
# algo.restore(load_folder)
# checkpoint_info = get_checkpoint_info(load_folder)
return algo


def train_rllib_model(algo, time_bound, save_checkpoint=False):
def train_rllib_model(algo, time_bound, load_folder, checkpoint_save_folder):
timeout = time.time() + time_bound
result = algo.train()
last_best = result["episode_reward_mean"]
best_unchanged_iter = 1
logger.debug(pretty_print(result))

while True:
if (
time.time() > timeout
Expand All @@ -98,7 +93,7 @@ def train_rllib_model(algo, time_bound, save_checkpoint=False):
logger.debug(f"[RlLib] Train Timeout")
break

if save_checkpoint and [f for f in os.listdir(PATH_TO_CHECKPOINT) if not f.startswith(".")] != []:
if [f for f in os.listdir(load_folder) if not f.startswith(".")] != []:
weights = load_rllib_policy_weights()
algo.set_weights(weights)
result = algo.train()
Expand All @@ -107,24 +102,24 @@ def train_rllib_model(algo, time_bound, save_checkpoint=False):
if result["episode_reward_mean"] > last_best:
last_best = result["episode_reward_mean"]
best_unchanged_iter = 1
if save_checkpoint:
save_rllib_checkpoint(algo)
save_rllib_checkpoint(algo, checkpoint_save_folder)
else:
best_unchanged_iter += 1
algo.stop()


def load_rllib_policy_weights():
def load_rllib_policy_weights(checkpoint_load_folder):
logger.debug(f"[RlLib] Synchronizing model weights...")
policy = Policy.from_checkpoint(PATH_TO_CHECKPOINT)
policy = Policy.from_checkpoint(checkpoint_load_folder)
policy = policy['default_policy']
weights = policy.get_weights()

weights = {'default_policy': weights}

return weights

def save_rllib_checkpoint(algo):
save_result = algo.save(checkpoint_dir=PATH_TO_CHECKPOINT)
def save_rllib_checkpoint(algo, checkpoint_save_folder):
save_result = algo.save(checkpoint_dir=checkpoint_save_folder)
path_to_checkpoint = save_result.checkpoint.path

logger.debug(
Expand Down Expand Up @@ -178,10 +173,6 @@ def read_result_to_pipeline(builder, output_folder=None):


def generate_json_path(output_folder=None):
output_path = PATH_TO_RESULT_JSON
if output_folder is None:
output_path = PATH_TO_RESULT_JSON
else:
output_path = os.path.join(output_folder, "result.json")
output_path = os.path.join(output_folder, "result.json")

return output_path
12 changes: 8 additions & 4 deletions alpha_automl/pipeline_synthesis/setup_search.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import sys
import logging
from datetime import datetime
from os.path import join, dirname
from alpha_automl.grammar_loader import load_manual_grammar
from alpha_automl.pipeline_search.game import PipelineGame
from alpha_automl.pipeline_search.agent_lab import pipeline_search_rllib, dump_result_to_json, read_result_to_pipeline
from alpha_automl.pipeline_synthesis.pipeline_builder import BaseBuilder
from alpha_automl.scorer import score_pipeline
from alpha_automl.utils import hide_logs
from alpha_automl.utils import hide_logs, contain_checkpoints

logger = logging.getLogger(__name__)
DEFAULT_CHECKPOINT_PATH = join(dirname(__file__), '../resource/checkpoints/')


config = {
Expand Down Expand Up @@ -38,7 +40,8 @@ def signal_handler(queue, signum):
sys.exit(0)


def search_pipelines(X, y, scoring, splitting_strategy, task_name, time_bound, automl_hyperparams, metadata, output_folder, verbose):
def search_pipelines(X, y, scoring, splitting_strategy, task_name, time_bound, automl_hyperparams, metadata,
output_folder, checkpoints_folder, verbose):
# signal.signal(signal.SIGTERM, lambda signum, frame: signal_handler(queue, signum))
hide_logs(verbose) # Hide logs here too, since multiprocessing has some issues with loggers

Expand Down Expand Up @@ -73,7 +76,6 @@ def evaluate_pipeline(primitives):
include_primitives = automl_hyperparams['include_primitives']
exclude_primitives = automl_hyperparams['exclude_primitives']
new_primitives = automl_hyperparams['new_primitives']
save_checkpoint = automl_hyperparams['save_checkpoint']
use_imputer = metadata['missing_values']
nonnumeric_columns = metadata['nonnumeric_columns']

Expand All @@ -89,8 +91,10 @@ def evaluate_pipeline(primitives):

metric = scoring._score_func.__name__
config_updated = update_config(task_name, metric, grammar, metadata)
checkpoint_load_folder = checkpoints_folder if contain_checkpoints(checkpoints_folder) else DEFAULT_CHECKPOINT_PATH
checkpoint_save_folder = checkpoints_folder if checkpoints_folder is not None else output_folder
game = PipelineGame(config_updated, evaluate_pipeline)
pipeline_search_rllib(game, time_bound, save_checkpoint=save_checkpoint)
pipeline_search_rllib(game, time_bound, checkpoint_load_folder, checkpoint_save_folder)
logger.debug('Search completed')
results = read_result_to_pipeline(builder, output_folder)

Expand Down
10 changes: 10 additions & 0 deletions alpha_automl/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,16 @@ def check_input_for_multiprocessing(start_method, callable_input, input_type):
f'from my_external_module import {object_name}"')


def contain_checkpoints(folder_path):
    """
    Check whether a folder holds checkpoint files that can be loaded.

    :param folder_path: Path to the candidate checkpoints directory. It can be None.
    :return: True if `folder_path` is an existing directory that contains at least one non-hidden entry,
        False otherwise (None, missing path, not a directory, or only hidden files).
    """
    import os  # Local import so this helper does not depend on the module-level import list.

    if folder_path is None or not os.path.isdir(folder_path):
        return False

    # Hidden entries (names starting with ".") do not count as checkpoints; this is the same
    # emptiness test applied to the load folder in agent_lab.load_rllib_checkpoint.
    return any(not entry.startswith('.') for entry in os.listdir(folder_path))


class SemiSupervisedSplitter:
"""
SemiSupervisedSplitter makes sure that unlabeled rows not being
Expand Down

0 comments on commit aa640e5

Please sign in to comment.