diff --git a/gamechangerml/api/fastapi/routers/controls.py b/gamechangerml/api/fastapi/routers/controls.py
index 8d3b05dd..858235ab 100644
--- a/gamechangerml/api/fastapi/routers/controls.py
+++ b/gamechangerml/api/fastapi/routers/controls.py
@@ -669,11 +669,17 @@ def finetune_sentence(model_dict):
         testing_only = model_dict["testing_only"]
     except:
         testing_only = False
+    try:
+        remake_train_data = model_dict["remake_train_data"]
+    except:
+        remake_train_data = False
     args = {
         "batch_size": 8,
         "epochs": int(model_dict["epochs"]),
         "warmup_steps": int(model_dict["warmup_steps"]),
         "testing_only": bool(testing_only),
+        "remake_train_data": bool(remake_train_data),
+        "retriever": MODELS.sentence_searcher,
     }
     pipeline.run(
         build_type="sent_finetune",
diff --git a/gamechangerml/data/.gitignore b/gamechangerml/data/.gitignore
new file mode 100644
index 00000000..97a4d937
--- /dev/null
+++ b/gamechangerml/data/.gitignore
@@ -0,0 +1 @@
+corpus/*
\ No newline at end of file
diff --git a/gamechangerml/scripts/make_training_data.py b/gamechangerml/scripts/make_training_data.py
index d96d8635..71ea05ae 100644
--- a/gamechangerml/scripts/make_training_data.py
+++ b/gamechangerml/scripts/make_training_data.py
@@ -1,5 +1,4 @@
 import random
-import torch
 import pandas as pd
 import os
 import json
@@ -15,6 +14,8 @@
 from gamechangerml.api.utils.logger import logger
 from gamechangerml.api.utils.pathselect import get_model_paths
 from gamechangerml.scripts.update_eval_data import make_tiered_eval_data
+from gamechangerml.src.text_handling.corpus import LocalCorpus
+from gensim.utils import simple_preprocess
 from gamechangerml import DATA_PATH
 
 model_path_dict = get_model_paths()
@@ -46,14 +47,13 @@ def get_sample_paragraphs(pars, par_limit=100, min_length=150):
 
     return collected_pars
 
-def get_best_paragraphs(data: pd.DataFrame, query: str, doc_id: str, nlp, min_score: float=0.60) -> List[Dict[str,str]]:
+def get_best_paragraphs(query: str, doc_id: str, nlp, n_returns, min_score: float=0.60) -> List[Dict[str,str]]:
     """Retrieves the best paragraphs for expected doc using similarity model
     Args:
         data [pd.DataFrame]: data df with processed text at paragraph_id level for sent_index
         query [str]: query
         doc_id [str]: doc_id of the expected document to show up with the query
-        sim: SimilarityRanker class
-        n_matching [int]: number of matching paragraphs to retrieve for the expected doc
+        nlp: spacy nlp model for similarity reranking
     Returns:
         [List[Dict[str,str]]]: List of dictionaries of paragraph matches
     """
@@ -65,44 +65,49 @@ def get_best_paragraphs(data: pd.DataFrame, query: str, doc_id: str, nlp, min_sc
 
     json = open_json(doc_id + '.json', CORPUS_DIR)
     paragraphs = json['paragraphs']
-    sents = get_sample_paragraphs(paragraphs)[:50] # get top 50 paragraphs
+    sents = get_sample_paragraphs(paragraphs)[:n_returns] # get top n_returns
     for sent in sents:
-        short = ' '.join(sent['text'].split(' ')[:400]) # shorten paragraphs
-        pars.append(short)
+        processed = ' '.join(simple_preprocess(sent['text'], min_len=2, max_len=100))
+        pars.append({"id": sent["id"], "text": processed})
 
     ranked = []
     try:
-        if len(sents) == 0:
+        if len(pars) == 0:
             logger.info("---No paragraphs retrieved for this expected doc")
-        elif len(sents) == 1:
-            ranked = [{"score": 'na', "id": sents[0]['id'], "text": sents[0]['text']}]
+        elif len(pars) == 1:
+            ranked = [{"score": 'na', "id": pars[0]['id'], "text": pars[0]['text']}]
         else:
             comparisons = []
-            for sent in sents:
-                doc2 = nlp(sent['text'])
+            for par in pars:
+                doc2 = nlp(par['text'])
                 sim = doc1.similarity(doc2)
                 if sim >= min_score:
-                    record = {"score": sim, "id": sent['id'], "text": sent['text']}
+                    record = {"score": sim, "id": par['id'], "text": par['text']}
                     comparisons.append(record)
                 else:
                     pass
            ranked = sorted(comparisons, key = lambda z: z['score'], reverse=True)
-        logger.info(f"*** Collected {str(len(ranked))} / {str(len(sents))} paragraphs (passing sim threshold) retrieved for {doc_id}")
+        logger.info(f"*** Collected {str(len(ranked))} / {str(len(pars))} paragraphs (passing sim threshold) retrieved for {doc_id}")
     except Exception as e:
         logger.info(f"---Could not re-rank the paragraphs for {query}")
         logger.warning(e)
+
+    # if no paragraphs are returned, get the title
+    if len(ranked)==0:
+        clean_title = ' '.join(simple_preprocess(json['title'], min_len=2, max_len=100))
+        ranked.append({"score": 1, "id": doc_id + ".pdf_0", "text": clean_title})
 
     return ranked
 
-def check_no_match(expected_id: str, par_id: str) -> bool:
+def check_no_match(id_1: str, id_2: str) -> bool:
     """Checks if paragraph ID matches the expected doc ID"""
-    if par_id.split('.pdf')[0].upper().strip().lstrip() == expected_id.upper().strip().lstrip():
+    if id_1.split('.pdf')[0].upper().strip().lstrip() == id_2.split('.pdf')[0].upper().strip().lstrip():
         return False
     else:
         return True
 
 def get_negative_paragraphs(
-    data: pd.DataFrame, query: str, doc_id: str, retriever, n_returns: int) -> List[Dict[str,str]]:
+    data: pd.DataFrame, query: str, doc_id: str, retriever, n_returns: int, any_matches: Dict[str,str]) -> List[Dict[str,str]]:
     """Looks up negative (not matching) paragraphs for each query
     Args:
         data [pd.DataFrame]: data df with processed text at paragraph_id level for sent_index
@@ -116,17 +121,25 @@ def get_negative_paragraphs(
     """
 
     checked_results = []
+    try:
+        single_matching_docs = [i for i in any_matches[query] if check_no_match(i, doc_id)]
+    except:
+        single_matching_docs = []
     try:
         results = retriever.retrieve_topn(query, n_returns)
         logger.info(f"Retrieved {str(len(results))} negative samples for query: {query} / doc: {doc_id}")
         for result in results:
             par = data[data["paragraph_id"]==result['id']].iloc[0]["text"]
             par = ' '.join(par.split(' ')[:400])
-            if check_no_match(doc_id, result['id']):
-                checked_results.append({"query": query, "doc": result['id'], "paragraph": par, "label": 0})
+            if check_no_match(doc_id, result['id']):
+                for s in single_matching_docs:
+                    if s and check_no_match(s, result['id']):
+                        checked_results.append({"query": query, "doc": result['id'], "paragraph": par, "label": 0})
+                    else:
+                        checked_results.append({"query": query, "doc": result['id'], "paragraph": par, "label": 0.5})
     except Exception as e:
         logger.warning("Could not get negative paragraphs")
-        logger.warning(e)
+        logger.warning(e, exc_info=True)
 
     return checked_results
 
@@ -195,18 +208,60 @@ def add_key(mydict: Dict[str,str]) -> str:
 
 def train_test_split(data: Dict[str,str], tts_ratio: float) -> Tuple[Dict[str, str]]:
     """Splits a dictionary into train/test set based on split ratio"""
-    train_size = round(len(data) * tts_ratio)
-    train_keys = random.sample(data.keys(), train_size)
-    test_keys = [i for i in data.keys() if i not in train_keys]
+    queries = list(set([data[i]['query'] for i in data]))
+
+    # split the data into positive and negative examples grouped by query
+    neg_passing = {}
+    pos_passing = {}
+    for q in queries:
+        subset = {i:data[i] for i in data.keys() if data[i]['query']==q}
+        pos_sample = [i for i in subset.keys() if subset[i]['label']==1]
+        neg_sample = [i for i in subset.keys() if subset[i]['label']==-1]
+        if len(neg_sample)>0: #since we have so few negative samples, add to neg list if it has a negative ex
+            neg_passing[q] = subset
+        elif len(pos_sample)>0: # only add the other samples if they have a positive matching sample
+            pos_passing[q] = subset
+
+    pos_train_size = round(len(pos_passing.keys()) * tts_ratio)
+    neg_train_size = round(len(neg_passing.keys()) * tts_ratio)
+
+    pos_train_keys = random.sample(pos_passing.keys(), pos_train_size)
+    neg_train_keys = random.sample(neg_passing.keys(), neg_train_size)
+
+    pos_test_keys = [i for i in pos_passing.keys() if i not in pos_train_keys]
+    neg_test_keys = [i for i in neg_passing.keys() if i not in neg_train_keys]
+
+    train_keys = []
+    test_keys = []
+    for x in pos_train_keys:
+        train_keys.extend(pos_passing[x])
+    for x in pos_test_keys:
+        test_keys.extend(pos_passing[x])
+    for x in neg_train_keys:
+        train_keys.extend(neg_passing[x])
+    for x in neg_test_keys:
+        test_keys.extend(neg_passing[x])
+
+    train = {i:data[i] for i in train_keys}
+    test = {i:data[i] for i in test_keys}
 
-    train = {k: data[k] for k in train_keys}
-    test = {k: data[k] for k in test_keys}
+    metadata = {
+        "date_created": str(date.today()),
+        "n_positive_samples": f"{str(len(pos_train_keys))} train queries / {str(len(pos_test_keys))} test queries",
+        "n_negative_samples": f"{str(len(neg_train_keys))} train queries / {str(len(neg_test_keys))} test queries",
+        "total_train_samples_size": len(train),
+        "total_test_samples_size": len(test),
+        "train_queries": pos_train_keys + neg_train_keys,
+        "test_queries": pos_test_keys + neg_test_keys,
+        "split_ratio": tts_ratio
+    }
 
-    return train, test
+    return train, test, metadata
 
 def collect_matches(
     data: pd.DataFrame,
     nlp,
+    n_returns,
     relations: Dict[str, str],
     queries: Dict[str, str],
     collection: Dict[str, str],
@@ -215,12 +270,11 @@ def collect_matches(
     """Gets matching paragraphs for each query/docid pair
     Args:
         data [pd.DataFrame]: data df with processed text at paragraph_id level for sent_index
-        sim: SimilarityRanker class
+        nlp: spacy nlp model for sim model reranking
         relations [Dict[str, str]]: dictionary of query:doc matches from intelligent search data
         queries [Dict[str, str]]: dictionary of query ids : query text from intelligent search data
        collection [Dict[str, str]]: dictionary of match ids : match text (doc ids) from intelligent search data
         label [int]: label to assign paragraphs (1=correct, 0=neutral, -1=confirmed nonmatch)
-        n_matching [int]: number of matching paragraphs to retrieve for the expected doc
     Returns:
         [Tuple[Dict[str, str]]]: one dictionary of found search pairs, one dictionary of notfound search pairs
     """
@@ -235,7 +289,7 @@ def collect_matches(
             doc = collection[k]
             uid = str(i) + '_' + str(k) # backup UID, overwritten if there are results
             try:
-                matching = get_best_paragraphs(data, query, doc, nlp)
+                matching = get_best_paragraphs(query, doc, nlp, n_returns)
                 for match in matching:
                     uid = str(i) + '_' + str(match['id'])
                     text = ' '.join(match['text'].split(' ')[:400]) # truncate to 400 tokens
@@ -253,6 +307,7 @@ def collect_negative_samples(
     relations: Dict[str, str],
     queries: Dict[str, str],
     collection: Dict[str, str],
+    any_matches: Dict[str, str],
     ) -> Tuple[Dict[str, str]]:
     """Gets negative samples each query/docid pair
     Args:
@@ -274,7 +329,7 @@ def collect_negative_samples(
             doc = collection[k]
             uid = str(i) + '_' + str(k) + '_neg' # backup UID, overwritten if there are results
             try:
-                not_matching = get_negative_paragraphs(data=data, query=query, doc_id=k, retriever=retriever, n_returns=n_returns)
+                not_matching = get_negative_paragraphs(data=data, query=query, doc_id=k, retriever=retriever, n_returns=n_returns, any_matches=any_matches)
                 for match in not_matching:
                     uid = str(i) + '_' + str(match['doc'])
                     text = ' '.join(match['paragraph'].split(' ')[:400]) # truncate to 400 tokens
@@ -285,10 +340,27 @@ def collect_negative_samples(
 
     return found, not_found
 
+def get_all_single_matches():
+    validation_dir = get_most_recent_dir(os.path.join(DATA_PATH, "validation", "domain", "sent_transformer"))
+    directory = os.path.join(validation_dir, "any")
+    any_matches = {}
+    try:
+        f = open_json('intelligent_search_data.json', directory)
+        intel = json.loads(f)
+        for x in intel['correct'].keys():
+            query = intel['queries'][x]
+            doc_keys = intel['correct'][x]
+            docs = [intel['collection'][k] for k in doc_keys]
+            any_matches[query] = docs
+    except Exception as e:
+        logger.warning("Could not load all validation data")
+        logger.warning(e)
+
+    return any_matches
+
 def make_training_data(
     index_path: Union[str, os.PathLike],
     n_returns: int,
-    n_matching: int,
     level: str,
     update_eval_data: bool,
     retriever=None,
@@ -301,7 +373,6 @@ def make_training_data(
     Args:
         index_path [str|os.PathLike]: path to the sent index for retrieving the training data (should be most recent index)
         n_returns [int]: number of non-matching paragraphs to retrieve for each query
-        n_matching [int]: number of matching paragraphs to retrieve for the expected doc
         level [str]: level of eval tier to use for training data (options: ['all', 'silver', 'gold'])
         update_eval_data [bool]: whether or not to update the eval data before making training data
         sim_model_name [str]: name of sim model for loading SimilarityRanker
@@ -316,6 +387,7 @@ def make_training_data(
     if not os.path.exists(os.path.join(DATA_PATH, "validation", "domain", "sent_transformer")) or update_eval_data:
         logger.info("**** Updating the evaluation data")
         make_tiered_eval_data(index_path)
+
     validation_dir = get_most_recent_dir(os.path.join(DATA_PATH, "validation", "domain", "sent_transformer"))
     directory = os.path.join(validation_dir, level)
     logger.info(f"**** Loading in intelligent search data from {str(directory)}")
@@ -325,6 +397,7 @@ def make_training_data(
     except Exception as e:
         logger.warning("Could not load intelligent search data")
         logger.warning(e)
+        intel = {}
 
     ## add gold standard samples
     logger.info("**** Adding gold standard examples")
@@ -354,11 +427,12 @@ def make_training_data(
         logger.info("Could not load in data from retriever")
         logger.warning(e)
 
+    any_matches = get_all_single_matches()
     ## get matching paragraphs
     try:
         correct_found, correct_notfound = collect_matches(
             data=data, queries=intel['queries'], collection=intel['collection'],
-            relations=intel['correct'], label=1, nlp = nlp)
+            relations=intel['correct'], label=1, nlp = nlp, n_returns=n_returns)
         logger.info(f"---Number of correct query/result pairs that were not found: {str(len(correct_notfound))}")
     except Exception as e:
         logger.warning(e)
@@ -366,7 +440,7 @@ def make_training_data(
     try:
         incorrect_found, incorrect_notfound = collect_matches(
             data=data, queries=intel['queries'], collection=intel['collection'],
-            relations=intel['incorrect'], label=-1, nlp = nlp)
+            relations=intel['incorrect'], label=-1, nlp = nlp, n_returns=n_returns)
         logger.info(f"---Number of incorrect query/result pairs that were not found: {str(len(incorrect_notfound))}")
     except Exception as e:
         logger.warning(e)
@@ -377,7 +451,7 @@ def make_training_data(
         all_relations = {**intel['correct'], **intel['incorrect']}
         neutral_found, neutral_notfound = collect_negative_samples(
             data=data, retriever=retriever, n_returns=n_returns, queries=intel['queries'], collection=intel['collection'],
-            relations=all_relations)
+            relations=all_relations, any_matches=any_matches)
         logger.info(f"---Number of negative sample pairs that were not found: {str(len(neutral_notfound))}")
     except Exception as e:
         logger.warning(e)
@@ -390,42 +464,13 @@ def make_training_data(
     with open(notfound_path, "w") as outfile:
         json.dump(notfound, outfile)
 
-    ## train/test split (separate on correct/incorrect for balance)
-    correct_train, correct_test = train_test_split(correct_found, tts_ratio)
-    incorrect_train, incorrect_test = train_test_split(incorrect_found, tts_ratio)
-    neutral_train, neutral_test = train_test_split(neutral_found, tts_ratio)
-    train = {**correct_train, **incorrect_train, **neutral_train}
-    test = {**correct_test, **incorrect_test, **neutral_test}
-
-    try:## check labels
-        pos = len([i for i in train if train[i]['label'] == 1])
-        logger.info(f"*** {str(pos)} positive samples in TRAIN")
-        neutral = len([i for i in train if train[i]['label'] == 0])
-        logger.info(f"*** {str(neutral)} neutral samples in TRAIN")
-        neg = len([i for i in train if train[i]['label'] == -1])
-        logger.info(f"*** {str(neg)} negative samples in TRAIN")
-
-        ## check labels
-        pos_test = len([i for i in test if test[i]['label'] == 1])
-        logger.info(f"*** {str(pos_test)} positive samples in TEST")
-        neutral_test = len([i for i in test if test[i]['label'] == 0])
-        logger.info(f"*** {str(neutral_test)} neutral samples in TEST")
-        neg_test = len([i for i in test if test[i]['label'] == -1])
-        logger.info(f"*** {str(neg_test)} negative samples in TEST")
-    except Exception as e:
-        logger.warning("Could not check stats for train/test")
-        logger.warning(e)
+    all_examples = {**neutral_found, **incorrect_found, **correct_found}
+    logger.info(f"Total size of query-doc pairs: {str(len(all_examples))}")
+
+    ## train/test split
+    train, test, metadata = train_test_split(all_examples, tts_ratio)
 
     data = {"train": train, "test": test}
-    metadata = {
-        "date_created": str(date.today()),
-        "n_positive_samples": f"{str(pos)} train / {str(pos_test)} test",
-        "n_neutral_samples": f"{str(neutral)} train / {str(neutral_test)} test",
-        "n_negative_samples": f"{str(neg)} train / {str(neg_test)} test",
-        "train_size": len(train),
-        "test_size": len(test),
-        "split_ratio": tts_ratio
-    }
 
     logger.info(f"**** Generated training data: \n {metadata}")
 
@@ -442,5 +487,5 @@ def make_training_data(
 if __name__ == '__main__':
 
     make_training_data(
-        index_path="gamechangerml/models/sent_index_20210715", n_returns=50, n_matching=3, level="silver",
-        update_eval_data=False)
\ No newline at end of file
+        index_path="gamechangerml/models/sent_index_20220103", n_returns=50, level="silver",
+        update_eval_data=True)
\ No newline at end of file
diff --git a/gamechangerml/scripts/update_eval_data.py b/gamechangerml/scripts/update_eval_data.py
index 76de449c..a72d1809 100644
--- a/gamechangerml/scripts/update_eval_data.py
+++ b/gamechangerml/scripts/update_eval_data.py
@@ -8,8 +8,18 @@
     make_timestamp_directory, check_directory, CustomJSONizer
 )
 from gamechangerml import DATA_PATH
+from gamechangerml.api.utils.pathselect import get_model_paths
+import logging
+logger = logging.getLogger()
+
+model_path_dict = get_model_paths()
+SENT_INDEX = model_path_dict['sentence']
+
 
 def make_tiered_eval_data(index_path):
+
+    if not index_path:
+        index_path = SENT_INDEX
 
     if not os.path.exists(os.path.join(DATA_PATH, "validation", "domain", "sent_transformer")):
         os.mkdir(os.path.join(DATA_PATH, "validation", "domain", "sent_transformer"))
@@ -83,7 +93,8 @@ def save_data(
 
         with open(metafile, "w") as outfile:
             json.dump(metadata, outfile)
-
+        logger.info(f"***Saved intelligent search validation data to: {intel_path}")
+
         return metadata
 
     all_data = save_data(
@@ -94,7 +105,7 @@ def save_data(
 
     silver_data = save_data(
         level='silver',
-        filter_queries = True,
+        filter_queries = False,
         **ValidationConfig.TRAINING_ARGS
     )
 
@@ -107,5 +118,8 @@ def save_data(
     return all_data, silver_data, gold_data
 
 if __name__ == '__main__':
-
-    make_tiered_eval_data()
\ No newline at end of file
+
+    try:
+        make_tiered_eval_data(index_path=None)
+    except Exception as e:
+        logger.warning(e, exc_info=True)
\ No newline at end of file
diff --git a/gamechangerml/src/model_testing/metrics.py b/gamechangerml/src/model_testing/metrics.py
index 5ad1f206..943079ce 100644
--- a/gamechangerml/src/model_testing/metrics.py
+++ b/gamechangerml/src/model_testing/metrics.py
@@ -72,6 +72,24 @@ def reciprocal_rank(ranked_results: List[str], expected: List[str]) -> float:
     else:
         return 0
 
+def reciprocal_rank_score(ranked_scores: List[str]) -> float:
+    '''
+    Calculates the reciprocal of the rank of the first correct score (returns single value from 0 to 1).
+    '''
+    first_relevant_rank = 0 # if no relevant results show up, the RR will be 0
+    count = 1
+    for i in ranked_scores: # list in order of rank
+        if i == 1:
+            first_relevant_rank = count
+            break
+        else:
+            count += 1
+
+    if first_relevant_rank > 0:
+        return np.round((1 / first_relevant_rank), 3)
+    else:
+        return 0
+
 def get_MRR(reciprocal_ranks: List[float]) -> float:
     '''Takes list of reciprocal rank scores for each search and averages them.'''
     return np.round(np.mean(reciprocal_ranks), 3)
diff --git a/gamechangerml/src/model_testing/validation_data.py b/gamechangerml/src/model_testing/validation_data.py
index ad31763a..e2edacf1 100644
--- a/gamechangerml/src/model_testing/validation_data.py
+++ b/gamechangerml/src/model_testing/validation_data.py
@@ -428,7 +428,7 @@ def clean_quot(string):
     return string.replace("&quot;", "'").replace("&#39;", "'").lower()
 
 def clean_doc(string):
-    return string.strip(".pdf")
+    return string.split(".pdf")[0]
 
 def is_question(string):
     """If we find a good way to use search history for QA validation (not used currently)"""
diff --git a/gamechangerml/src/search/sent_transformer/finetune.py b/gamechangerml/src/search/sent_transformer/finetune.py
index 881b1aaa..178c0d2c 100644
--- a/gamechangerml/src/search/sent_transformer/finetune.py
+++ b/gamechangerml/src/search/sent_transformer/finetune.py
@@ -20,6 +20,7 @@
 import torch.nn.functional as F
 from torch import nn
 torch.cuda.empty_cache()
+from gamechangerml.src.model_testing.metrics import reciprocal_rank_score, get_MRR
 
 S3_DATA_PATH = "bronze/gamechanger/ml-data"
 
@@ -47,19 +48,7 @@ def fix_model_config(model_load_path):
         logger.info("Could not update model config file")
 
 
-def get_cos_sim(model, pair):
-
-    emb1 = model.encode(pair[0], show_progress_bar=False)
-    emb2 = model.encode(pair[1], show_progress_bar=False)
-    try:
-        sim = float(util.cos_sim(emb1, emb2))
-    except:
-        sim = float(cos_sim(emb1, emb2))
-
-    return sim
-
-
-def format_inputs(train, test):
+def format_inputs(train, test, data_dir):
     """Create input data for dataloader and df for tracking cosine sim"""
 
     train_samples = []
@@ -71,20 +60,21 @@ def format_inputs(train, test):
         score = float(train[i]["label"])
         inputex = InputExample(str(count), texts, score)
         train_samples.append(inputex)
-        all_data.append([i, texts, score, "train"])
+        all_data.append([train[i]["query"], texts, score, "train"])
         count += 1
         #processmanager.update_status(processmanager.loading_data, count, total)
 
     for x in test.keys():
         texts = [test[x]["query"], test[x]["paragraph"]]
         score = float(test[x]["label"])
-        all_data.append([x, texts, score, "test"])
+        all_data.append([test[x]["query"], texts, score, "test"])
         count += 1
-        #processmanager.update_status(processmanager.loading_data, count, total)
+        processmanager.update_status(processmanager.loading_data, count, total)
 
     df = pd.DataFrame(all_data, columns=["key", "pair", "score", "label"])
+    df.to_csv(os.path.join(data_dir, timestamp_filename("finetuning_data", ".csv")))
 
-    return train_samples, df
+    return train_samples
 
 
 class STFinetuner():
@@ -98,10 +88,8 @@ def __init__(self, model_load_path, model_save_path, shuffle, batch_size, epochs
         self.batch_size = batch_size
         self.epochs = epochs
         self.warmup_steps = warmup_steps
-        self.device = torch.device(
-            "cuda" if torch.cuda.is_available() else "cpu")
-        #self.pin_memory = True if torch.cuda.is_available() else False
-
+        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
+
     def retrain(self, data_dir, testing_only, version):
 
         try:
@@ -123,12 +111,7 @@ def retrain(self, data_dir, testing_only, version):
             processmanager.update_status(processmanager.training, 0, 1)
             sleep(0.1)
             # make formatted training data
-            train_samples, df = format_inputs(train, test)
-
-            # get cosine sim before finetuning
-            # TODO: you should be able to encode this more efficiently
-            df["original_cos_sim"] = df["pair"].apply(
-                lambda x: get_cos_sim(self.model, x))
+            train_samples = format_inputs(train, test, data_dir)
 
             # finetune on samples
             logger.info("Starting dataloader...")
@@ -150,63 +133,7 @@ def retrain(self, data_dir, testing_only, version):
 
             # when not testing only, save to S3
             if not testing_only:
-                dst_path = self.model_save_path + ".tar.gz"
-                utils.create_tgz_from_dir(
-                    src_dir=self.model_save_path, dst_archive=dst_path)
-                model_id = self.model_save_path.split('_')[1]
-                logger.info(f"*** Created tgz file and saved to {dst_path}")
-
-                S3_MODELS_PATH = "bronze/gamechanger/models"
-                s3_path = os.path.join(S3_MODELS_PATH, str(version))
-                utils.upload(s3_path, dst_path, "transformers", model_id)
-                logger.info(f"*** Saved model to S3: {s3_path}")
-
-            logger.info("*** Making finetuning results csv")
-            # get new cosine sim
-            df["new_cos_sim"] = df["pair"].apply(
-                lambda x: get_cos_sim(self.model, x))
-            df["change_cos_sim"] = df["new_cos_sim"] - df["original_cos_sim"]
-
-            # save all results to CSV
-            df.to_csv(os.path.join(data_dir, timestamp_filename(
-                "finetuning_results", ".csv")))
-
-            # create training metadata
-            positive_change_train = df[(df["score"] == 1.0) & (
-                df["label"] == "train")]["change_cos_sim"].median()
-            negative_change_train = df[(
-                df["score"] == -1.0) & (df["label"] == "train")]["change_cos_sim"].median()
-            neutral_change_train = df[(df["score"] == 0.0) & (
-                df["label"] == "train")]["change_cos_sim"].median()
-            positive_change_test = df[(df["score"] == 1.0) & (
-                df["label"] == "test")]["change_cos_sim"].median()
-            negative_change_test = df[(
-                df["score"] == -1.0) & (df["label"] == "test")]["change_cos_sim"].median()
-            neutral_change_test = df[(df["score"] == 0.0) & (
-                df["label"] == "test")]["change_cos_sim"].median()
-
-            ft_metadata = {
-                "date_finetuned": str(date.today()),
-                "data_dir": str(data_dir),
-                "positive_change_train": positive_change_train,
-                "negative_change_train": negative_change_train,
-                "neutral_change_train": neutral_change_train,
-                "positive_change_test": positive_change_test,
-                "negative_change_test": negative_change_test,
-                "neutral_change_test": neutral_change_test
-            }
-
-            # save metadata file
-            ft_metadata_path = os.path.join(
-                data_dir, timestamp_filename("finetuning_metadata", ".json"))
-            with open(ft_metadata_path, "w") as outfile:
-                json.dump(ft_metadata, outfile)
-
-            logger.info("Metadata saved to {}".format(ft_metadata_path))
-            logger.info(str(ft_metadata))
-
-            # when not testing only, save to S3
-            if not testing_only:
+                logger.info("Saving data to S3...")
                 s3_path = os.path.join(S3_DATA_PATH, f"{version}")
                 logger.info(f"**** Saving new data files to S3: {s3_path}")
                 dst_path = data_dir + ".tar.gz"
@@ -216,7 +143,18 @@ def retrain(self, data_dir, testing_only, version):
                 logger.info("*** Attempting to upload data to s3")
                 utils.upload(s3_path, dst_path, "data", model_name)
 
-            return ft_metadata
+                logger.info("Saving model to S3...")
+                dst_path = self.model_save_path + ".tar.gz"
+                utils.create_tgz_from_dir(src_dir=self.model_save_path, dst_archive=dst_path)
+                model_id = self.model_save_path.split('_')[1]
+                logger.info(f"*** Created tgz file and saved to {dst_path}")
+
+                S3_MODELS_PATH = "bronze/gamechanger/models"
+                s3_path = os.path.join(S3_MODELS_PATH, str(version))
+                utils.upload(s3_path, dst_path, "transformers", model_id)
+                logger.info(f"*** Saved model to S3: {s3_path}")
+
+            return {}
 
         except Exception as e:
             logger.warning("Could not complete finetuning")
diff --git a/gamechangerml/src/search/sent_transformer/model.py b/gamechangerml/src/search/sent_transformer/model.py
index f384624e..e62744c8 100644
--- a/gamechangerml/src/search/sent_transformer/model.py
+++ b/gamechangerml/src/search/sent_transformer/model.py
@@ -17,6 +17,7 @@
 from gamechangerml.src.model_testing.validation_data import MSMarcoData
 
 
+
 class SentenceEncoder(object):
     """
     Handles text encoding and creating of ANNOY index
@@ -92,6 +93,7 @@ def _index(self, corpus, index_path, overwrite=False, save_embedding=False):
         dataframe_path = os.path.join(index_path, "data.csv")
         ids_path = os.path.join(index_path, "doc_ids.txt")
 
+        '''
         # Load new data
         if os.path.isfile(embedding_path) and (overwrite is False):
             logger.info(f"Loading new data from {embedding_path}")
@@ -113,6 +115,7 @@ def _index(self, corpus, index_path, overwrite=False, save_embedding=False):
             # Append new dataframe
             old_df = pd.read_csv(dataframe_path)
             df = pd.concat([old_df, df])
+        '''
 
         # Store embeddings and document index
         # for future reference
@@ -161,10 +164,11 @@ def index_documents(self, corpus_path, index_path):
                 verbose=self.verbose,
             )
             corpus = [(para_id, " ".join(tokens), None)
-                     for tokens, para_id in corp]
+                      for tokens, para_id in corp]
             logger.info(
                 f"\nLength of batch (in par ids) for indexing : {str(len(corpus))}"
             )
+
         else:
             logger.info(
                 "Did not include path to corpus, making test index with msmarco data"
@@ -178,7 +182,7 @@ def index_documents(self, corpus_path, index_path):
         self._index(corpus, index_path)
 
         processmanager.update_status(
-            processmanager.training, 1, 1, "finished building sent index"
+            processmanager.training, 1, 0, "finished building sent index"
         )
 
         self.embedder.save(index_path)
diff --git a/gamechangerml/src/utilities/test_utils.py b/gamechangerml/src/utilities/test_utils.py
index 9bdf7d7d..432107bf 100644
--- a/gamechangerml/src/utilities/test_utils.py
+++ b/gamechangerml/src/utilities/test_utils.py
@@ -324,6 +324,7 @@ def concat_csvs(directory):
     df = pd.DataFrame()
     logger.info(str(directory))
     csvs = [i for i in os.listdir(directory) if i.split('.')[-1]=='csv']
+    csvs = [i for i in csvs if i[:2] != '._']
     logger.info(f"Combining csvs: {str(csvs)}")
     for i in csvs:
         try:
@@ -347,3 +348,4 @@ def get_most_recent_dir(parent_dir):
         return max(subdirs, key=os.path.getctime)
     else:
         logger.error("There are no subdirectories to retrieve most recent data from")
+        return None
diff --git a/gamechangerml/train/pipeline.py b/gamechangerml/train/pipeline.py
index 7bfea194..c731d52a 100644
--- a/gamechangerml/train/pipeline.py
+++ b/gamechangerml/train/pipeline.py
@@ -156,10 +156,9 @@ def create_metadata(
         ),
         days: int = 80,
         prod_data_file=PROD_DATA_FILE,
-        n_returns: int = 50,
-        n_matching: int = 3,
-        level: str = 'silver',
-        update_eval_data: bool = False,
+        n_returns: int=50,
+        level: str='silver',
+        update_eval_data: bool=False,
         retriever=None,
         upload: bool = True,
         version: str = "v1"
@@ -190,8 +189,7 @@ def create_metadata(
             make_corpus_meta(corpus_dir, days, prod_data_file, upload)
         if "update_sent_data" in meta_steps:
             try:
-                make_training_data(
-                    index_path, n_returns, n_matching, level, update_eval_data, retriever)
+                make_training_data(index_path, n_returns, level, update_eval_data, retriever)
             except Exception as e:
                 logger.warning(e, exc_info=True)
         if upload:
@@ -213,7 +211,9 @@ def finetune_sent(
         epochs: int = 3,
         warmup_steps: int = 100,
         testing_only: bool = False,
-        version: str = "v100"
+        remake_train_data: bool = False,
+        retriever = None,
+        version: str = "v1"
     ) -> t.Dict[str, str]:
         """finetune_sent: finetunes the sentence transformer - saves new model,
         a csv file of old/new cos sim scores, and a metadata file.
@@ -234,10 +234,34 @@ def finetune_sent(
         model_save_path = model_load_path + "_" + model_id
         logger.info(
             f"Setting {str(model_save_path)} as save path for new model")
-        data_path = get_most_recent_dir(os.path.join(
-            DATA_PATH, "training", "sent_transformer"))
-        if not data_path:
-            quit()
+        no_data=False
+        base_dir = os.path.join(DATA_PATH, "training", "sent_transformer")
+
+        ## check if training data exists
+        if remake_train_data:
+            no_data = True
+        elif not os.path.isdir(base_dir): # if no training data directory exists
+            no_data=True
+            os.makedirs(base_dir)
+        elif len(os.listdir(base_dir))==0: # if base dir exists but there are no files
+            no_data=True
+        elif get_most_recent_dir(base_dir)==None:
+            no_data = True
+        elif len(os.listdir(get_most_recent_dir(base_dir)))==0:
+            no_data=True
+        logger.info(f"No data flag is set to: {str(no_data)}")
+
+        #if we don't have data, make training data
+        if no_data:
+            make_training_data(
+                index_path=SENT_INDEX,
+                n_returns=50,
+                level='silver',
+                update_eval_data=True,
+                retriever=retriever
+            )
+
+        data_path = get_most_recent_dir(base_dir)
         logger.info(f"Loading in domain data to finetune from {data_path}")
         finetuner = STFinetuner(
             model_load_path=model_load_path,