diff --git a/dev.requirements.txt b/dev.requirements.txt
index 6da7e1b7..1fb1a274 100644
--- a/dev.requirements.txt
+++ b/dev.requirements.txt
@@ -10,3 +10,4 @@ matplotlib
 seaborn
 autopep8
 black
+debugpy
\ No newline at end of file
diff --git a/gamechangerml/api/docker-compose.yml b/gamechangerml/api/docker-compose.yml
index 3fa68de2..73c9c8e5 100644
--- a/gamechangerml/api/docker-compose.yml
+++ b/gamechangerml/api/docker-compose.yml
@@ -26,6 +26,7 @@ services:
             - capabilities: [gpu]
     ports:
       - "5000:5000"
+      - "5678:5678"
     env_file:
       - .env
     volumes:
diff --git a/gamechangerml/api/fastapi/mlapp.py b/gamechangerml/api/fastapi/mlapp.py
index f5a86784..cbcffd9d 100644
--- a/gamechangerml/api/fastapi/mlapp.py
+++ b/gamechangerml/api/fastapi/mlapp.py
@@ -1,7 +1,11 @@
-from fastapi import FastAPI
 import faulthandler
+from fastapi import FastAPI
 from gamechangerml.api.fastapi.routers import startup, search, controls
+from gamechangerml.debug.debug_connector import debug_if_flagged
+
+# start debugger if flagged
+debug_if_flagged()
 
 
 # start API
 app = FastAPI()
diff --git a/gamechangerml/api/fastapi/routers/controls.py b/gamechangerml/api/fastapi/routers/controls.py
index 5a827b3c..6594bc73 100644
--- a/gamechangerml/api/fastapi/routers/controls.py
+++ b/gamechangerml/api/fastapi/routers/controls.py
@@ -1030,7 +1030,7 @@ async def get_user_data(data_dict: dict, response: Response):
         searchData = data_dict["params"]["searchData"]
         df = pd.DataFrame(searchData)
         GC_SEARCH_DATA = os.path.join(
-            DATA_PATH, "user_data", "search_history","SearchPdfMapping.csv"
+            DATA_PATH, "user_data", "search_history", "SearchPdfMapping.csv"
         )
         df.to_csv(GC_SEARCH_DATA)
 
diff --git a/gamechangerml/api/utils/threaddriver.py b/gamechangerml/api/utils/threaddriver.py
index e2103ee7..6cbe1ba7 100644
--- a/gamechangerml/api/utils/threaddriver.py
+++ b/gamechangerml/api/utils/threaddriver.py
@@ -2,24 +2,32 @@
 import json
 import sys
 from gamechangerml.api.utils.logger import logger
-from gamechangerml.api.utils import processmanager
+from gamechangerml.debug.debug_connector import check_debug_flagged
 
 # A class that takes in a function and a dictionary of arguments.
 # The keys in args have to match the parameters in the function.
+
+
 class MlThread(threading.Thread):
-    def __init__(self, function, args = {}):
+    def __init__(self, function, args={}):
         super(MlThread, self).__init__()
         self.function = function
         self.args = args
         self.killed = False
-    
+
     def run(self):
         try:
-            sys.settrace(self.globaltrace)
+            if check_debug_flagged():
+                logger.info(
+                    "Debugger from debugpy package is not compatible with sys.settrace, so globaltrace not activated for MlThread")
+            else:
+                sys.settrace(self.globaltrace)
+
             self.function(**self.args)
         except Exception as e:
             logger.error(e)
-            logger.info("Thread errored out attempting " + self.function.__name__ + " with parameters: " + json.dumps(self.args))
+            logger.info("Thread errored out attempting " + self.function.__name__ +
+                        " with parameters: " + json.dumps(self.args))
 
     def globaltrace(self, frame, why, arg):
         if why == 'call':
@@ -40,16 +48,18 @@ def kill(self):
 
 # Pass in a function and args which is an array of dicts
 # A way to load mulitple jobs and run them on threads.
 # join is set to false unless we need to collect the results immediately.
-def run_threads(function_list, args_list = [], join = False):
+
+
+def run_threads(function_list, args_list=[], join=False):
     threads = []
     for i, function in enumerate(function_list):
         args = {}
         if i < len(args_list):
             args = args_list[i]
-        thread = MlThread(function, args) 
+        thread = MlThread(function, args)
         threads.append(thread)
         thread.start()
     # If we join the threads the function will wait until they have all completed.
     if join:
         for thread in threads:
-            thread.join()
\ No newline at end of file
+            thread.join()
diff --git a/gamechangerml/debug/debug_connector.py b/gamechangerml/debug/debug_connector.py
new file mode 100644
index 00000000..86b5a915
--- /dev/null
+++ b/gamechangerml/debug/debug_connector.py
@@ -0,0 +1,32 @@
+import os
+from gamechangerml.api.fastapi.settings import logger
+
+env_flag = "ENABLE_DEBUGGER"
+
+
+def check_debug_flagged():
+    flag_str = os.getenv(env_flag, "false")
+    return flag_str == 'true'
+
+
+def debug_if_flagged():
+
+    if check_debug_flagged():
+        try:
+            import debugpy
+            debugger_port = 5678
+            debugpy.listen(('0.0.0.0', debugger_port))
+            logger.info(f"\n Debugger listening on {debugger_port} 🥾🦟 \n")
+
+            # debugpy.wait_for_client()
+            # debugpy.breakpoint()
+        except Exception as e:
+            import time
+            logger.warning("ERROR STARTING DEBUGGER CONNECTION")
+            time.sleep(3)
+            logger.warning(e)
+            time.sleep(3)
+            logger.info(
+                f"Debugging can be turned off by removing env variable {env_flag}")
+    else:
+        logger.info("ENABLE_DEBUGGER not set, debugger not started")
diff --git a/gamechangerml/debug/vscode_debug.README.md b/gamechangerml/debug/vscode_debug.README.md
new file mode 100644
index 00000000..aa640546
--- /dev/null
+++ b/gamechangerml/debug/vscode_debug.README.md
@@ -0,0 +1,63 @@
+# Debug setup for Visual Studio Code
+
+If you already have a `launch.json`, merge in the config below. Otherwise:
+
+go to the `Run and Debug` tab in the sidebar
+
+click the `create a launch.json file` link
+
+select: `Python`
+
+select: `Remote Attach`
+
+Enter the host name... : `localhost`
+
+Enter the port number... : `5678`
+
+Your `launch.json` should look like the following:
+```
+{
+    // Use IntelliSense to learn about possible attributes.
+    // Hover to view descriptions of existing attributes.
+    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
+    "version": "0.2.0",
+    "configurations": [
+        {
+            "name": "Python: Remote Attach",
+            "type": "python",
+            "request": "attach",
+            "connect": {
+                "host": "localhost",
+                "port": 5678
+            },
+            "pathMappings": [
+                {
+                    "localRoot": "${workspaceFolder}",
+                    "remoteRoot": "."
+                }
+            ],
+            "justMyCode": true
+        }
+    ]
+}
+```
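+
+Before attaching, you can confirm the container is actually listening on `5678`. A minimal check from the host (assumes the default port mapping added in `docker-compose.yml`; the snippet itself is not part of this repo):
+
+```python
+# quick reachability check for the mapped debugpy port
+import socket
+
+try:
+    socket.create_connection(("localhost", 5678), timeout=3).close()
+    print("debugpy is listening on 5678")
+except OSError:
+    print("nothing is listening; is ENABLE_DEBUGGER set to true in the container?")
+```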
+
+## How it works
+
+When you run
+
+`docker-compose up` with `gamechanger-ml/gamechangerml/api/docker-compose.yml`,
+
+port `5678` is exposed for `gamechanger-ml-gpu`.
+
+In the entrypoint `gamechanger-ml/gamechangerml/api/fastapi/mlapp.py`,
+
+`debug_if_flagged()` (from `gamechanger-ml/gamechangerml/debug/debug_connector.py`) is called immediately.
+
+This starts `debugpy` listening on `5678` if the env variable `ENABLE_DEBUGGER` (set in `setup_env.sh`) is `true`.
+
+The VS Code debugger then attaches to it using the `launch.json` config above.
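+
+If you want the API to pause until the IDE attaches (e.g. to step through startup code), `debug_connector.py` keeps two optional calls commented out. A minimal sketch of that variant (same `debug_if_flagged()` flow, just with the waits enabled):
+
+```python
+import debugpy
+
+debugpy.listen(("0.0.0.0", 5678))
+debugpy.wait_for_client()  # block until VS Code attaches on 5678
+debugpy.breakpoint()       # then stop here, before app = FastAPI() runs
+```
+
+Leave those two calls commented out if you only want the listener without blocking startup.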
+
+Now you're ready to crush bugs 🥾🦟
\ No newline at end of file
diff --git a/gamechangerml/scripts/update_eval_data.py b/gamechangerml/scripts/update_eval_data.py
index 895e88a8..bccb70fe 100644
--- a/gamechangerml/scripts/update_eval_data.py
+++ b/gamechangerml/scripts/update_eval_data.py
@@ -5,8 +5,8 @@
 from gamechangerml.src.model_testing.validation_data import IntelSearchData
 from gamechangerml.configs.config import ValidationConfig
 from gamechangerml.src.utilities.test_utils import (
-    make_timestamp_directory, check_directory, CustomJSONizer
-    )
+    make_timestamp_directory, check_directory, NumpyJSONEncoder
+)
 from gamechangerml import DATA_PATH
 from gamechangerml.api.utils.pathselect import get_model_paths
 import logging
@@ -17,27 +17,29 @@ def make_tiered_eval_data(index_path, testing_only):
-    
+
     if not index_path:
         index_path = SENT_INDEX
 
     if not os.path.exists(os.path.join(DATA_PATH, "validation", "domain", "sent_transformer")):
-        os.mkdir(os.path.join(DATA_PATH, "validation", "domain", "sent_transformer"))
-    
-    sub_dir = os.path.join(DATA_PATH, "validation", "domain", "sent_transformer")
-    
+        os.mkdir(os.path.join(DATA_PATH, "validation",
+                 "domain", "sent_transformer"))
+
+    sub_dir = os.path.join(DATA_PATH, "validation",
+                           "domain", "sent_transformer")
+
     save_dir = make_timestamp_directory(sub_dir)
 
     def save_data(
-        level: str, 
-        min_correct_matches: int, 
-        max_results: int, 
-        start_date: str, 
-        end_date: str, 
-        exclude_searches: List[str], 
-        filter_queries: bool, 
-        testing_only: bool, 
-        save_dir: Union[str,os.PathLike]=save_dir) -> Tuple[Dict[str,str], Dict[str,str], Dict[str,str]]:
+            level: str,
+            min_correct_matches: int,
+            max_results: int,
+            start_date: str,
+            end_date: str,
+            exclude_searches: List[str],
+            filter_queries: bool,
+            testing_only: bool,
+            save_dir: Union[str, os.PathLike] = save_dir) -> Tuple[Dict[str, str], Dict[str, str], Dict[str, str]]:
         """Makes eval data for each tier level using args from config.py and saves to save_dir
         Args:
             level [str]: tier level (['any', 'silver', 'gold'])
@@ -54,19 +56,19 @@ def save_data(
         max_res = max_results[level]
 
         intel = IntelSearchData(
-                    start_date=start_date,
-                    end_date=end_date,
-                    exclude_searches=exclude_searches,
-                    min_correct_matches=min_matches,
-                    max_results=max_res,
-                    filter_queries=filter_queries,
-                    index_path=index_path,
-                    testing_only=testing_only
-                )
+            start_date=start_date,
+            end_date=end_date,
+            exclude_searches=exclude_searches,
+            min_correct_matches=min_matches,
+            max_results=max_res,
+            filter_queries=filter_queries,
+            index_path=index_path,
+            testing_only=testing_only
+        )
 
         save_intel = {
-            "queries": intel.queries, 
-            "collection": intel.collection, 
+            "queries": intel.queries,
+            "collection": intel.collection,
             "meta_relations": intel.all_relations,
             "correct": intel.correct,
             "incorrect": intel.incorrect,
@@ -89,45 +91,48 @@ def save_data(
             "filter_queries": str(filter_queries)
         }
 
-        save_intel = json.dumps(save_intel, cls=CustomJSONizer)
+        save_intel = json.dumps(save_intel, cls=NumpyJSONEncoder)
 
         intel_path = check_directory(os.path.join(save_dir, level))
         intel_file = os.path.join(intel_path, 'intelligent_search_data.json')
-        metafile = os.path.join(intel_path, 'intelligent_search_metadata.json') 
+        metafile = os.path.join(intel_path, 'intelligent_search_metadata.json')
 
         with open(intel_file, "w") as outfile:
             json.dump(save_intel, outfile)
-        
+
         with open(metafile, "w") as outfile:
             json.dump(metadata, outfile)
 
-        logger.info(f"***Saved intelligent search validation data to: {intel_path}")
+        logger.info(
+            f"***Saved intelligent search validation data to: {intel_path}")
 
         return metadata
 
     all_data = save_data(
         level='any',
-        filter_queries = False,
-        testing_only = testing_only,
+        filter_queries=False,
+        testing_only=testing_only,
         **ValidationConfig.TRAINING_ARGS
-        )
-    
+    )
+
     silver_data = save_data(
         level='silver',
-        filter_queries = False,
+        filter_queries=False,
         testing_only=testing_only,
         **ValidationConfig.TRAINING_ARGS
-        )
-    
+    )
+
     gold_data = save_data(
         level='gold',
-        filter_queries = False, # should use same (updated) exclude list of queries as silver_data
+        # should use same (updated) exclude list of queries as silver_data
+        filter_queries=False,
         testing_only=testing_only,
         **ValidationConfig.TRAINING_ARGS
-        )
-    
+    )
+
     return all_data, silver_data, gold_data
 
+
 if __name__ == '__main__':
-    
+
     try:
         make_tiered_eval_data(index_path=None, testing_only=False)
     except Exception as e:
-        logger.warning(e, exc_info=True)
\ No newline at end of file
+        logger.warning(e, exc_info=True)
diff --git a/gamechangerml/setup_env.sh b/gamechangerml/setup_env.sh
index d62b160d..93e0ae74 100755
--- a/gamechangerml/setup_env.sh
+++ b/gamechangerml/setup_env.sh
@@ -76,7 +76,7 @@ function setup_dev() {
 
   export GC_ENABLE_SSL="${GC_ENABLE_SSL:-false}"
   export ML_WEB_TOKEN="${ML_WEB_TOKEN:-eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJwZXJtcyI6WyJHYW1lY2hhbmdlciBBZG1pbiJdLCJjbiI6IjAwNyIsImNzcmYtdG9rZW4iOiI0ZWE1YzUwOGE2NTY2ZTc2MjQwNTQzZjhmZWIwNmZkNDU3Nzc3YmUzOTU0OWM0MDE2NDM2YWZkYTY1ZDIzMzBlIiwiaWF0IjoxNjQ0MzQyMjI0fQ.ezxT-36a25IMFJPea3re7sUwYZfm8ivZvDVAv_zti1W6DRkM2Hs7WwFuy-gR092m-p7Z7ohb7yM2AKVbJAn4CMtI3_1j0_VzzYb6yhQj7YIPg-Cax5Em9JCrIlCEx7r26o-zV1me0mIYMATJTMInKikeBvg2RJErvLwkgZNQVT8gQyR-JxM4mhjmylcel9-kt5FpJJuUKnzPI8BqVeG_eL6ktevA9odJRx56w9n2LivUaoQUCiXreLOLmSEwkkhIFnsyMcCCwkPpx4lMrkzjIr3B08_gRk5wIv4pV01OcSYR4QkXM7ZsNZZzRf-JtPHYn9SlT9DvwRVbYniYUCA7IM0OegFKSTt_-i7qvuKuYFHGDStfkijX2T6g_49oY1qfLsKldisccoOLfsaROpB1NiE9DBeM5OzAo-R98H_UiUFjsFVNvlunETbhuqX2yZFUjKxxerS_-1_DW8BmoD25Ofl188KM8gqUXo5lJs4bPTf41_N_V-57muZxdAq8kBazDKhaudAzskFNFF1B9dxwgxeE8wd5Gh_beCuCoP3K-9GwRVFfrdOCO19FDjqpLr0k94UfZzuflP5SDGXth2-AzZIslurPDL_1F4iadxq06GJggwryMxC7_Uc4_FQST53-gl9SXEFVSNdr6gsw318JNiyz8bxbBpIj7posqQeEaDg}"
-  
+  export ENABLE_DEBUGGER="${ENABLE_DEBUGGER:-true}"
 
 }
 
@@ -97,6 +97,7 @@ function setup_devlocal() {
   export ES_ENABLE_AUTH="${ES_ENABLE_AUTH:-false}"
 
   export DEV_ENV="DEVLOCAL"
+  export ENABLE_DEBUGGER="${ENABLE_DEBUGGER:-true}"
 }
 
 function setup_k8s_dev() {
diff --git a/gamechangerml/src/model_testing/validation_data.py b/gamechangerml/src/model_testing/validation_data.py
index f3026061..d205587f 100644
--- a/gamechangerml/src/model_testing/validation_data.py
+++ b/gamechangerml/src/model_testing/validation_data.py
@@ -419,10 +419,10 @@ def split_feedback(self):
         if self.start_date or self.end_date:
             df = filter_date_range(df, self.start_date, self.end_date)
         # drop all rows where is no search
-        df.dropna(subset=['search'], inplace=True)
+        df.dropna(subset=['value'], inplace=True)
         # drop duplicates
         df.drop_duplicates(
-            subset=['idvisit', 'document', 'search'], inplace=True)
+            subset=['idvisit', 'document', 'value'], inplace=True)
         df['source'] = 'user_history'
 
         def clean_quot(string):
@@ -442,7 +442,7 @@ def is_question(string):
             else:
                 return bool(set(string.lower().split()).intersection(question_words))
 
-        df.rename(columns={'documenttime': 'date', 'search': 'search_text',
+        df.rename(columns={'documenttime': 'date', 'value': 'search_text',
                            'document': 'title_returned'}, inplace=True)
         df['search_text'] = df['search_text'].apply(lambda x: clean_quot(x))
         df['search_text_clean'] = df['search_text'].apply(
@@ -608,11 +608,13 @@ def map_values(queries, collection, relations):
             doc_keys = relations[key]
             docs = [collection[i] for i in doc_keys]
             vals_dict[query] = docs
-        
+
         return vals_dict
 
-    correct_vals = map_values(intel_search_queries, intel_search_results, correct)
-    incorrect_vals = map_values(intel_search_queries, intel_search_results, incorrect)
+    correct_vals = map_values(
+        intel_search_queries, intel_search_results, correct)
+    incorrect_vals = map_values(
+        intel_search_queries, intel_search_results, incorrect)
 
     def sort_dictionary(dictionary):
 
@@ -623,7 +625,7 @@ def sort_dictionary(dictionary):
             vals.sort()
             mydict_new[key] = vals
         return mydict_new
-    
+
     correct_vals = sort_dictionary(correct_vals)
     incorrect_vals = sort_dictionary(incorrect_vals)
 
diff --git a/gamechangerml/src/search/sent_transformer/finetune.py b/gamechangerml/src/search/sent_transformer/finetune.py
index 8c3123c0..020cebef 100644
--- a/gamechangerml/src/search/sent_transformer/finetune.py
+++ b/gamechangerml/src/search/sent_transformer/finetune.py
@@ -48,6 +48,7 @@ def fix_model_config(model_load_path):
     except:
         logger.info("Could not update model config file")
 
+
 def format_inputs(train, test, data_dir):
     """Create input data for dataloader and df with train/test split data"""
 
@@ -71,13 +72,15 @@ def format_inputs(train, test, data_dir):
         processmanager.update_status(processmanager.loading_data, count, total)
 
     df = pd.DataFrame(all_data, columns=["key", "doc", "score", "label"])
-    df.drop_duplicates(subset = ['doc', 'score', 'label'], inplace = True)
+    df.drop_duplicates(subset=['doc', 'score', 'label'], inplace=True)
     logger.info(f"Generated training data CSV of {str(df.shape[0])} rows")
-    df_path = os.path.join(data_dir, timestamp_filename("finetuning_data", ".csv"))
+    df_path = os.path.join(
+        data_dir, timestamp_filename("finetuning_data", ".csv"))
     df.to_csv(df_path)
 
     return train_samples, df_path
 
+
 class STFinetuner():
 
     def __init__(self, model_load_path, model_save_path, shuffle, batch_size, epochs, warmup_steps):
@@ -90,8 +93,9 @@ def __init__(self, model_load_path, model_save_path, shuffle, batch_size, epochs
         self.batch_size = batch_size
         self.epochs = epochs
         self.warmup_steps = warmup_steps
-        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
-    
+        self.device = torch.device(
+            "cuda" if torch.cuda.is_available() else "cpu")
+
     def retrain(self, data_dir, testing_only, version):
 
         try:
@@ -102,15 +106,28 @@ def retrain(self, data_dir, testing_only, version):
             if testing_only:
                 logger.info(
                     "Creating smaller dataset just for testing finetuning.")
-                train_queries = list(set([train[i]['query'] for i in train.keys()]))[:30]
-                test_queries = list(set([test[i]['query'] for i in test.keys()]))[:10]
-                train = {k: train[k] for k in train.keys() if train[k]['query'] in train_queries}
-                test = {k: test[k] for k in test.keys() if test[k]['query'] in test_queries}
-            
+                train_queries = list(
+                    set([
+                        train[i]['query'] for i in train.keys()
+                    ]))[:30]
+                test_queries = list(
+                    set([
+                        test[i]['query'] for i in test.keys()
+                    ]))[:10]
+                train = {
+                    k: train[k] for k in train.keys()
+                    if train[k]['query'] in train_queries
+                }
+                test = {
+                    k: test[k] for k in test.keys()
+                    if test[k]['query'] in test_queries
+                }
+
             del data
             gc.collect()
 
-            processmanager.update_status(processmanager.training, 0, 1,thread_id=threading.current_thread().ident)
+            processmanager.update_status(
+                processmanager.training, 0, 1, thread_id=threading.current_thread().ident)
             sleep(0.1)
             # make formatted training data
             train_samples, df_path = format_inputs(train, test, data_dir)
@@ -122,11 +139,12 @@ def retrain(self, data_dir, testing_only, version):
                 train_samples, shuffle=self.shuffle, batch_size=self.batch_size)
             train_loss = losses.CosineSimilarityLoss(model=self.model)
             #del train_samples
-            #gc.collect()
+            # gc.collect()
             logger.info("Finetuning the encoder model...")
             self.model.fit(train_objectives=[
                            (train_dataloader, train_loss)], epochs=self.epochs, warmup_steps=self.warmup_steps)
-            processmanager.update_status(processmanager.training, 1, 0,thread_id=threading.current_thread().ident)
+            processmanager.update_status(
+                processmanager.training, 1, 0, thread_id=threading.current_thread().ident)
             logger.info("Finished finetuning the encoder model")
             # save model
             self.model.save(self.model_save_path)
@@ -150,8 +168,9 @@ def retrain(self, data_dir, testing_only, version):
             }
 
             save_json("metadata.json", self.model_save_path, metadata)
-            logger.info(f"Finetuned model metadata saved to {self.model_save_path}/metadata.json")
-            
+            logger.info(
+                f"Finetuned model metadata saved to {self.model_save_path}/metadata.json")
+
             # when not testing only, save to S3
             if not testing_only:
                 logger.info("Saving data to S3...")
@@ -166,7 +185,8 @@ def retrain(self, data_dir, testing_only, version):
 
                 logger.info("Saving model to S3...")
                 dst_path = self.model_save_path + ".tar.gz"
-                utils.create_tgz_from_dir(src_dir=self.model_save_path, dst_archive=dst_path)
+                utils.create_tgz_from_dir(
+                    src_dir=self.model_save_path, dst_archive=dst_path)
                 model_id = self.model_save_path.split('_')[1]
 
                 logger.info(f"*** Created tgz file and saved to {dst_path}")
@@ -178,5 +198,6 @@ def retrain(self, data_dir, testing_only, version):
         except Exception as e:
             logger.warning("Could not complete finetuning")
             logger.error(e)
-            
+            raise e
+
         return
diff --git a/gamechangerml/src/utilities/test_utils.py b/gamechangerml/src/utilities/test_utils.py
index a1f01d4d..a9cd9f5e 100644
--- a/gamechangerml/src/utilities/test_utils.py
+++ b/gamechangerml/src/utilities/test_utils.py
@@ -24,6 +24,7 @@ class TimeoutException(Exception):   # Custom exception class
     pass
 
+
 def init_timer():
     '''Creates a timer using signal'''
     # https://stackoverflow.com/questions/25027122/break-the-function-after-certain-time/25027182
@@ -34,33 +35,39 @@ def timeout_handler(signum, frame):   # Custom signal handler
 
     return
 
+
 def check_file_size(filename, path):
     '''Returns the filesize (in bytes) of a file'''
     return os.path.getsize(os.path.join(path, filename))
 
 # from create_embeddings.py
+
+
 def get_user(logger):
     '''Gets user or sets value to 'unknown' (from create_embeddings.py)'''
     try:
         user = os.environ.get("GC_USER", default="root")
-        if (user =="root"):
+        if (user == "root"):
             user = str(os.getlogin())
     except Exception as e:
         user = "unknown"
         logger.info("Could not get system user")
         logger.info(e)
 
+
 def save_json(filename, path, data):
     '''Saved a json file'''
     filepath = os.path.join(path, filename)
-    with open(filepath, "w") as outfile: 
-        return json.dump(data, outfile)
+    with open(filepath, "w") as outfile:
+        return json.dump(data, outfile, cls=NumpyJSONEncoder)
+
 
 def open_json(filename, path):
     '''Opens a json file'''
     with open(os.path.join(path, filename)) as f:
         return json.load(f)
 
+
 def open_jsonl(filename, path):
     '''Opens a jsonl file'''
     with open(os.path.join(path, filename), 'r') as json_file:
@@ -70,25 +77,29 @@ def open_jsonl(filename, path):
     for json_str in json_list:
         result = json.loads(json_str)
         data.append(result)
-    
+
     return data
 
+
 def open_txt(filepath):
     '''Opens a txt file'''
     with open(filepath, "r") as fp:
         return fp.readlines()
 
+
 def get_index_size(sent_index_path):
     '''Checks the size of a sentence index by # of doc ids.'''
     doc_ids = open_txt(os.path.join(sent_index_path, 'doc_ids.txt'))
     return len(doc_ids)
 
+
 def timestamp_filename(filename, extension):
     '''Makes a filename that include a %Y-%m-%d timestamp'''
     today = date.today()
     formatted = '_'.join([filename, today.strftime("%Y%m%d")])
     return formatted + extension
 
+
 def check_directory(directory):
     '''Checks if a directory exists, if it does not makes the directory'''
     if not os.path.exists(directory):
@@ -97,6 +108,7 @@ def check_directory(directory):
 
     return directory
 
+
 def make_timestamp_directory(base_dir):
 
     now = datetime.now()
@@ -106,15 +118,40 @@ def make_timestamp_directory(base_dir):
         os.makedirs(new_dir)
     else:
         logger.info("Directory {} already exists.".format(new_dir))
-    
+
     return new_dir
 
 # stackoverflow
-class CustomJSONizer(json.JSONEncoder):
+# https://stackoverflow.com/questions/50916422/python-typeerror-object-of-type-int64-is-not-json-serializable
+
+
+class NumpyJSONEncoder(json.JSONEncoder):
+    """ Custom encoder for numpy data types """
+
     def default(self, obj):
-        return super().encode(bool(obj)) \
-            if isinstance(obj, np.bool_) \
-            else super().default(obj)
+        if isinstance(obj, (np.int_, np.intc, np.intp, np.int8,
+                            np.int16, np.int32, np.int64, np.uint8,
+                            np.uint16, np.uint32, np.uint64)):
+
+            return int(obj)
+
+        elif isinstance(obj, (np.float_, np.float16, np.float32, np.float64)):
+            return float(obj)
+
+        elif isinstance(obj, (np.complex_, np.complex64, np.complex128)):
+            return {'real': obj.real, 'imag': obj.imag}
+
+        elif isinstance(obj, (np.ndarray,)):
+            return obj.tolist()
+
+        elif isinstance(obj, (np.bool_)):
+            return bool(obj)
+
+        elif isinstance(obj, (np.void)):
+            return None
+
+        return json.JSONEncoder.default(self, obj)
+
 
 def clean_nans(value):
     '''Replaces null value with 0'''
@@ -123,22 +160,28 @@ def clean_nans(value):
     else:
         return value
 
-## Evaluation utility functions
+# Evaluation utility functions
+
 
 def get_most_recent_eval(directory):
     '''Gets the most recent eval json from a directory'''
-    files = [f for f in os.listdir(directory) if os.path.isfile(os.path.join(directory, f))]
-    evals = [f for f in files if f.split('.')[-1]=='json']
+    files = [f for f in os.listdir(directory) if os.path.isfile(
+        os.path.join(directory, f))]
+    evals = [f for f in files if f.split('.')[-1] == 'json']
     if evals:
-        evals.sort(key=lambda x:int(x.split('_')[-1].split('.')[0].replace('-', '')))
+        evals.sort(key=lambda x: int(
+            x.split('_')[-1].split('.')[0].replace('-', '')))
         return evals[-1]
     else:
         return ''
 
+
 def collect_evals(directory):
     '''Checks if a model directory has any evaluations'''
-    sub_dirs = [d for d in os.listdir(directory) if os.path.isdir(os.path.join(directory, d))]
-    eval_dirs = [os.path.join(directory, d) for d in sub_dirs if d.split('_')[0]=='evals']
+    sub_dirs = [d for d in os.listdir(
+        directory) if os.path.isdir(os.path.join(directory, d))]
+    eval_dirs = [os.path.join(directory, d)
+                 for d in sub_dirs if d.split('_')[0] == 'evals']
    if not eval_dirs:
        return {}
    else:
@@ -152,6 +195,7 @@ def collect_evals(directory):
                 evaldict[name] = {}
     return evaldict
 
+
 def collect_sent_evals_gc(index_path):
     '''gets evals for index'''
     eval_dict = {}
@@ -166,19 +210,22 @@ def collect_sent_evals_gc(index_path):
             subdict[level] = open_json(file, fullpath)
         else:
             subdict[level] = ''
-    
+
     eval_dict["gc"] = subdict
     return eval_dict
 
+
 def handle_sent_evals(index_path):
     try:
         return collect_sent_evals_gc(index_path)
     except Exception as e:
         logger.warning(e)
         return collect_evals(index_path)
-    
+
 # from sentence_transformers==2.0.0
-#https://github.com/UKPLab/sentence-transformers/blob/master/sentence_transformers/util.py
+# https://github.com/UKPLab/sentence-transformers/blob/master/sentence_transformers/util.py
+
+
 def cos_sim(a, b):
     """
     Computes the cosine similarity cos_sim(a[i], b[j]) for all i and j.
@@ -200,19 +247,20 @@ def cos_sim(a, b):
     b_norm = torch.nn.functional.normalize(b, p=2, dim=1)
     return torch.mm(a_norm, b_norm.transpose(0, 1))
 
+
 def update_dictionary(old_dict, new_additions, prefix):
     '''Update master dictionary of unique queries'''
 
     def make_ids(new_additions, last_count, prefix):
         '''Make UUIDs for new queries/docs'''
-        
+
         new_dict = {}
         for i in new_additions:
             if i not in old_dict.values():
                 last_count += 1
                 myid = str(last_count)
-                add = str(0) * ( 7 - len(myid))
-                myid = prefix + add + myid 
+                add = str(0) * (7 - len(myid))
+                myid = prefix + add + myid
                 new_dict[myid] = i
         return new_dict
 
@@ -222,9 +270,10 @@ def make_ids(new_additions, last_count, prefix):
     else:
         last_count = -1
     new_dict = make_ids(new_additions, last_count, prefix)
-    
+
     return {**old_dict, **new_dict}
 
+
 def map_ids(iddict, df, mapcol, idcol):
     '''Map IDs back to df'''
 
@@ -234,15 +283,17 @@ def map_ids(iddict, df, mapcol, idcol):
 
     return df
 
+
 def update_meta_relations(metadata, df, query_col, return_col):
     '''Update dict with relations and metadata about each match'''
-    
-    df = df.sort_values(by = ['date'], ascending = False).sort_values(by = ['ID_key'])
+
+    df = df.sort_values(
+        by=['date'], ascending=False).sort_values(by=['ID_key'])
 
     for x in df['ID_key'].unique():
-        subset = df[df['ID_key']==x].copy()
+        subset = df[df['ID_key'] == x].copy()
         for i in subset['ID_value'].unique():
-            subsubset = subset[subset['ID_value']==i]
+            subsubset = subset[subset['ID_value'] == i]
             exact_matches = []
             for k in subsubset.index:
                 em = {}
@@ -251,7 +302,7 @@ def update_meta_relations(metadata, df, query_col, return_col):
                 em['source'] = subsubset.loc[k, 'source']
                 em['date'] = subsubset.loc[k, 'date']
                 exact_matches.append(em)
-            
+
             if x in metadata.keys() and i in metadata[x]:
                 metadata[x][i]['exact_matches'].extend(exact_matches)
             else:
@@ -259,40 +310,47 @@ def update_meta_relations(metadata, df, query_col, return_col):
                 matchdict['correct_match'] = subset['correct_match'].all()
                 matchdict['last_match_date'] = list(subset['date'])[0]
                 matchdict['exact_matches'] = exact_matches
-            
+
                 if x in metadata.keys():
                     metadata[x][i] = matchdict
                 else:
                     searchdict = {}
                     searchdict[i] = matchdict
                     metadata[x] = searchdict
-            
-            metadata[x][i]['times_matched'] = len(metadata[x][i]['exact_matches'])
-    
+
+            metadata[x][i]['times_matched'] = len(
+                metadata[x][i]['exact_matches'])
+
     return metadata
-    
+
+
 def filter_rels(metadata, min_correct_matches, max_results):
     '''Filter relations by criteria'''
-    
+
     correct_rels = {}
     incorrect_rels = {}
-    logger.info(f"Generating data for {str(len(metadata))} queries with {str(max_results)} max results and {str(min_correct_matches)} min correct matches")
+    logger.info(
+        f"Generating data for {str(len(metadata))} queries with {str(max_results)} max results and {str(min_correct_matches)} min correct matches")
 
     for key in metadata:
         acceptable_positive_results = []
         negative_results = []
-        if max_results and len(metadata[key]) > max_results: # if we have more than n max results, skip this match
-            logger.info(f"Skipping {key}: has {str(len(metadata[key]))} unique matches")
+        # if we have more than n max results, skip this match
+        if max_results and len(metadata[key]) > max_results:
+            logger.info(
+                f"Skipping {key}: has {str(len(metadata[key]))} unique matches")
             continue
         for match in metadata[key]:
             result = metadata[key][match]
             sources = [i['source'] for i in result['exact_matches']]
             if result['correct_match'] == True:
-                if 'matamo' in sources: # we trust matamo data
+                if 'matamo' in sources:  # we trust matamo data
                     acceptable_positive_results.append(match)
-                elif result['times_matched'] >= min_correct_matches: # only pull history matches occurring more than x times
+                # only pull history matches occurring more than x times
+                elif result['times_matched'] >= min_correct_matches:
                     acceptable_positive_results.append(match)
                 else:
-                    logger.info(f"Skipping {key}, {match}: matched {str(result['times_matched'])} times")
+                    logger.info(
+                        f"Skipping {key}, {match}: matched {str(result['times_matched'])} times")
             elif result['correct_match'] == False:
                 negative_results.append(match)
 
@@ -303,33 +361,39 @@ def filter_rels(metadata, min_correct_matches, max_results):
 
     logger.info(f"Generated {str(len(correct_rels))} correct queries")
     logger.info(f"Generated {str(len(incorrect_rels))} incorrect queries")
-    
+
     return correct_rels, incorrect_rels
 
+
 def convert_timestamp_to_datetime(timestamp):
     return pd.to_datetime(parser.parse(timestamp).strftime("%Y-%m-%d"))
 
-## filter users and dates when csv read in
+# filter users and dates when csv read in
+
+
 def filter_date_range(df, start_date, end_date):
     if 'createdAt' in df.columns:
         timecol = 'createdAt'
     elif 'searchtime' in df.columns:
         timecol = 'searchtime'
 
     df['dt'] = df[timecol].apply(lambda x: convert_timestamp_to_datetime(x))
-    logger.info(f"Available date range: {str(min(df['dt']))} - {str(max(df['dt']))}")
+    logger.info(
+        f"Available date range: {str(min(df['dt']))} - {str(max(df['dt']))}")
     subset = df.copy()
     if start_date:
         subset = subset[subset['dt'] >= pd.to_datetime(start_date)]
     if end_date:
         subset = subset[subset['dt'] <= pd.to_datetime(end_date)]
-    logger.info(f"New date range: {str(min(subset['dt']))} - {str(max(subset['dt']))}")
+    logger.info(
+        f"New date range: {str(min(subset['dt']))} - {str(max(subset['dt']))}")
 
     return subset
 
+
 def concat_csvs(directory):
     '''Combines csvs in directory into one df; drops entirely null columns'''
     df = pd.DataFrame()
     logger.info(str(directory))
-    csvs = [i for i in os.listdir(directory) if i.split('.')[-1]=='csv']
+    csvs = [i for i in os.listdir(directory) if i.split('.')[-1] == 'csv']
     csvs = [i for i in csvs if i[:2] != '._']
     logger.info(f"Combining csvs: {str(csvs)}")
     for i in csvs:
@@ -341,46 +405,57 @@ def concat_csvs(directory):
             pass
 
     return df
 
+
 def concat_matamo(testing_only=False):
     if testing_only:
         return pd.read_csv(MATAMO_TEST_FILE)
     else:
         return concat_csvs(MATAMO_DIR)
 
+
 def concat_search_hist(testing_only=False):
     if testing_only:
         return pd.read_csv(SEARCH_TEST_FILE)
     else:
         return concat_csvs(SEARCH_HIST)
 
+
 def get_most_recent_dir(parent_dir):
-    
-    subdirs = [os.path.join(parent_dir, d) for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d))]
+
+    subdirs = [os.path.join(parent_dir, d) for d in os.listdir(
+        parent_dir) if os.path.isdir(os.path.join(parent_dir, d))]
     if len(subdirs) > 0:
         return max(subdirs, key=os.path.getctime)
     else:
-        logger.error("There are no subdirectories to retrieve most recent data from")
+        logger.error(
+            "There are no subdirectories to retrieve most recent data from")
         return None
 
+
 def make_test_corpus(
-        corpus_dir, # main corpus dir
-        save_dir, # where to save the test corpus
-        percent_random, # float from 0-1 percentage of index to make from random docs
-        max_size=1000, # max size of the index (to save on time building)
-        include_ids=None, # if any IDs need to be in the test, pass as list
-        max_file_size=100000 # max size of random files to add to the test corpus
-        ):
+    corpus_dir,  # main corpus dir
+    save_dir,  # where to save the test corpus
+    percent_random,  # float from 0-1 percentage of index to make from random docs
+    max_size=1000,  # max size of the index (to save on time building)
+    include_ids=None,  # if any IDs need to be in the test, pass as list
+    max_file_size=100000  # max size of random files to add to the test corpus
+):
     '''Makes a small test corpus for checking validation'''
-    all_files = [f.split('.json')[0] + '.json' for f in os.listdir(corpus_dir) if os.path.isfile(os.path.join(corpus_dir, f))]
+    all_files = [f.split('.json')[0] + '.json' for f in os.listdir(corpus_dir)
+                 if os.path.isfile(os.path.join(corpus_dir, f))]
     if percent_random > 1:
         percent_random = percent_random / 100
 
     if include_ids:
         logger.info(f"{str(len(include_ids))} ids required in test corpus")
-        include_ids = [f.split('.json')[0] + '.json' for f in include_ids] # make sure json at end of filenames
-        subset = list(set(all_files).intersection(include_ids)) # only get ids in the main corpus
+        # make sure json at end of filenames
+        include_ids = [f.split('.json')[0] + '.json' for f in include_ids]
+        # only get ids in the main corpus
+        subset = list(set(all_files).intersection(include_ids))
         if len(subset) < len(include_ids):
-            logger.info(f"Did not find all required ids in the main corpus dir.")
-            logger.info(f"Found {str(len(subset))} / {str(len(include_ids))} ids")
+            logger.info(
+                f"Did not find all required ids in the main corpus dir.")
+            logger.info(
+                f"Found {str(len(subset))} / {str(len(include_ids))} ids")
         other = [i for i in all_files if i not in include_ids]
         if percent_random > 0:
             num_add = round(len(subset)/percent_random - len(subset))
@@ -390,16 +465,17 @@ def make_test_corpus(
         subset = []
         other = all_files
         num_add = max_size
-    
-    ## add random docs
+
+    # add random docs
     for i in range(num_add):
         filesize = 1000000
-        while filesize > max_file_size: # as we iterate, skip large files
-            random_index = random.randint(0,len(other)-1)
-            file = other[random_index] # pick a random file
-            filesize = check_file_size(file, corpus_dir) # if filesize is smaller than max, break loop
+        while filesize > max_file_size:  # as we iterate, skip large files
+            random_index = random.randint(0, len(other)-1)
+            file = other[random_index]  # pick a random file
+            # if filesize is smaller than max, break loop
+            filesize = check_file_size(file, corpus_dir)
         subset.append(file)
-    subset = list(set(subset)) # remove duplicates
+    subset = list(set(subset))  # remove duplicates
     logger.info(f"Collected {str(len(subset))} jsons")
 
     return subset
diff --git a/gamechangerml/train/pipeline.py b/gamechangerml/train/pipeline.py
index 7f0e8b50..98cca14e 100644
--- a/gamechangerml/train/pipeline.py
+++ b/gamechangerml/train/pipeline.py
@@ -319,7 +319,7 @@ def finetune_sent(
model - pipeline") logger.error(e) - return {} + raise e def evaluate( self, diff --git a/requirements.txt b/requirements.txt index 56037418..8e3ae8ae 100644 --- a/requirements.txt +++ b/requirements.txt @@ -39,3 +39,4 @@ uvicorn~=0.13.4 uvloop~=0.14.0 wikipedia~=1.4.0 xgboost==1.1.0 +debugpy \ No newline at end of file