From 19e4f8b2d06c8cdfbceba5db97aecbb5c3c4d7cc Mon Sep 17 00:00:00 2001
From: EdenWuyifan
Date: Fri, 26 Apr 2024 17:15:39 -0400
Subject: [PATCH] Refactor output folder

---
 .../pipeline_search/agent_environment.py  |  29 ++-
 alpha_automl/pipeline_search/agent_lab.py |  77 +++++---
 .../pipeline_synthesis/setup_search.py    | 185 +++++++++++-------
 alpha_automl/utils.py                     |  10 -
 4 files changed, 178 insertions(+), 123 deletions(-)

diff --git a/alpha_automl/pipeline_search/agent_environment.py b/alpha_automl/pipeline_search/agent_environment.py
index 25f3b4da..ca90f5c3 100644
--- a/alpha_automl/pipeline_search/agent_environment.py
+++ b/alpha_automl/pipeline_search/agent_environment.py
@@ -1,5 +1,6 @@
 import logging
 import random
+
 import gymnasium as gym
 import numpy as np
 from gymnasium.spaces import Box, Dict, Discrete
@@ -40,7 +41,6 @@ def __init__(self, config: EnvContext):
         self.action_offsets = self.generate_action_offsets()
         self.action_space = Discrete(self.max_actions)
 
-
     def reset(self, *, seed=None, options=None):
         # init number of steps
         self.num_steps = 0
@@ -54,11 +54,11 @@ def step(self, action):
         curr_step = self.step_stack.pop()
-        offseted_action = self.action_offsets[curr_step]+action
+        offseted_action = self.action_offsets[curr_step] + action
         valid_action_size = self.action_spaces[curr_step]
         # Check the action is illegal
         valid_moves = self.game.getValidMoves(self.board)
-        if action >= valid_action_size or valid_moves[offseted_action-1] != 1:
+        if action >= valid_action_size or valid_moves[offseted_action - 1] != 1:
             return (
                 {"board": np.array(self.board).astype(np.uint8)},
                 -1,
                 True,
                 False,
@@ -78,19 +78,19 @@
                 False,
                 {},
             )
-        if non_terminals_moves[0] != "E" and non_terminals_moves[0].upper() == non_terminals_moves[0]:
+        if (
+            non_terminals_moves[0] != "E"
+            and non_terminals_moves[0].upper() == non_terminals_moves[0]
+        ):
             self.step_stack.extend(non_terminals_moves[::-1])
 
-
         # update number of steps
         self.num_steps += 1
 
         # update board with new action
         # print(f"action: {action}\n board: {self.board}")
-        self.board = self.game.getNextState(self.board, offseted_action-1)
+        self.board = self.game.getNextState(self.board, offseted_action - 1)
 
-        if self.num_steps > 9:
-            logger.debug(f"[YFW]================={self.board[self.game.m:]}")
         # reward: win(1) - pipeline score, not end(0) - 0, bad(2) - 0
         reward = 0
         game_end = self.game.getGameEnded(self.board)
@@ -118,7 +118,7 @@
         # else:
         #     split_move = move_string.split("->")
         #     non_terminals_moves = move_string.split("->")[1].strip().split(" ")
-
+
         # if split_move[0].strip() == "ENSEMBLER":
         #     if "E" in non_terminals_moves:
         #         rewards = 5
         #     else:
         #         rewards = 5 - len(non_terminals_moves)
         # else:
         #     rewards = random.uniform(0, 1)
-
-
         # done & truncated
         truncated = self.num_steps >= 20
@@ -153,21 +151,20 @@ def generate_action_spaces(self):
         action_spaces = {}
         for action in self.game.grammar["RULES"].values():
             move_type, non_terminals_moves = self.extract_action_details(action)
-
+
             if move_type not in action_spaces:
                 action_spaces[move_type] = 1
             else:
                 action_spaces[move_type] += 1
-
+
         return action_spaces
 
     def generate_action_offsets(self):
         action_offsets = {}
         for action in self.game.grammar["RULES"].values():
             move_type, non_terminals_moves = self.extract_action_details(action)
-
+
             if move_type not in action_offsets:
                 action_offsets[move_type] = action
-
+
         return action_offsets
-
\ No newline at end of file
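Note on the agent_environment.py hunks above: beyond the Black-style reformatting, the arithmetic they touch is the grammar-to-action mapping, where each non-terminal owns a contiguous block of global rule ids and a locally chosen action is shifted by that block's offset before the board is updated. A minimal self-contained sketch of that scheme follows; the three-rule grammar is hypothetical, and only the offset arithmetic mirrors the patched step():

    # Hypothetical grammar: rule id -> "NON_TERMINAL -> production" (ids are 1-based).
    grammar_rules = {
        1: "CLASSIFIER -> sklearn.svm.SVC",
        2: "CLASSIFIER -> sklearn.tree.DecisionTreeClassifier",
        3: "IMPUTER -> sklearn.impute.SimpleImputer",
    }

    action_spaces = {}   # non-terminal -> size of its local action space
    action_offsets = {}  # non-terminal -> first global rule id of its block
    for rule_id, rule in grammar_rules.items():
        move_type = rule.split("->")[0].strip()
        action_spaces[move_type] = action_spaces.get(move_type, 0) + 1
        action_offsets.setdefault(move_type, rule_id)

    def to_global_action(move_type, action):
        # Mirrors step(): offseted_action = self.action_offsets[curr_step] + action,
        # later indexed as valid_moves[offseted_action - 1] since rule ids are 1-based.
        assert action < action_spaces[move_type], "illegal local action"
        return action_offsets[move_type] + action

    print(to_global_action("CLASSIFIER", 1))  # 2 -> the DecisionTreeClassifier rule
    print(to_global_action("IMPUTER", 0))     # 3 -> the SimpleImputer rule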
diff --git a/alpha_automl/pipeline_search/agent_lab.py b/alpha_automl/pipeline_search/agent_lab.py
index ede45314..122311ca 100644
--- a/alpha_automl/pipeline_search/agent_lab.py
+++ b/alpha_automl/pipeline_search/agent_lab.py
@@ -2,22 +2,21 @@
 import logging
 import os
 import time
-import json
 from datetime import datetime
 
 import ray
+from alpha_automl.pipeline_search.agent_environment import AutoMLEnv
 from ray.rllib.policy import Policy
 from ray.rllib.utils.checkpoints import get_checkpoint_info
 from ray.tune.logger import pretty_print
 from ray.tune.registry import get_trainable_cls
-from ray import tune
-
-from alpha_automl.pipeline_search.agent_environment import AutoMLEnv
 
 logger = logging.getLogger(__name__)
 
 
-def pipeline_search_rllib(game, time_bound, checkpoint_load_folder, checkpoint_save_folder):
+def pipeline_search_rllib(
+    game, time_bound, checkpoint_load_folder, checkpoint_save_folder
+):
     """
     Search for pipelines using Rllib
     """
@@ -31,7 +30,6 @@ def pipeline_search_rllib(game, time_bound, checkpoint_load_folder, checkpoint_s
 
     # train model
     train_rllib_model(algo, time_bound, checkpoint_load_folder, checkpoint_save_folder)
-
    save_rllib_checkpoint(algo, checkpoint_save_folder)
     logger.debug("[RlLib] Done")
     ray.shutdown()
@@ -63,13 +61,13 @@ def load_rllib_checkpoint(game, checkpoint_load_folder, num_rollout_workers):
     logger.debug("[RlLib] Create Config done")
 
     # Checking if the list is empty or not
-    if [f for f in os.listdir(checkpoint_load_folder) if not f.startswith(".")] == []:
+    if not contain_checkpoints(checkpoint_load_folder):
         logger.debug("[RlLib] Cannot read RlLib checkpoint, create a new one.")
         return config.build()
     else:
         algo = config.build()
         weights = load_rllib_policy_weights(checkpoint_load_folder)
-
+
         algo.set_weights(weights)
         # Restore the old state.
         # algo.restore(load_folder)
@@ -77,7 +75,7 @@ def load_rllib_checkpoint(game, checkpoint_load_folder, num_rollout_workers):
     return algo
 
 
-def train_rllib_model(algo, time_bound, load_folder, checkpoint_save_folder):
+def train_rllib_model(algo, time_bound, checkpoint_load_folder, checkpoint_save_folder):
     timeout = time.time() + time_bound
     result = algo.train()
     last_best = result["episode_reward_mean"]
@@ -92,9 +90,12 @@
         ):
             logger.debug(f"[RlLib] Train Timeout")
             break
-
-        if [f for f in os.listdir(load_folder) if not f.startswith(".")] != []:
-            weights = load_rllib_policy_weights()
+
+        if contain_checkpoints(checkpoint_save_folder):
+            weights = load_rllib_policy_weights(checkpoint_save_folder)
+            algo.set_weights(weights)
+        elif contain_checkpoints(checkpoint_load_folder):
+            weights = load_rllib_policy_weights(checkpoint_load_folder)
             algo.set_weights(weights)
         result = algo.train()
         logger.debug(pretty_print(result))
@@ -108,16 +109,17 @@
     algo.stop()
 
 
-def load_rllib_policy_weights(checkpoint_load_folder):
+def load_rllib_policy_weights(checkpoint_folder):
     logger.debug(f"[RlLib] Synchronizing model weights...")
-    policy = Policy.from_checkpoint(checkpoint_load_folder)
-    policy = policy['default_policy']
+    policy = Policy.from_checkpoint(checkpoint_folder)
+    policy = policy["default_policy"]
     weights = policy.get_weights()
-    weights = {'default_policy': weights}
+    weights = {"default_policy": weights}
 
     return weights
 
+
 def save_rllib_checkpoint(algo, checkpoint_save_folder):
     save_result = algo.save(checkpoint_dir=checkpoint_save_folder)
     path_to_checkpoint = save_result.checkpoint.path
@@ -131,15 +133,14 @@ def dump_result_to_json(primitives, task_start, score, output_folder=None):
     output_path = generate_json_path(output_folder)
     # Read JSON data from input file
     if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
-        with open(output_path, 'w') as f:
+        with open(output_path, "w") as f:
             json.dump({}, f)
-    with open(output_path, 'r') as f:
+    with open(output_path, "r") as f:
         data = json.load(f)
-
-
+
     timestamp = str(datetime.now() - task_start)  # strftime("%Y-%m-%d %H:%M:%S")
-
+
     # Check for duplicate elements
     if primitives in data.values():
         return
@@ -152,13 +153,10 @@ def dump_result_to_json(primitives, task_start, score, output_folder=None):
 def read_result_to_pipeline(builder, output_folder=None):
     output_path = generate_json_path(output_folder)
-
+
     pipelines = []
     # Read JSON data from input file
-    if (
-        not os.path.exists(output_path)
-        or os.path.getsize(output_path) == 0
-    ):
+    if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
         return []
     with open(output_path, "r") as f:
         data = json.load(f)
@@ -168,11 +166,34 @@ def read_result_to_pipeline(builder, output_folder=None):
             pipeline = builder.make_pipeline(primitives)
             if pipeline:
                 pipelines.append(pipeline)
-
+
     return pipelines
 
 
 def generate_json_path(output_folder=None):
     output_path = os.path.join(output_folder, "result.json")
-
+
     return output_path
+
+
+def contain_checkpoints(folder_path):
+    if folder_path is None:
+        return False
+
+    file_list = os.listdir(folder_path)
+
+    if [f for f in file_list if not f.startswith(".")] == []:
+        return False
+
+    if (
+        "algorithm_state.pkl" in file_list
+        and "policies" in file_list
+        and "rllib_checkpoint.json" in file_list
+    ):
+        return True
+    else:
+        logger.info(
+            f"[RlLib] Checkpoint folder {folder_path} does not contain all necessary files, files: {file_list}."
+        )
+
+    return False
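Note on the new contain_checkpoints() helper above: it treats a folder as loadable only when all three artifacts of an RLlib Algorithm checkpoint are present, and train_rllib_model() now prefers the save folder over the load folder when re-syncing weights, so a checkpoint written during the current run wins over a bundled one. A runnable sketch of the acceptance rule follows; it is a simplified copy of the helper without the logging, the temporary folder exists only for the demonstration, and "policies" is created as a plain file here although RLlib writes it as a directory:

    import os
    import tempfile

    def contain_checkpoints(folder_path):
        # Same acceptance rule as the helper added to agent_lab.py above.
        if folder_path is None:
            return False
        file_list = os.listdir(folder_path)
        if [f for f in file_list if not f.startswith(".")] == []:
            return False
        return (
            "algorithm_state.pkl" in file_list
            and "policies" in file_list
            and "rllib_checkpoint.json" in file_list
        )

    with tempfile.TemporaryDirectory() as folder:
        print(contain_checkpoints(None))    # False: no folder configured
        print(contain_checkpoints(folder))  # False: folder exists but is empty
        for name in ("algorithm_state.pkl", "policies", "rllib_checkpoint.json"):
            open(os.path.join(folder, name), "w").close()
        print(contain_checkpoints(folder))  # True: all three artifacts present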
diff --git a/alpha_automl/pipeline_synthesis/setup_search.py b/alpha_automl/pipeline_synthesis/setup_search.py
index eabb6a61..761da08e 100644
--- a/alpha_automl/pipeline_synthesis/setup_search.py
+++ b/alpha_automl/pipeline_synthesis/setup_search.py
@@ -1,85 +1,100 @@
-import sys
 import logging
+import sys
 from datetime import datetime
-from os.path import join, dirname
+from os.path import dirname, join
+
 from alpha_automl.grammar_loader import load_manual_grammar
+from alpha_automl.pipeline_search.agent_lab import (contain_checkpoints,
+                                                    dump_result_to_json,
+                                                    pipeline_search_rllib,
+                                                    read_result_to_pipeline)
 from alpha_automl.pipeline_search.game import PipelineGame
-from alpha_automl.pipeline_search.agent_lab import pipeline_search_rllib, dump_result_to_json, read_result_to_pipeline
 from alpha_automl.pipeline_synthesis.pipeline_builder import BaseBuilder
 from alpha_automl.scorer import score_pipeline
-from alpha_automl.utils import hide_logs, contain_checkpoints
+from alpha_automl.utils import hide_logs
 
 logger = logging.getLogger(__name__)
 
-DEFAULT_CHECKPOINT_PATH = join(dirname(__file__), '../resource/checkpoints/')
+DEFAULT_CHECKPOINT_PATH = join(dirname(__file__), "../resource/checkpoints/")
 
 config = {
-    'PROBLEM_TYPES': {
-        'CLASSIFICATION': 1,
-        'REGRESSION': 2,
-        'CLUSTERING': 3,
-        'TIME_SERIES_FORECAST': 4,
-        'SEMISUPERVISED': 5,
-        'NA': 6
+    "PROBLEM_TYPES": {
+        "CLASSIFICATION": 1,
+        "REGRESSION": 2,
+        "CLUSTERING": 3,
+        "TIME_SERIES_FORECAST": 4,
+        "SEMISUPERVISED": 5,
+        "NA": 6,
     },
-    'DATA_TYPES': {
-        'TABULAR': 1,
-        'TEXT': 2,
-        'IMAGE': 3,
-        'VIDEO': 4,
-        'MULTIMODAL': 5
-    },
-    'PIPELINE_SIZE': 10
+    "DATA_TYPES": {"TABULAR": 1, "TEXT": 2, "IMAGE": 3, "VIDEO": 4, "MULTIMODAL": 5},
+    "PIPELINE_SIZE": 10,
 }
 
 
 def signal_handler(queue, signum):
-    logger.debug(f'Receiving signal {signum}, terminating process')
-    queue.append('DONE')
+    logger.debug(f"Receiving signal {signum}, terminating process")
+    queue.append("DONE")
     # TODO: Should it save the last status of the NN model?
     sys.exit(0)
 
 
-def search_pipelines(X, y, scoring, splitting_strategy, task_name, time_bound, automl_hyperparams, metadata,
-                     output_folder, checkpoints_folder, verbose):
+def search_pipelines(
+    X,
+    y,
+    scoring,
+    splitting_strategy,
+    task_name,
+    time_bound,
+    automl_hyperparams,
+    metadata,
+    output_folder,
+    checkpoints_folder,
+    verbose,
+):
     # signal.signal(signal.SIGTERM, lambda signum, frame: signal_handler(queue, signum))
-    hide_logs(verbose)  # Hide logs here too, since multiprocessing has some issues with loggers
+    hide_logs(
+        verbose
+    )  # Hide logs here too, since multiprocessing has some issues with loggers
     builder = BaseBuilder(metadata, automl_hyperparams)
     all_primitives = builder.all_primitives
     ensemble_pipelines_hash = set()
-
+
     task_start = datetime.now()
 
     def evaluate_pipeline(primitives):
-        has_repeated_classifiers = check_repeated_classifiers(primitives, all_primitives, ensemble_pipelines_hash)
+        has_repeated_classifiers = check_repeated_classifiers(
+            primitives, all_primitives, ensemble_pipelines_hash
+        )
         if has_repeated_classifiers:
-            logger.info('Repeated classifiers detected in ensembles, ignoring pipeline')
+            logger.info("Repeated classifiers detected in ensembles, ignoring pipeline")
             return None
         pipeline = builder.make_pipeline(primitives)
         score = None
 
         if pipeline is not None:
-            alphaautoml_pipeline = score_pipeline(pipeline, X, y, scoring, splitting_strategy, task_name, verbose)
+            alphaautoml_pipeline = score_pipeline(
+                pipeline, X, y, scoring, splitting_strategy, task_name, verbose
+            )
             if alphaautoml_pipeline is not None:
                 score = alphaautoml_pipeline.get_score()
         if score is not None:
-            dump_result_to_json(primitives, task_start, score, output_folder)
+            dump_result_to_json(primitives, task_start, score, output_folder)
 
         return score
 
     if task_name is None:
-        task_name = 'NA'
+        task_name = "NA"
 
-    task_name_id = task_name + '_TASK'
-    include_primitives = automl_hyperparams['include_primitives']
-    exclude_primitives = automl_hyperparams['exclude_primitives']
-    new_primitives = automl_hyperparams['new_primitives']
-    use_imputer = metadata['missing_values']
-    nonnumeric_columns = metadata['nonnumeric_columns']
+    task_name_id = task_name + "_TASK"
+    include_primitives = automl_hyperparams["include_primitives"]
+    exclude_primitives = automl_hyperparams["exclude_primitives"]
+    new_primitives = automl_hyperparams["new_primitives"]
+    use_imputer = metadata["missing_values"]
+    nonnumeric_columns = metadata["nonnumeric_columns"]
 
-    logger.debug('Creating a manual grammar')
+    logger.debug("Creating a manual grammar")
     grammar = load_manual_grammar(
         task_name_id,
         nonnumeric_columns,
@@ -91,11 +106,21 @@ def evaluate_pipeline(primitives):
     metric = scoring._score_func.__name__
     config_updated = update_config(task_name, metric, grammar, metadata)
 
-    checkpoint_load_folder = checkpoints_folder if contain_checkpoints(checkpoints_folder) else DEFAULT_CHECKPOINT_PATH
-    checkpoint_save_folder = checkpoints_folder if checkpoints_folder is not None else output_folder
+    checkpoint_load_folder = (
+        checkpoints_folder
+        if contain_checkpoints(checkpoints_folder)
+        else DEFAULT_CHECKPOINT_PATH
+    )
+    checkpoint_save_folder = (
+        checkpoints_folder
+        if checkpoints_folder is not None
+        else DEFAULT_CHECKPOINT_PATH
+    )
     game = PipelineGame(config_updated, evaluate_pipeline)
-    pipeline_search_rllib(game, time_bound, checkpoint_load_folder, checkpoint_save_folder)
-    logger.debug('Search completed')
+    pipeline_search_rllib(
+        game, time_bound, checkpoint_load_folder, checkpoint_save_folder
+    )
+    logger.debug("Search completed")
 
     results = read_result_to_pipeline(builder, output_folder)
     # queue.put('DONE')
@@ -103,33 +128,37 @@
 def update_config(task_name, metric, grammar, metadata):
-    config['PROBLEM'] = task_name
-    config['DATA_TYPE'] = 'TABULAR'
-    config['METRIC'] = metric
-    config['DATASET'] = f'DATASET_{task_name}'
-    config['GRAMMAR'] = grammar
+    config["PROBLEM"] = task_name
+    config["DATA_TYPE"] = "TABULAR"
+    config["METRIC"] = metric
+    config["DATASET"] = f"DATASET_{task_name}"
+    config["GRAMMAR"] = grammar
     metafeatures = compute_metafeatures(metric, metadata)
-    config['DATASET_METAFEATURES'] = metafeatures + [0] * (8 - len(metafeatures))
+    config["DATASET_METAFEATURES"] = metafeatures + [0] * (8 - len(metafeatures))
 
     return config
 
 
-def check_repeated_classifiers(pipeline_primitives, all_primitives, ensemble_pipelines_hash):
+def check_repeated_classifiers(
+    pipeline_primitives, all_primitives, ensemble_pipelines_hash
+):
     # Verify if the classifiers are repeated in the ensembles (regardless of the order)
     classifiers = []
-    pipeline_hash = ''
+    pipeline_hash = ""
     has_ensemble_primitive = False
     has_repeated_classifiers = False
     for primitive_name in pipeline_primitives:
-        primitive_type = all_primitives[primitive_name]['type']
+        primitive_type = all_primitives[primitive_name]["type"]
 
-        if primitive_type == 'CLASSIFIER':
+        if primitive_type == "CLASSIFIER":
             classifiers.append(primitive_name)
-        elif primitive_type == 'MULTI_ENSEMBLER':
+        elif primitive_type == "MULTI_ENSEMBLER":
             has_ensemble_primitive = True
             pipeline_hash += primitive_name
-            if len(classifiers) != len(set(classifiers)):  # All classifiers should be different
+            if len(classifiers) != len(
+                set(classifiers)
+            ):  # All classifiers should be different
                 has_repeated_classifiers = True
         else:
             pipeline_hash += primitive_name
@@ -140,7 +169,7 @@ def check_repeated_classifiers(pipeline_primitives, all_primitives, ensemble_pip
     if has_repeated_classifiers:
         return True
 
-    pipeline_hash += ''.join(sorted(classifiers))
+    pipeline_hash += "".join(sorted(classifiers))
 
     if pipeline_hash in ensemble_pipelines_hash:
         return True
@@ -153,42 +182,60 @@ def compute_metafeatures(metric, metadata):
     metafeatures = []
     # SCORING METRIC
     scoring_type = 0
-    if metric in ['accuracy_score', 'f1_score', 'precision_score', 'recall_score', 'jaccard_score']:
+    if metric in [
+        "accuracy_score",
+        "f1_score",
+        "precision_score",
+        "recall_score",
+        "jaccard_score",
+    ]:
         scoring_type = 1
-    elif metric in ['max_error', 'mean_absolute_error', 'mean_squared_error', 'mean_squared_log_error', 'median_absolute_error', 'r2_score']:
+    elif metric in [
+        "max_error",
+        "mean_absolute_error",
+        "mean_squared_error",
+        "mean_squared_log_error",
+        "median_absolute_error",
+        "r2_score",
+    ]:
         scoring_type = 2
-    elif metric in ['adjusted_mutual_info_score', 'rand_score', 'mutual_info_score', 'normalized_mutual_info_score']:
+    elif metric in [
+        "adjusted_mutual_info_score",
+        "rand_score",
+        "mutual_info_score",
+        "normalized_mutual_info_score",
+    ]:
         scoring_type = 3
     metafeatures.append(scoring_type)
-
+
     # IMPUTE
-    metafeatures.append(1 if metadata['missing_values'] else 0)
+    metafeatures.append(1 if metadata["missing_values"] else 0)
 
     # ENCODE
-    nonnumeric_columns = metadata['nonnumeric_columns']
+    nonnumeric_columns = metadata["nonnumeric_columns"]
     if nonnumeric_columns != {}:
         metafeatures.append(1)
 
         # TEXT
         metafeatures.append(
-            len(nonnumeric_columns['TEXT_ENCODER'])
-            if 'TEXT_ENCODER' in nonnumeric_columns
+            len(nonnumeric_columns["TEXT_ENCODER"])
+            if "TEXT_ENCODER" in nonnumeric_columns
             else 0
         )
         # CATEGORICAL
         metafeatures.append(
-            len(nonnumeric_columns['CATEGORICAL_ENCODER'])
-            if 'CATEGORICAL_ENCODER' in nonnumeric_columns
+            len(nonnumeric_columns["CATEGORICAL_ENCODER"])
+            if "CATEGORICAL_ENCODER" in nonnumeric_columns
            else 0
        )
         # DATETIME
         metafeatures.append(
-            len(nonnumeric_columns['DATETIME_ENCODER'])
-            if 'DATETIME_ENCODER' in nonnumeric_columns
+            len(nonnumeric_columns["DATETIME_ENCODER"])
+            if "DATETIME_ENCODER" in nonnumeric_columns
             else 0
         )
         # IMAGE
         metafeatures.append(
-            len(nonnumeric_columns['IMAGE_ENCODER'])
-            if 'IMAGE_ENCODER' in nonnumeric_columns
+            len(nonnumeric_columns["IMAGE_ENCODER"])
+            if "IMAGE_ENCODER" in nonnumeric_columns
             else 0
         )
     else:
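The behavioral core of this file's change is the checkpoint folder resolution in search_pipelines(): load from the user's checkpoints_folder only when it already holds a usable checkpoint, otherwise fall back to the packaged weights, and save to the user's folder whenever one is given (the save fallback used to be output_folder). A standalone sketch of that rule; resolve_checkpoint_folders is a hypothetical helper name, and the default path mirrors the module constant:

    from os.path import dirname, join

    from alpha_automl.pipeline_search.agent_lab import contain_checkpoints

    DEFAULT_CHECKPOINT_PATH = join(dirname(__file__), "../resource/checkpoints/")

    def resolve_checkpoint_folders(checkpoints_folder):
        # Load folder: the user's folder only if it already contains a checkpoint.
        load_folder = (
            checkpoints_folder
            if contain_checkpoints(checkpoints_folder)
            else DEFAULT_CHECKPOINT_PATH
        )
        # Save folder: the user's folder if given, else the packaged default
        # (this patch changes the fallback from output_folder to the default).
        save_folder = (
            checkpoints_folder
            if checkpoints_folder is not None
            else DEFAULT_CHECKPOINT_PATH
        )
        return load_folder, save_folder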
diff --git a/alpha_automl/utils.py b/alpha_automl/utils.py
index d12b35f7..9a30b61f 100644
--- a/alpha_automl/utils.py
+++ b/alpha_automl/utils.py
@@ -284,16 +284,6 @@ def check_input_for_multiprocessing(start_method, callable_input, input_type):
                          f'from my_external_module import {object_name}"')
 
 
-def contain_checkpoints(folder_path):
-    if folder_path is None:
-        return False
-
-    if False:  # Add logic here when the folder contains checkpoint files
-        return True
-
-    return False
-
-
 class SemiSupervisedSplitter:
     """
     SemiSupervisedSplitter makes sure that unlabeled rows not being
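Closing note: the stub deleted from utils.py above is superseded by the real contain_checkpoints() in agent_lab.py, so every search artifact now has a fixed home: discovered pipelines go to result.json under output_folder, checkpoints to the resolved checkpoint folder. A sketch of the result.json round trip, reconstructed from the dump_result_to_json/read_result_to_pipeline hunks; the write-back step, the exact key format, and the handling of the score argument are not visible in the hunks, so the elapsed-time key and final json.dump below are assumptions:

    import json
    import os
    from datetime import datetime

    def generate_json_path(output_folder=None):
        return os.path.join(output_folder, "result.json")

    def dump_result(primitives, task_start, output_folder=None):
        output_path = generate_json_path(output_folder)
        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            with open(output_path, "w") as f:
                json.dump({}, f)  # initialize an empty result store
        with open(output_path, "r") as f:
            data = json.load(f)
        if primitives in data.values():  # skip duplicate pipelines
            return
        data[str(datetime.now() - task_start)] = primitives  # assumed key format
        with open(output_path, "w") as f:
            json.dump(data, f)  # assumed write-back, elided in the hunk

    def read_results(output_folder=None):
        output_path = generate_json_path(output_folder)
        if not os.path.exists(output_path) or os.path.getsize(output_path) == 0:
            return []
        with open(output_path, "r") as f:
            data = json.load(f)
        # The real code maps each value through builder.make_pipeline().
        return list(data.values())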