From a15fa0df5e7ba42ccbe076b0fd680e50741011c7 Mon Sep 17 00:00:00 2001 From: Jared Ross Date: Mon, 22 Aug 2022 14:30:37 -0400 Subject: [PATCH 01/15] added ability to clear certain cache and send cache to frontend --- gamechangerml/api/fastapi/routers/controls.py | 19 +++++++++++++++++++ gamechangerml/api/fastapi/routers/search.py | 2 +- gamechangerml/api/fastapi/routers/startup.py | 2 +- 3 files changed, 21 insertions(+), 2 deletions(-) diff --git a/gamechangerml/api/fastapi/routers/controls.py b/gamechangerml/api/fastapi/routers/controls.py index b630ce08..a34c1737 100644 --- a/gamechangerml/api/fastapi/routers/controls.py +++ b/gamechangerml/api/fastapi/routers/controls.py @@ -7,6 +7,7 @@ import shutil import threading import pandas as pd +import redis from datetime import datetime from gamechangerml import DATA_PATH @@ -37,6 +38,8 @@ ) from gamechangerml.src.data_transfer import download_corpus_s3 from gamechangerml.api.utils.threaddriver import MlThread +from gamechangerml.api.utils.redisdriver import RedisPool + from gamechangerml.train.pipeline import Pipeline from gamechangerml.api.utils import processmanager from gamechangerml.api.fastapi.model_loader import ModelLoader @@ -85,6 +88,22 @@ async def get_process_status(): "completed_process": processmanager.COMPLETED_PROCESS.value, } +@router.post("/clearCache") +async def get_process_status(body: dict, response: Response): + _connection = redis.Redis(connection_pool=RedisPool().getPool()) + + if body['clear']: + for key in body['clear']: + _connection.delete(f'search: {key}') + else: + for key in _connection.scan_iter("search:*"): + # delete the key + _connection.delete(key) + +@router.get("/getCache") +async def get_process_status(): + _connection = redis.Redis(connection_pool=RedisPool().getPool()) + return [key.split('search: ')[1] for key in list(_connection.scan_iter("search:*"))] @router.get("/getDataList") def get_downloaded_data_list(): diff --git a/gamechangerml/api/fastapi/routers/search.py b/gamechangerml/api/fastapi/routers/search.py index 823ccdda..1d01bfd9 100644 --- a/gamechangerml/api/fastapi/routers/search.py +++ b/gamechangerml/api/fastapi/routers/search.py @@ -106,7 +106,7 @@ async def trans_sentence_infer( results = {} try: query_text = body["text"] - cache = CacheVariable(query_text, True) + cache = CacheVariable(f'search: {query_text}', True) cached_value = cache.get_value() if cached_value: logger.info("Searched was found in cache") diff --git a/gamechangerml/api/fastapi/routers/startup.py b/gamechangerml/api/fastapi/routers/startup.py index 7dbe3765..4c65dea2 100644 --- a/gamechangerml/api/fastapi/routers/startup.py +++ b/gamechangerml/api/fastapi/routers/startup.py @@ -30,7 +30,7 @@ MODELS.initWordSim, MODELS.initTopics, MODELS.initRecommender, - MODELS.initDocumentCompareSearcher, + # MODELS.initDocumentCompareSearcher, ] From 646fbc6e0cc1ce39963b8b537ca73519c79f0794 Mon Sep 17 00:00:00 2001 From: Jared Ross Date: Mon, 22 Aug 2022 14:32:37 -0400 Subject: [PATCH 02/15] fix startup file --- gamechangerml/api/fastapi/routers/startup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gamechangerml/api/fastapi/routers/startup.py b/gamechangerml/api/fastapi/routers/startup.py index 4c65dea2..7dbe3765 100644 --- a/gamechangerml/api/fastapi/routers/startup.py +++ b/gamechangerml/api/fastapi/routers/startup.py @@ -30,7 +30,7 @@ MODELS.initWordSim, MODELS.initTopics, MODELS.initRecommender, - # MODELS.initDocumentCompareSearcher, + MODELS.initDocumentCompareSearcher, ] From a4cea74829166af11157b6c29977be34c79c12e6 Mon Sep 17 00:00:00 2001 From: 604840 Date: Thu, 25 Aug 2022 16:27:48 -0400 Subject: [PATCH 03/15] adding schedule and check for outdated corpus and downloading a new one --- gamechangerml/api/fastapi/routers/controls.py | 5 +- gamechangerml/api/fastapi/routers/startup.py | 21 ++++- gamechangerml/api/utils/logger.py | 14 +-- gamechangerml/api/utils/mlscheduler.py | 87 +++++++++++++++++++ .../src/data_transfer/s3_download.py | 12 +-- .../query_expansion/tests/model_test_qe.py | 55 ++++++++++++ gamechangerml/src/utilities/logger.py | 24 ++--- gamechangerml/train/pipeline.py | 71 ++++++--------- 8 files changed, 215 insertions(+), 74 deletions(-) create mode 100644 gamechangerml/api/utils/mlscheduler.py diff --git a/gamechangerml/api/fastapi/routers/controls.py b/gamechangerml/api/fastapi/routers/controls.py index b630ce08..0e390c88 100644 --- a/gamechangerml/api/fastapi/routers/controls.py +++ b/gamechangerml/api/fastapi/routers/controls.py @@ -753,10 +753,9 @@ def reload_thread(model_dict): @router.post("/downloadCorpus", status_code=200) async def download_corpus(corpus_dict: dict, response: Response): - """load_latest_models - endpoint for updating the transformer model + """download_corpus - endpoint for downloading corpus Args: - model_dict: dict; {"sentence": "bert...", - "qexp": "bert...", "transformer": "bert..."} + corpus_dict: dict; {"corpus": "bronze/gamechanger/json"} Response: Response class; for status codes(apart of fastapi do not need to pass param) Returns: """ diff --git a/gamechangerml/api/fastapi/routers/startup.py b/gamechangerml/api/fastapi/routers/startup.py index 7dbe3765..5dae9e53 100644 --- a/gamechangerml/api/fastapi/routers/startup.py +++ b/gamechangerml/api/fastapi/routers/startup.py @@ -1,6 +1,8 @@ from fastapi import APIRouter from fastapi_utils.tasks import repeat_every import os +from typing import Tuple + from gamechangerml.api.fastapi.settings import ( DOC_COMPARE_SENT_INDEX_PATH, logger, @@ -18,6 +20,10 @@ MEMORY_LOAD_LIMIT, ) from gamechangerml.api.fastapi.model_loader import ModelLoader +from gamechangerml.api.utils.mlscheduler import check_corpus_diff +from gamechangerml.api.utils.threaddriver import MlThread +from gamechangerml.api.utils import processmanager + import psutil router = APIRouter() @@ -92,7 +98,20 @@ async def check_health(): logger.info(f"RAM % used: {ram_used}") -def get_hw_usage(threshold: int = MEMORY_LOAD_LIMIT) -> (float, bool, float): +@router.on_event("startup") +@repeat_every(seconds=60 * 60, wait_first=False) +async def check_corpus_health(): + logger.info("Checking corpus diff") + args = { + "s3_corpus_dir": "bronze/gamechanger/json", + "logger": logger, + } + corpus_thread = MlThread(check_corpus_diff, args) + corpus_thread.start() + processmanager.running_threads[corpus_thread.ident] = corpus_thread + + +def get_hw_usage(threshold: int = MEMORY_LOAD_LIMIT) -> Tuple[float, bool, float]: surpassed = False ram_used = psutil.virtual_memory()[2] if threshold: diff --git a/gamechangerml/api/utils/logger.py b/gamechangerml/api/utils/logger.py index 3b535d00..51904649 100644 --- a/gamechangerml/api/utils/logger.py +++ b/gamechangerml/api/utils/logger.py @@ -1,27 +1,21 @@ import logging from logging import handlers import sys +from gamechangerml.src.utilities import configure_logger # set loggers -logger = logging.getLogger() +logger = configure_logger() glogger = logging.getLogger("gunicorn.error") -logger.setLevel(logging.DEBUG) -log_formatter = logging.Formatter( - "%(asctime)s [%(levelname)s][PID:%(process)d]: %(message)s" -) try: - ch = logging.StreamHandler(sys.stdout) - ch.setFormatter(log_formatter) - logger.addHandler(ch) - glogger.addHandler(ch) + # glogger.addHandler(ch) log_file_path = "gamechangerml/api/logs/gc_ml_logs.txt" fh = logging.handlers.RotatingFileHandler( log_file_path, maxBytes=2000000, backupCount=1, mode="a" ) logger.info(f"ML API is logging to {log_file_path}") - fh.setFormatter(log_formatter) + # fh.setFormatter(log_formatter) logger.addHandler(fh) glogger.addHandler(fh) except Exception as e: diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py new file mode 100644 index 00000000..51f16e97 --- /dev/null +++ b/gamechangerml/api/utils/mlscheduler.py @@ -0,0 +1,87 @@ +"""Utility functions for scheduling ml builds based on events + +Also see gamechangerml.src.services.s3_service.py +""" + +from threading import current_thread +from os import makedirs +from os.path import join, exists, basename + +from gamechangerml.src.services.s3_service import S3Service +from gamechangerml.src.utilities import configure_logger +from gamechangerml.configs import S3Config +from gamechangerml.api.utils import processmanager +from gamechangerml.api.fastapi.routers.controls import download_corpus +from gamechangerml.api.utils.threaddriver import MlThread + +from gamechangerml.src.data_transfer import delete_local_corpus, download_corpus_s3 + +import os +from queue import Queue + +from fastapi import APIRouter, Response + +from gamechangerml.api.fastapi.settings import ( + CORPUS_DIR, + S3_CORPUS_PATH, +) + + +def check_corpus_diff( + s3_corpus_dir: str, + corpus_dir: str = "gamechangerml/corpus", + bucket=None, + logger=None, +) -> bool: + if logger is None: + logger = configure_logger() + + if bucket is None: + bucket = S3Service.connect_to_bucket(S3Config.BUCKET_NAME, logger) + + process = processmanager.corpus_download + + try: + logger.info("Getting ratio") + + s3_filter = bucket.objects.filter(Prefix=f"{s3_corpus_dir}/") + total = len(list(s3_filter)) + local_corpus_size = len(os.listdir(corpus_dir)) + logger.info(f"local corpus size {local_corpus_size}") + logger.info(f"total corpus size {total}") + + ratio = local_corpus_size / total + logger.info(ratio) + if ratio < 0.95: + logger.info("Corpus is out of date - downloading data") + try: + logger.info("Attempting to download corpus from S3") + # grabs the s3 path to the corpus from the post in "corpus" + # then passes in where to dowload the corpus locally. + + args = { + "s3_corpus_dir": s3_corpus_dir, + "output_dir": CORPUS_DIR, + "logger": logger, + } + + logger.info(args) + corpus_thread = MlThread(download_corpus_s3, args) + corpus_thread.start() + processmanager.running_threads[corpus_thread.ident] = corpus_thread + processmanager.update_status( + processmanager.corpus_download, 0, 1, thread_id=corpus_thread.ident + ) + except Exception as e: + logger.exception("Could not get corpus from S3") + processmanager.update_status( + processmanager.corpus_download, + failed=True, + message=e, + thread_id=corpus_thread.ident, + ) + except Exception: + logger.exception("Failed to read corpus in S3") + processmanager.update_status( + process, failed=True, thread_id=current_thread().ident + ) diff --git a/gamechangerml/src/data_transfer/s3_download.py b/gamechangerml/src/data_transfer/s3_download.py index 17656684..b9df6fd0 100644 --- a/gamechangerml/src/data_transfer/s3_download.py +++ b/gamechangerml/src/data_transfer/s3_download.py @@ -13,6 +13,8 @@ from gamechangerml.api.utils import processmanager from gamechangerml.src.data_transfer import delete_local_corpus +import os + def download_corpus_s3( s3_corpus_dir, @@ -87,13 +89,14 @@ def download_corpus_s3( return corpus + def download_eval_data( bucket, dataset_name, save_dir, logger, version=None, - ): +): """Download evaluation data from S3. Args: @@ -101,7 +104,7 @@ def download_eval_data( dataset_name (str): Name of the dataset to download. save_dir (str): Path to local directory to save data. logger (logging.Logger) - version (int or None, optional): Version number of the dataset to + version (int or None, optional): Version number of the dataset to download. If None, downloads the latest version. Default is None. Returns: @@ -109,7 +112,7 @@ def download_eval_data( """ save_dir = join(save_dir, dataset_name) makedirs(save_dir, exist_ok=True) - + # Ensure the dataset name exists prefix = S3Config.EVAL_DATA_DIR all_datasets = S3Service.get_object_names(bucket, prefix, "dir") @@ -118,7 +121,7 @@ def download_eval_data( f"{dataset_name} does not exist. Available datasets are: {all_datasets}." ) return None - + # Get version numbers available for the given dataset name prefix = f"{prefix}{dataset_name}/" try: @@ -146,4 +149,3 @@ def download_eval_data( logger.info(f"Downloading {dataset_name} version {version}...") S3Service.download(bucket, prefix + f"v{version}", save_dir, logger) - diff --git a/gamechangerml/src/search/query_expansion/tests/model_test_qe.py b/gamechangerml/src/search/query_expansion/tests/model_test_qe.py index 86c3a4c4..a16b84c5 100644 --- a/gamechangerml/src/search/query_expansion/tests/model_test_qe.py +++ b/gamechangerml/src/search/query_expansion/tests/model_test_qe.py @@ -1,5 +1,60 @@ import logging from gamechangerml.src.search.query_expansion.utils import remove_original_kw +# flake8: noqa +# pylint: skip-file + +import logging +import os +from pathlib import Path + +import pytest + +from gamechangerml.src.search.query_expansion.build_ann_cli.build_qe_model import ( # noqa + main, +) +from gamechangerml.src.search.query_expansion.qe import QE +from gamechangerml.configs import QexpConfig +from gamechangerml.api.fastapi.settings import QEXP_MODEL_NAME +log_fmt = ( + "[%(asctime)s %(levelname)-8s], [%(filename)s:%(lineno)s - " + + "%(funcName)s()], %(message)s" +) +logging.basicConfig(level=logging.DEBUG, format=log_fmt) +logger = logging.getLogger(__name__) + +try: + here = os.path.dirname(os.path.realpath(__file__)) + p = Path(here) + test_data_dir = os.path.join(p.parents[3], "data", "test_data") + aux_path = os.path.join(p.parents[3], "data", "features") + word_wt = os.path.join(aux_path, "enwiki_vocab_min200.txt") + assert os.path.isfile(word_wt) +except (AttributeError, FileExistsError) as e: + logger.exception("{}: {}".format(type(e), str(e)), exc_info=True) + + +@pytest.fixture(scope="session") +def ann_index_dir(tmpdir_factory): + fn = tmpdir_factory.mktemp("data") + return str(fn) + + +@pytest.fixture(scope="session") +def qe_obj(ann_index_dir): + # main(test_data_dir, ann_index_dir, weight_file=word_wt) + return QE( + QEXP_MODEL_NAME.value, **QexpConfig.INIT_ARGS + ) + + +# @pytest.fixture(scope="session") +# def qe_mlm_obj(): +# return QE(QEXP_MODEL_NAME.value, QexpConfig.INIT_ARGS["qe_files_dir"], "mlm") + + +@pytest.fixture(scope="session") +def topn(): + return 2 import pytest diff --git a/gamechangerml/src/utilities/logger.py b/gamechangerml/src/utilities/logger.py index 3ada5aeb..4947298d 100644 --- a/gamechangerml/src/utilities/logger.py +++ b/gamechangerml/src/utilities/logger.py @@ -1,11 +1,12 @@ from os import makedirs from os.path import split +import logging from logging import getLogger, StreamHandler, Formatter, FileHandler def configure_logger( name="gamechanger", - min_level="DEBUG", + min_level="INFO", file_path=None, msg_fmt="%(levelname)s - %(asctime)s - %(filename)s - line %(lineno)s - %(message)s", date_fmt="%Y-%m-%d %H:%M:%S", @@ -13,26 +14,29 @@ def configure_logger( """Configure a logger object. Args: - name (str or None, optional): If str, name of the logger to get/ create. + name (str or None, optional): If str, name of the logger to get/ create. If None, will get the root logger. Default is "gamechanger". - min_level (str or int, optional): Denotes the minimum level to log. See - https://docs.python.org/3/library/logging.html#levels for options. + min_level (str or int, optional): Denotes the minimum level to log. See + https://docs.python.org/3/library/logging.html#levels for options. Defaults to "DEBUG". - file_path (str or None, optional): If str, path to a ".log" file to - record log messages. If None, will not log to a file. Default is + file_path (str or None, optional): If str, path to a ".log" file to + record log messages. If None, will not log to a file. Default is None. - msg_fmt (str, optional): Log message formatting. Default is + msg_fmt (str, optional): Log message formatting. Default is "%(asctime)s - %(levelname)s - %(filename)s - line %(lineno)s - %(message)s" date_fmt (str, optional): Date format for log messages. Default is "%Y-%m-%d %H:%M:%S". - + Returns: logging.Logger """ logger = getLogger(name) logger.propagate = False logger.setLevel(min_level) - + logging.getLogger("boto3").setLevel(logging.CRITICAL) + logging.getLogger("botocore").setLevel(logging.CRITICAL) + logging.getLogger("s3transfer").setLevel(logging.CRITICAL) + logging.getLogger("urllib3").setLevel(logging.CRITICAL) has_stream = False has_file = False for handler in logger.handlers: @@ -57,5 +61,5 @@ def configure_logger( file_handler.setFormatter(formatter) file_handler.setLevel(min_level) logger.addHandler(file_handler) - + return logger diff --git a/gamechangerml/train/pipeline.py b/gamechangerml/train/pipeline.py index 970c184e..9c46cf9d 100644 --- a/gamechangerml/train/pipeline.py +++ b/gamechangerml/train/pipeline.py @@ -1,6 +1,7 @@ import argparse from gamechangerml import MODEL_PATH, DATA_PATH, REPO_PATH -from gamechangerml.src.search.doc_compare.model import DocCompareSentenceEncoder + +# from gamechangerml.src.search.doc_compare.model import DocCompareSentenceEncoder from gamechangerml.src.search.ranking.ltr import LTR from gamechangerml.src.featurization.topic_modeling import Topics import logging @@ -57,7 +58,7 @@ SimilarityConfig, QexpConfig, D2VConfig, - PathConfig + PathConfig, ) import pandas as pd @@ -70,8 +71,7 @@ logger = logging.getLogger(__name__) handler = logging.StreamHandler() -formatter = logging.Formatter( - "%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s") +formatter = logging.Formatter("%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s") handler.setFormatter(formatter) logger.addHandler(handler) logger.setLevel(logging.INFO) @@ -106,14 +106,12 @@ def __init__(self): # read in input data files try: self.search_history = pd.read_csv( - os.path.join(USER_DATA_PATH, "search_history", - "SearchPdfMapping.csv") + os.path.join(USER_DATA_PATH, "search_history", "SearchPdfMapping.csv") ) self.topics = pd.read_csv( os.path.join(FEATURES_DATA_PATH, "topics_wiki.csv") ) - self.orgs = pd.read_csv(os.path.join( - FEATURES_DATA_PATH, "agencies.csv")) + self.orgs = pd.read_csv(os.path.join(FEATURES_DATA_PATH, "agencies.csv")) except Exception as e: logger.info(e) @@ -185,8 +183,7 @@ def create_metadata( if "pop_docs" in meta_steps: make_pop_docs(self.search_history, self.pop_docs_path) if "combined_ents" in meta_steps: - make_combined_entities( - self.topics, self.orgs, self.combined_ents_path) + make_combined_entities(self.topics, self.orgs, self.combined_ents_path) if "rank_features" in meta_steps: make_corpus_meta(corpus_dir, days, prod_data_file, upload) if "update_sent_data" in meta_steps: @@ -208,12 +205,8 @@ def create_metadata( model_name = datetime.now().strftime("%Y%m%d") model_prefix = "data" dst_path = DATA_PATH + model_name + ".tar.gz" - utils.create_tgz_from_dir( - src_dir=DATA_PATH, dst_archive=dst_path) - s3_path = os.path.join( - s3_path, - f"{model_prefix}_{model_name}.tar.gz" - ) + utils.create_tgz_from_dir(src_dir=DATA_PATH, dst_archive=dst_path) + s3_path = os.path.join(s3_path, f"{model_prefix}_{model_name}.tar.gz") S3Service.upload_file(bucket, dst_path, s3_path, logger) except Exception as e: logger.warning(e, exc_info=True) @@ -287,8 +280,7 @@ def finetune_sent( else: model_id = datetime.now().strftime("%Y%m%d") model_save_path = model_load_path + "_" + model_id - logger.info( - f"Setting {str(model_save_path)} as save path for new model") + logger.info(f"Setting {str(model_save_path)} as save path for new model") logger.info(f"Loading in domain data to finetune from {data_path}") finetuner = STFinetuner( model_load_path=model_load_path, @@ -297,7 +289,7 @@ def finetune_sent( batch_size=batch_size, epochs=epochs, warmup_steps=warmup_steps, - processmanager = processmanager + processmanager=processmanager, ) logger.info("Loaded finetuner class...") logger.info(f"Testing only is set to: {testing_only}") @@ -352,8 +344,7 @@ def evaluate( logger.info(f"Attempting to evaluate model {model_name}") if "bert-base-cased-squad2" in model_name: - results[eval_type] = eval_qa( - model_name, sample_limit, eval_type) + results[eval_type] = eval_qa(model_name, sample_limit, eval_type) elif "msmarco-distilbert" in model_name: for e_type in ["domain", "original"]: results[e_type] = eval_sent( @@ -368,8 +359,7 @@ def evaluate( model_name, validation_data, eval_type="domain", retriever=retriever ) elif "distilbart-mnli-12-3" in model_name: - results[eval_type] = eval_sim( - model_name, sample_limit, eval_type) + results[eval_type] = eval_sim(model_name, sample_limit, eval_type) elif "qexp" in model_name: results["domain"] = eval_qe(model_name) else: @@ -426,8 +416,7 @@ def create_qexp( # build ANN indices index_dir = os.path.join(model_dest, model_path) bqe.main(corpus, index_dir, **QexpConfig.BUILD_ARGS) - logger.info( - "-------------- Model Training Complete --------------") + logger.info("-------------- Model Training Complete --------------") # Create .tgz file dst_path = index_dir + ".tar.gz" utils.create_tgz_from_dir(src_dir=index_dir, dst_archive=dst_path) @@ -446,8 +435,7 @@ def create_qexp( # qxpeval = QexpEvaluator(qe_model_dir=index_dir, **QexpConfig.INIT_ARGS, **QexpConfig.EXPANSION_ARGS, model=None) # evals = qxpeval.results - logger.info( - "-------------- Assessment is not available--------------") + logger.info("-------------- Assessment is not available--------------") """ results = mau.assess_model( model_name=model_id, @@ -462,8 +450,7 @@ def create_qexp( key=metric, value=results[metric]) """ - logger.info( - "-------------- Finished Assessment --------------") + logger.info("-------------- Finished Assessment --------------") else: logger.info("-------------- No Assessment Ran --------------") except Exception as e: @@ -500,8 +487,7 @@ def create_embedding( # GPU check use_gpu = gpu if use_gpu and not torch.cuda.is_available: - logger.info( - "GPU is not available. Setting `gpu` argument to False") + logger.info("GPU is not available. Setting `gpu` argument to False") use_gpu = False # Define model saving directories @@ -513,8 +499,7 @@ def create_embedding( # Define new index directory if not os.path.isdir(local_sent_index_dir): os.mkdir(local_sent_index_dir) - logger.info( - "-------------- Building Sentence Embeddings --------------") + logger.info("-------------- Building Sentence Embeddings --------------") logger.info("Loading Encoder Model...") # If existing index exists, copy content from reference index @@ -534,8 +519,7 @@ def create_embedding( ) logger.info("-------------- Indexing Documents--------------") start_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") - encoder.index_documents( - corpus_path=corpus, index_path=local_sent_index_dir) + encoder.index_documents(corpus_path=corpus, index_path=local_sent_index_dir) end_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info("-------------- Completed Indexing --------------") user = get_user(logger) @@ -602,8 +586,7 @@ def create_embedding( ) logger.info(f"Created tgz file and saved to {dst_path}") - logger.info( - "-------------- Finished Sentence Embedding--------------") + logger.info("-------------- Finished Sentence Embedding--------------") except Exception as e: logger.warning("Error with creating embedding") logger.error(e) @@ -613,7 +596,7 @@ def create_embedding( s3_path = os.path.join( S3_MODELS_PATH, f"sentence_index/{version}", - f"sentence_index_{model_id}.tar.gz" + f"sentence_index_{model_id}.tar.gz", ) bucket = S3Service.connect_to_bucket(S3Config.BUCKET_NAME, logger) S3Service.upload_file(bucket, dst_path, s3_path, logger) @@ -707,7 +690,8 @@ def create_topics( # Train topics status = status_updater.StatusUpdater( - process_key=processmanager.topics_creation, nsteps=6, + process_key=processmanager.topics_creation, + nsteps=6, ) topics_model = Topics(status=status) metadata = topics_model.train_from_files( @@ -727,11 +711,9 @@ def create_topics( # Upload to S3 if upload: S3_MODELS_PATH = "bronze/gamechanger/models" - s3_path = os.path.join( - S3_MODELS_PATH, f"topic_model/{version}") + s3_path = os.path.join(S3_MODELS_PATH, f"topic_model/{version}") logger.info(f"Topics uploading to {s3_path}") - self.upload(s3_path, tar_path, - "topic_model", model_id, version) + self.upload(s3_path, tar_path, "topic_model", model_id, version) evals = None # TODO: figure out how to evaluate this return metadata, evals @@ -744,8 +726,7 @@ def upload(self, s3_path, local_path, model_prefix, model_name, version): logger.info(f"Uploading files to {s3_path}") logger.info(f"\tUploading: {local_path}") # local_path = os.path.join(dst_path) - s3_path = os.path.join( - s3_path, f"{model_prefix}_" + model_name + ".tar.gz") + s3_path = os.path.join(s3_path, f"{model_prefix}_" + model_name + ".tar.gz") logger.info(f"s3_path {s3_path}") bucket = S3Service.connect_to_bucket(S3Config.BUCKET_NAME, logger) S3Service.upload_file( From efe671e139b2371c257da7d99057f5b7dbe8c5ac Mon Sep 17 00:00:00 2001 From: 604840 Date: Fri, 26 Aug 2022 09:26:42 -0400 Subject: [PATCH 04/15] updating function to await download corpus --- gamechangerml/api/fastapi/routers/startup.py | 7 +-- gamechangerml/api/utils/mlscheduler.py | 57 ++++++++++---------- 2 files changed, 34 insertions(+), 30 deletions(-) diff --git a/gamechangerml/api/fastapi/routers/startup.py b/gamechangerml/api/fastapi/routers/startup.py index 5dae9e53..df3446b5 100644 --- a/gamechangerml/api/fastapi/routers/startup.py +++ b/gamechangerml/api/fastapi/routers/startup.py @@ -106,9 +106,10 @@ async def check_corpus_health(): "s3_corpus_dir": "bronze/gamechanger/json", "logger": logger, } - corpus_thread = MlThread(check_corpus_diff, args) - corpus_thread.start() - processmanager.running_threads[corpus_thread.ident] = corpus_thread + await check_corpus_diff(**args) + # corpus_thread = MlThread(check_corpus_diff, args) + # corpus_thread.start() + # processmanager.running_threads[corpus_thread.ident] = corpus_thread def get_hw_usage(threshold: int = MEMORY_LOAD_LIMIT) -> Tuple[float, bool, float]: diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index 51f16e97..a70960a1 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -27,7 +27,7 @@ ) -def check_corpus_diff( +async def check_corpus_diff( s3_corpus_dir: str, corpus_dir: str = "gamechangerml/corpus", bucket=None, @@ -54,32 +54,35 @@ def check_corpus_diff( logger.info(ratio) if ratio < 0.95: logger.info("Corpus is out of date - downloading data") - try: - logger.info("Attempting to download corpus from S3") - # grabs the s3 path to the corpus from the post in "corpus" - # then passes in where to dowload the corpus locally. - - args = { - "s3_corpus_dir": s3_corpus_dir, - "output_dir": CORPUS_DIR, - "logger": logger, - } - - logger.info(args) - corpus_thread = MlThread(download_corpus_s3, args) - corpus_thread.start() - processmanager.running_threads[corpus_thread.ident] = corpus_thread - processmanager.update_status( - processmanager.corpus_download, 0, 1, thread_id=corpus_thread.ident - ) - except Exception as e: - logger.exception("Could not get corpus from S3") - processmanager.update_status( - processmanager.corpus_download, - failed=True, - message=e, - thread_id=corpus_thread.ident, - ) + await download_corpus( + {"corpus": s3_corpus_dir, "output_dir": corpus_dir}, Response + ) + # try: + # logger.info("Attempting to download corpus from S3") + # # grabs the s3 path to the corpus from the post in "corpus" + # # then passes in where to dowload the corpus locally. + + # args = { + # "s3_corpus_dir": s3_corpus_dir, + # "output_dir": CORPUS_DIR, + # "logger": logger, + # } + + # logger.info(args) + # corpus_thread = MlThread(download_corpus_s3, args) + # corpus_thread.start() + # processmanager.running_threads[corpus_thread.ident] = corpus_thread + # processmanager.update_status( + # processmanager.corpus_download, 0, 1, thread_id=corpus_thread.ident + # ) + # except Exception as e: + # logger.exception("Could not get corpus from S3") + # processmanager.update_status( + # processmanager.corpus_download, + # failed=True, + # message=e, + # thread_id=corpus_thread.ident, + # ) except Exception: logger.exception("Failed to read corpus in S3") processmanager.update_status( From 6bf74e9bf4fb0d47547492147e990cd071f4421f Mon Sep 17 00:00:00 2001 From: 604840 Date: Mon, 29 Aug 2022 14:56:55 -0400 Subject: [PATCH 05/15] update thread working for updating corpus and building qexp --- gamechangerml/api/fastapi/routers/startup.py | 9 +- gamechangerml/api/fastapi/settings.py | 8 +- gamechangerml/api/utils/mlscheduler.py | 91 +++++++++++--------- gamechangerml/src/utilities/utils.py | 17 ++-- 4 files changed, 65 insertions(+), 60 deletions(-) diff --git a/gamechangerml/api/fastapi/routers/startup.py b/gamechangerml/api/fastapi/routers/startup.py index df3446b5..8f0f25dd 100644 --- a/gamechangerml/api/fastapi/routers/startup.py +++ b/gamechangerml/api/fastapi/routers/startup.py @@ -20,7 +20,7 @@ MEMORY_LOAD_LIMIT, ) from gamechangerml.api.fastapi.model_loader import ModelLoader -from gamechangerml.api.utils.mlscheduler import check_corpus_diff +from gamechangerml.api.utils.mlscheduler import corpus_update_event from gamechangerml.api.utils.threaddriver import MlThread from gamechangerml.api.utils import processmanager @@ -100,16 +100,13 @@ async def check_health(): @router.on_event("startup") @repeat_every(seconds=60 * 60, wait_first=False) -async def check_corpus_health(): +async def corpus_event_trigger(): logger.info("Checking corpus diff") args = { "s3_corpus_dir": "bronze/gamechanger/json", "logger": logger, } - await check_corpus_diff(**args) - # corpus_thread = MlThread(check_corpus_diff, args) - # corpus_thread.start() - # processmanager.running_threads[corpus_thread.ident] = corpus_thread + await corpus_update_event(**args) def get_hw_usage(threshold: int = MEMORY_LOAD_LIMIT) -> Tuple[float, bool, float]: diff --git a/gamechangerml/api/fastapi/settings.py b/gamechangerml/api/fastapi/settings.py index d750a54a..820af751 100644 --- a/gamechangerml/api/fastapi/settings.py +++ b/gamechangerml/api/fastapi/settings.py @@ -24,7 +24,7 @@ CORPUS_DIR = CORPUS_PATH S3_CORPUS_PATH = os.environ.get("S3_CORPUS_PATH") - +CORPUS_EVENT_TRIGGER = 0.95 # Redis Cache Variables latest_intel_model_sent = CacheVariable("latest_intel_model_sent", True) latest_intel_model_sim = CacheVariable( @@ -32,8 +32,7 @@ ) latest_intel_model_encoder = CacheVariable("latest encoder model", True) latest_intel_model_trans = CacheVariable("latest_intel_model_trans") -latest_doc_compare_encoder = CacheVariable( - "latest doc compare encoder model", True) +latest_doc_compare_encoder = CacheVariable("latest doc compare encoder model", True) latest_doc_compare_sim = CacheVariable( "latest doc compare searcher (similarity model + sent index)", True ) @@ -72,7 +71,6 @@ # validate correct configurations logger.info(f"API TRANSFORMERS DIRECTORY is: {LOCAL_TRANSFORMERS_DIR.value}") logger.info(f"API INDEX PATH is: {SENT_INDEX_PATH.value}") -logger.info( - f"API DOC COMPARE INDEX PATH is: {DOC_COMPARE_SENT_INDEX_PATH.value}") +logger.info(f"API DOC COMPARE INDEX PATH is: {DOC_COMPARE_SENT_INDEX_PATH.value}") logger.info(f"API REDIS HOST is: {REDIS_HOST}") logger.info(f"API REDIS PORT is: {REDIS_PORT}") diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index a70960a1..e8e17a11 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -6,28 +6,31 @@ from threading import current_thread from os import makedirs from os.path import join, exists, basename - +import datetime from gamechangerml.src.services.s3_service import S3Service from gamechangerml.src.utilities import configure_logger from gamechangerml.configs import S3Config from gamechangerml.api.utils import processmanager -from gamechangerml.api.fastapi.routers.controls import download_corpus +from gamechangerml.api.fastapi.routers.controls import ( + download_corpus, + train_model, + train_qexp, +) from gamechangerml.api.utils.threaddriver import MlThread - -from gamechangerml.src.data_transfer import delete_local_corpus, download_corpus_s3 - import os from queue import Queue +from gamechangerml.src.data_transfer import download_corpus_s3 from fastapi import APIRouter, Response from gamechangerml.api.fastapi.settings import ( CORPUS_DIR, S3_CORPUS_PATH, + CORPUS_EVENT_TRIGGER, ) -async def check_corpus_diff( +async def corpus_update_event( s3_corpus_dir: str, corpus_dir: str = "gamechangerml/corpus", bucket=None, @@ -42,49 +45,55 @@ async def check_corpus_diff( process = processmanager.corpus_download try: - logger.info("Getting ratio") + logger.info("ML EVENT - Checking corpus staleness") s3_filter = bucket.objects.filter(Prefix=f"{s3_corpus_dir}/") total = len(list(s3_filter)) local_corpus_size = len(os.listdir(corpus_dir)) - logger.info(f"local corpus size {local_corpus_size}") - logger.info(f"total corpus size {total}") ratio = local_corpus_size / total - logger.info(ratio) - if ratio < 0.95: - logger.info("Corpus is out of date - downloading data") - await download_corpus( - {"corpus": s3_corpus_dir, "output_dir": corpus_dir}, Response + if ratio < CORPUS_EVENT_TRIGGER: + logger.info("ML EVENT - Corpus is stale - downloading data") + # trigger download corpus + logger.info("Attempting to download corpus from S3") + # grabs the s3 path to the corpus from the post in "corpus" + # then passes in where to dowload the corpus locally. + + thread_args = { + "args": { + "logger": logger, + "s3_args": { + "s3_corpus_dir": s3_corpus_dir, + "output_dir": CORPUS_DIR, + "logger": logger, + }, + "qexp_model_dict": { + "build_type": "qexp", + "upload": True, + "version": datetime.datetime.today().strftime("%Y%m%d"), + }, + } + } + + logger.info(thread_args) + corpus_thread = MlThread(run_update, thread_args) + corpus_thread.start() + processmanager.running_threads[corpus_thread.ident] = corpus_thread + processmanager.update_status( + processmanager.corpus_download, 0, 1, thread_id=corpus_thread.ident ) - # try: - # logger.info("Attempting to download corpus from S3") - # # grabs the s3 path to the corpus from the post in "corpus" - # # then passes in where to dowload the corpus locally. - - # args = { - # "s3_corpus_dir": s3_corpus_dir, - # "output_dir": CORPUS_DIR, - # "logger": logger, - # } - - # logger.info(args) - # corpus_thread = MlThread(download_corpus_s3, args) - # corpus_thread.start() - # processmanager.running_threads[corpus_thread.ident] = corpus_thread - # processmanager.update_status( - # processmanager.corpus_download, 0, 1, thread_id=corpus_thread.ident - # ) - # except Exception as e: - # logger.exception("Could not get corpus from S3") - # processmanager.update_status( - # processmanager.corpus_download, - # failed=True, - # message=e, - # thread_id=corpus_thread.ident, - # ) + except Exception: - logger.exception("Failed to read corpus in S3") + logger.exception("Failed to update corpus or train models") processmanager.update_status( process, failed=True, thread_id=current_thread().ident ) + + +def run_update(args): + logger = args["logger"] + logger.info("Attempting to download corpus from S3") + download_corpus_s3(**args["s3_args"]) + logger.info("Attempting to build Qexp") + model_dict = args["qexp_model_dict"] + train_qexp(model_dict) diff --git a/gamechangerml/src/utilities/utils.py b/gamechangerml/src/utilities/utils.py index c2bcd959..8a1dad89 100644 --- a/gamechangerml/src/utilities/utils.py +++ b/gamechangerml/src/utilities/utils.py @@ -11,6 +11,7 @@ logger = logging.getLogger("gamechanger") + def get_local_model_prefix(prefix: str, folder: str = MODEL_PATH): """get_local_model_prefix: gets all folders or models with the prefix, i.e. sent_index folder: PATH folder of models @@ -23,19 +24,23 @@ def get_local_model_prefix(prefix: str, folder: str = MODEL_PATH): if filename.startswith(prefix) and "tar" not in filename ] + def create_model_schema(model_dir, file_prefix): num = 0 while isdir(join(model_dir, file_prefix)): file_prefix = f"{file_prefix.split('_')[0]}_{num}" num += 1 - + dirpath = join(model_dir, file_prefix) makedirs(dirpath) logger.info(f"Created directory: {dirpath}.") + return dirpath -def get_transformers(model_path="transformers_v4/transformers.tar", overwrite=False, bucket=None): +def get_transformers( + model_path="transformers_v4/transformers.tar", overwrite=False, bucket=None +): if bucket is None: bucket = S3Service.connect_to_bucket(S3Config.BUCKET_NAME, logger) @@ -49,9 +54,7 @@ def get_transformers(model_path="transformers_v4/transformers.tar", overwrite=Fa return for obj in bucket.objects.filter(Prefix=model_path): print(obj) - bucket.download_file( - obj.key, join(models_path, obj.key.split("/")[-1]) - ) + bucket.download_file(obj.key, join(models_path, obj.key.split("/")[-1])) compressed = obj.key.split("/")[-1] cache_path = join(models_path, compressed) print("uncompressing: " + cache_path) @@ -83,9 +86,7 @@ def get_sentence_index(model_path="sent_index/", overwrite=False, bucket=None): return for obj in bucket.objects.filter(Prefix=model_path): print(obj) - bucket.download_file( - obj.key, join(models_path, obj.key.split("/")[-1]) - ) + bucket.download_file(obj.key, join(models_path, obj.key.split("/")[-1])) compressed = obj.key.split("/")[-1] cache_path = join(models_path, compressed) print("uncompressing: " + cache_path) From 569c5b17bdf5304f947468b5be995720035ff147 Mon Sep 17 00:00:00 2001 From: 604840 Date: Tue, 30 Aug 2022 17:17:17 -0400 Subject: [PATCH 06/15] adding env variable --- gamechangerml/api/fastapi/routers/startup.py | 14 ++++++++------ gamechangerml/api/fastapi/settings.py | 4 +++- gamechangerml/api/utils/mlscheduler.py | 8 +++----- gamechangerml/setup_env.sh | 9 +++++++-- 4 files changed, 21 insertions(+), 14 deletions(-) diff --git a/gamechangerml/api/fastapi/routers/startup.py b/gamechangerml/api/fastapi/routers/startup.py index 8f0f25dd..b9fdd28d 100644 --- a/gamechangerml/api/fastapi/routers/startup.py +++ b/gamechangerml/api/fastapi/routers/startup.py @@ -18,6 +18,7 @@ latest_doc_compare_sim, latest_doc_compare_encoder, MEMORY_LOAD_LIMIT, + CORPUS_EVENT_TRIGGER, ) from gamechangerml.api.fastapi.model_loader import ModelLoader from gamechangerml.api.utils.mlscheduler import corpus_update_event @@ -101,12 +102,13 @@ async def check_health(): @router.on_event("startup") @repeat_every(seconds=60 * 60, wait_first=False) async def corpus_event_trigger(): - logger.info("Checking corpus diff") - args = { - "s3_corpus_dir": "bronze/gamechanger/json", - "logger": logger, - } - await corpus_update_event(**args) + if CORPUS_EVENT_TRIGGER: + logger.info("Checking Corpus Staleness") + args = { + "s3_corpus_dir": "bronze/gamechanger/json", + "logger": logger, + } + await corpus_update_event(**args) def get_hw_usage(threshold: int = MEMORY_LOAD_LIMIT) -> Tuple[float, bool, float]: diff --git a/gamechangerml/api/fastapi/settings.py b/gamechangerml/api/fastapi/settings.py index 820af751..acc3785a 100644 --- a/gamechangerml/api/fastapi/settings.py +++ b/gamechangerml/api/fastapi/settings.py @@ -24,7 +24,9 @@ CORPUS_DIR = CORPUS_PATH S3_CORPUS_PATH = os.environ.get("S3_CORPUS_PATH") -CORPUS_EVENT_TRIGGER = 0.95 +CORPUS_EVENT_TRIGGER_VAL = 0.95 +CORPUS_EVENT_TRIGGER = bool(os.environ.get("CORPUS_EVENT_TRIGGER", default=True)) + # Redis Cache Variables latest_intel_model_sent = CacheVariable("latest_intel_model_sent", True) latest_intel_model_sim = CacheVariable( diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index e8e17a11..48e05d89 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -26,7 +26,7 @@ from gamechangerml.api.fastapi.settings import ( CORPUS_DIR, S3_CORPUS_PATH, - CORPUS_EVENT_TRIGGER, + CORPUS_EVENT_TRIGGER_VAL, ) @@ -52,12 +52,10 @@ async def corpus_update_event( local_corpus_size = len(os.listdir(corpus_dir)) ratio = local_corpus_size / total - if ratio < CORPUS_EVENT_TRIGGER: + if ratio < CORPUS_EVENT_TRIGGER_VAL: logger.info("ML EVENT - Corpus is stale - downloading data") - # trigger download corpus + # trigger a thread to update corpus and build selected models logger.info("Attempting to download corpus from S3") - # grabs the s3 path to the corpus from the post in "corpus" - # then passes in where to dowload the corpus locally. thread_args = { "args": { diff --git a/gamechangerml/setup_env.sh b/gamechangerml/setup_env.sh index ce7f02d2..ed563b2a 100755 --- a/gamechangerml/setup_env.sh +++ b/gamechangerml/setup_env.sh @@ -39,7 +39,8 @@ function setup_prod() { export GC_WEB_USER="${GC_WEB_USER:-steve}" export GC_ENABLE_SSL="${GC_ENABLE_SSL:-true}" export ML_WEB_TOKEN="${ML_WEB_TOKEN:-}" - export MEMORY_LOAD_LIMIT="${MEMORY_LOAD_LIMIT:-80}" + export MEMORY_LOAD_LIMIT="${MEMORY_LOAD_LIMIT:-99}" + export CORPUS_EVENT_TRIGGER="${CORPUS_EVENT_TRIGGER:-true}" export DEV_ENV="PROD" } @@ -78,7 +79,9 @@ function setup_dev() { export ML_WEB_TOKEN="${ML_WEB_TOKEN:-eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJwZXJtcyI6WyJHYW1lY2hhbmdlciBBZG1pbiJdLCJjbiI6IjAwNyIsImNzcmYtdG9rZW4iOiI0ZWE1YzUwOGE2NTY2ZTc2MjQwNTQzZjhmZWIwNmZkNDU3Nzc3YmUzOTU0OWM0MDE2NDM2YWZkYTY1ZDIzMzBlIiwiaWF0IjoxNjQ0MzQyMjI0fQ.ezxT-36a25IMFJPea3re7sUwYZfm8ivZvDVAv_zti1W6DRkM2Hs7WwFuy-gR092m-p7Z7ohb7yM2AKVbJAn4CMtI3_1j0_VzzYb6yhQj7YIPg-Cax5Em9JCrIlCEx7r26o-zV1me0mIYMATJTMInKikeBvg2RJErvLwkgZNQVT8gQyR-JxM4mhjmylcel9-kt5FpJJuUKnzPI8BqVeG_eL6ktevA9odJRx56w9n2LivUaoQUCiXreLOLmSEwkkhIFnsyMcCCwkPpx4lMrkzjIr3B08_gRk5wIv4pV01OcSYR4QkXM7ZsNZZzRf-JtPHYn9SlT9DvwRVbYniYUCA7IM0OegFKSTt_-i7qvuKuYFHGDStfkijX2T6g_49oY1qfLsKldisccoOLfsaROpB1NiE9DBeM5OzAo-R98H_UiUFjsFVNvlunETbhuqX2yZFUjKxxerS_-1_DW8BmoD25Ofl188KM8gqUXo5lJs4bPTf41_N_V-57muZxdAq8kBazDKhaudAzskFNFF1B9dxwgxeE8wd5Gh_beCuCoP3K-9GwRVFfrdOCO19FDjqpLr0k94UfZzuflP5SDGXth2-AzZIslurPDL_1F4iadxq06GJggwryMxC7_Uc4_FQST53-gl9SXEFVSNdr6gsw318JNiyz8bxbBpIj7posqQeEaDg}" export ENABLE_DEBUGGER="${ENABLE_DEBUGGER:-true}" - export MEMORY_LOAD_LIMIT="${MEMORY_LOAD_LIMIT:-80}" + export MEMORY_LOAD_LIMIT="${MEMORY_LOAD_LIMIT:-99}" + export CORPUS_EVENT_TRIGGER="${CORPUS_EVENT_TRIGGER:-true}" + } @@ -100,6 +103,8 @@ function setup_devlocal() { export DEV_ENV="DEVLOCAL" export ENABLE_DEBUGGER="${ENABLE_DEBUGGER:-true}" + export MEMORY_LOAD_LIMIT="${MEMORY_LOAD_LIMIT:-99}" + export CORPUS_EVENT_TRIGGER="${CORPUS_EVENT_TRIGGER:-true}" } function setup_k8s_dev() { From 648854e4c4dfefdbf69b6dbadbb24987458f13ea Mon Sep 17 00:00:00 2001 From: 604840 Date: Tue, 30 Aug 2022 17:18:53 -0400 Subject: [PATCH 07/15] adding new process event --- gamechangerml/api/utils/mlscheduler.py | 2 +- gamechangerml/api/utils/processmanager.py | 23 ++++++++++++++++------- 2 files changed, 17 insertions(+), 8 deletions(-) diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index 48e05d89..f62b9064 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -78,7 +78,7 @@ async def corpus_update_event( corpus_thread.start() processmanager.running_threads[corpus_thread.ident] = corpus_thread processmanager.update_status( - processmanager.corpus_download, 0, 1, thread_id=corpus_thread.ident + processmanager.ml_change_event, 0, 1, thread_id=corpus_thread.ident ) except Exception: diff --git a/gamechangerml/api/utils/processmanager.py b/gamechangerml/api/utils/processmanager.py index d1676664..7e287fb8 100644 --- a/gamechangerml/api/utils/processmanager.py +++ b/gamechangerml/api/utils/processmanager.py @@ -1,7 +1,8 @@ import threading from datetime import datetime from gamechangerml.api.utils.redisdriver import CacheVariable -#from gamechangerml.api.fastapi.settings import logger # commenting out because of API calls failing for gamechanger-data + +# from gamechangerml.api.fastapi.settings import logger # commenting out because of API calls failing for gamechanger-data # Process Keys clear_corpus = "corpus: corpus_download" corpus_download = "corpus: corpus_download" @@ -14,6 +15,7 @@ reloading = "models: reloading_models " ltr_creation = "training: ltr_creation" topics_creation = "models: topics_creation" +ml_change_event = "training: corpus_download_training_models" running_threads = {} @@ -32,8 +34,7 @@ topics_creation: False, s3_file_download: False, s3_dependency: False, - loading_data: False - + loading_data: False, } except Exception as e: @@ -49,7 +50,15 @@ print(e) -def update_status(name, progress=0, total=100, message="", failed=False, thread_id="", completed_max=20): +def update_status( + name, + progress=0, + total=100, + message="", + failed=False, + thread_id="", + completed_max=20, +): thread_id = str(thread_id) try: @@ -72,7 +81,7 @@ def update_status(name, progress=0, total=100, message="", failed=False, thread_ if thread_id in running_threads: del running_threads[thread_id] if failed: - completed['date'] = 'Failed' + completed["date"] = "Failed" completed_list = COMPLETED_PROCESS.value completed_list.append(completed) @@ -87,8 +96,8 @@ def update_status(name, progress=0, total=100, message="", failed=False, thread_ status_dict = PROCESS_STATUS.value if thread_id not in status_dict: - status['process'] = name - status['category'] = name.split(':')[0] + status["process"] = name + status["category"] = name.split(":")[0] status_dict[thread_id] = status else: status_dict[thread_id].update(status) From 3241b7bcf3e7b8fe4e3fa7c6e0bdf942fcce9e22 Mon Sep 17 00:00:00 2001 From: 604840 Date: Tue, 30 Aug 2022 17:19:54 -0400 Subject: [PATCH 08/15] adding default --- gamechangerml/api/utils/processmanager.py | 1 + 1 file changed, 1 insertion(+) diff --git a/gamechangerml/api/utils/processmanager.py b/gamechangerml/api/utils/processmanager.py index 7e287fb8..9eccc11c 100644 --- a/gamechangerml/api/utils/processmanager.py +++ b/gamechangerml/api/utils/processmanager.py @@ -35,6 +35,7 @@ s3_file_download: False, s3_dependency: False, loading_data: False, + ml_change_event: False, } except Exception as e: From 6ccf57e02b5ace5e30d396da99f54d88e3fee9ea Mon Sep 17 00:00:00 2001 From: 604840 Date: Wed, 31 Aug 2022 09:52:55 -0400 Subject: [PATCH 09/15] updating version --- gamechangerml/api/fastapi/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gamechangerml/api/fastapi/version.py b/gamechangerml/api/fastapi/version.py index 545af92e..44759f55 100644 --- a/gamechangerml/api/fastapi/version.py +++ b/gamechangerml/api/fastapi/version.py @@ -1 +1 @@ -__version__ = "1.7" +__version__ = "1.9" From 09bdc839e4fb1da71c3109e289699f2dfc21608a Mon Sep 17 00:00:00 2001 From: 604840 Date: Wed, 31 Aug 2022 11:16:56 -0400 Subject: [PATCH 10/15] adding update process --- gamechangerml/api/fastapi/routers/startup.py | 3 ++- gamechangerml/api/utils/mlscheduler.py | 16 +++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/gamechangerml/api/fastapi/routers/startup.py b/gamechangerml/api/fastapi/routers/startup.py index b9fdd28d..dd2f486d 100644 --- a/gamechangerml/api/fastapi/routers/startup.py +++ b/gamechangerml/api/fastapi/routers/startup.py @@ -24,7 +24,7 @@ from gamechangerml.api.utils.mlscheduler import corpus_update_event from gamechangerml.api.utils.threaddriver import MlThread from gamechangerml.api.utils import processmanager - +from gamechangerml.api.fastapi.routers.controls import get_process_status import psutil router = APIRouter() @@ -109,6 +109,7 @@ async def corpus_event_trigger(): "logger": logger, } await corpus_update_event(**args) + await get_process_status() def get_hw_usage(threshold: int = MEMORY_LOAD_LIMIT) -> Tuple[float, bool, float]: diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index f62b9064..f1379787 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -42,7 +42,7 @@ async def corpus_update_event( if bucket is None: bucket = S3Service.connect_to_bucket(S3Config.BUCKET_NAME, logger) - process = processmanager.corpus_download + process = processmanager.ml_change_event try: logger.info("ML EVENT - Checking corpus staleness") @@ -74,11 +74,11 @@ async def corpus_update_event( } logger.info(thread_args) - corpus_thread = MlThread(run_update, thread_args) - corpus_thread.start() - processmanager.running_threads[corpus_thread.ident] = corpus_thread + ml_event_thread = MlThread(run_update, thread_args) + ml_event_thread.start() + processmanager.running_threads[ml_event_thread.ident] = ml_event_thread processmanager.update_status( - processmanager.ml_change_event, 0, 1, thread_id=corpus_thread.ident + processmanager.ml_change_event, 0, 1, thread_id=ml_event_thread.ident ) except Exception: @@ -95,3 +95,9 @@ def run_update(args): logger.info("Attempting to build Qexp") model_dict = args["qexp_model_dict"] train_qexp(model_dict) + processmanager.update_status( + processmanager.ml_change_event, + 1, + 1, + thread_id=current_thread().ident, + ) From efc840d536fdf3f66b9763f16b07e7a08bc4d55c Mon Sep 17 00:00:00 2001 From: 604840 Date: Wed, 31 Aug 2022 14:09:10 -0400 Subject: [PATCH 11/15] adding sentence --- gamechangerml/api/utils/mlscheduler.py | 28 +++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index f1379787..e586dbb6 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -15,6 +15,7 @@ download_corpus, train_model, train_qexp, + train_sentence, ) from gamechangerml.api.utils.threaddriver import MlThread import os @@ -70,6 +71,11 @@ async def corpus_update_event( "upload": True, "version": datetime.datetime.today().strftime("%Y%m%d"), }, + "sent_model_dict": { + "build_type": "sentence", + "upload": True, + "version": datetime.datetime.today().strftime("%Y%m%d"), + }, } } @@ -78,7 +84,7 @@ async def corpus_update_event( ml_event_thread.start() processmanager.running_threads[ml_event_thread.ident] = ml_event_thread processmanager.update_status( - processmanager.ml_change_event, 0, 1, thread_id=ml_event_thread.ident + processmanager.ml_change_event, 0, 3, thread_id=ml_event_thread.ident ) except Exception: @@ -92,12 +98,28 @@ def run_update(args): logger = args["logger"] logger.info("Attempting to download corpus from S3") download_corpus_s3(**args["s3_args"]) + processmanager.update_status( + processmanager.ml_change_event, + 1, + 3, + thread_id=current_thread().ident, + ) logger.info("Attempting to build Qexp") model_dict = args["qexp_model_dict"] train_qexp(model_dict) processmanager.update_status( processmanager.ml_change_event, - 1, - 1, + 2, + 3, + thread_id=current_thread().ident, + ) + logger.info("Attempting to build Sentence Index") + model_dict = args["sent_model_dict"] + + train_sentence(model_dict) + processmanager.update_status( + processmanager.ml_change_event, + 3, + 3, thread_id=current_thread().ident, ) From 2b45b304fb8938d0fcec46e39b57111980b9a34b Mon Sep 17 00:00:00 2001 From: 604840 Date: Fri, 2 Sep 2022 10:26:39 -0400 Subject: [PATCH 12/15] updating corpus now based on date modified --- gamechangerml/api/fastapi/settings.py | 2 +- gamechangerml/api/utils/mlscheduler.py | 32 ++++++++++++++----- .../src/data_transfer/s3_download.py | 2 ++ 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/gamechangerml/api/fastapi/settings.py b/gamechangerml/api/fastapi/settings.py index acc3785a..9aec048c 100644 --- a/gamechangerml/api/fastapi/settings.py +++ b/gamechangerml/api/fastapi/settings.py @@ -24,7 +24,7 @@ CORPUS_DIR = CORPUS_PATH S3_CORPUS_PATH = os.environ.get("S3_CORPUS_PATH") -CORPUS_EVENT_TRIGGER_VAL = 0.95 +CORPUS_EVENT_TRIGGER_VAL = 0.5 CORPUS_EVENT_TRIGGER = bool(os.environ.get("CORPUS_EVENT_TRIGGER", default=True)) # Redis Cache Variables diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index e586dbb6..36f1f712 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -6,7 +6,7 @@ from threading import current_thread from os import makedirs from os.path import join, exists, basename -import datetime +from datetime import datetime, timezone from gamechangerml.src.services.s3_service import S3Service from gamechangerml.src.utilities import configure_logger from gamechangerml.configs import S3Config @@ -49,11 +49,27 @@ async def corpus_update_event( logger.info("ML EVENT - Checking corpus staleness") s3_filter = bucket.objects.filter(Prefix=f"{s3_corpus_dir}/") - total = len(list(s3_filter)) - local_corpus_size = len(os.listdir(corpus_dir)) - - ratio = local_corpus_size / total - if ratio < CORPUS_EVENT_TRIGGER_VAL: + last_mod_list = [] + if os.path.isdir(corpus_dir): + local_corpus_size = len(os.listdir(corpus_dir)) + if local_corpus_size > 0: + local_corpus_last_updated = datetime.fromtimestamp( + os.stat(corpus_dir).st_mtime + ).astimezone(timezone.utc) + for obj in s3_filter: + last_mod_list.append(obj.last_modified) + + last_mod_list = [ + dates + for dates in last_mod_list + if dates > local_corpus_last_updated + ] + ratio = len(last_mod_list) / local_corpus_size + else: + ratio = 1 + else: + ratio = 1 + if ratio > CORPUS_EVENT_TRIGGER_VAL: logger.info("ML EVENT - Corpus is stale - downloading data") # trigger a thread to update corpus and build selected models logger.info("Attempting to download corpus from S3") @@ -69,12 +85,12 @@ async def corpus_update_event( "qexp_model_dict": { "build_type": "qexp", "upload": True, - "version": datetime.datetime.today().strftime("%Y%m%d"), + "version": datetime.today().strftime("%Y%m%d"), }, "sent_model_dict": { "build_type": "sentence", "upload": True, - "version": datetime.datetime.today().strftime("%Y%m%d"), + "version": datetime.today().strftime("%Y%m%d"), }, } } diff --git a/gamechangerml/src/data_transfer/s3_download.py b/gamechangerml/src/data_transfer/s3_download.py index b9df6fd0..f40f55d5 100644 --- a/gamechangerml/src/data_transfer/s3_download.py +++ b/gamechangerml/src/data_transfer/s3_download.py @@ -51,6 +51,8 @@ def download_corpus_s3( success = delete_local_corpus(output_dir, logger) if not success: return [] + if not os.path.isdir(output_dir): + os.mkdir(output_dir) corpus = [] process = processmanager.corpus_download From 6cf47ed9441e3ae941ed851032a759d68cf7bbbc Mon Sep 17 00:00:00 2001 From: 604840 Date: Fri, 2 Sep 2022 10:35:44 -0400 Subject: [PATCH 13/15] fixing args for sentence --- gamechangerml/api/utils/mlscheduler.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index 36f1f712..639d7798 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -7,6 +7,7 @@ from os import makedirs from os.path import join, exists, basename from datetime import datetime, timezone +from tkinter import TRUE from gamechangerml.src.services.s3_service import S3Service from gamechangerml.src.utilities import configure_logger from gamechangerml.configs import S3Config @@ -28,6 +29,7 @@ CORPUS_DIR, S3_CORPUS_PATH, CORPUS_EVENT_TRIGGER_VAL, + latest_intel_model_encoder, ) @@ -91,6 +93,8 @@ async def corpus_update_event( "build_type": "sentence", "upload": True, "version": datetime.today().strftime("%Y%m%d"), + "encoder_model": latest_intel_model_encoder, + "gpu": True, }, } } From 2774f5de38ca46e3e3ab01d4e7c4b8991ff88682 Mon Sep 17 00:00:00 2001 From: 604840 Date: Fri, 2 Sep 2022 10:47:07 -0400 Subject: [PATCH 14/15] fixing sentence building --- gamechangerml/api/utils/mlscheduler.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index 639d7798..97064bab 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -7,14 +7,11 @@ from os import makedirs from os.path import join, exists, basename from datetime import datetime, timezone -from tkinter import TRUE from gamechangerml.src.services.s3_service import S3Service from gamechangerml.src.utilities import configure_logger from gamechangerml.configs import S3Config from gamechangerml.api.utils import processmanager from gamechangerml.api.fastapi.routers.controls import ( - download_corpus, - train_model, train_qexp, train_sentence, ) @@ -93,7 +90,7 @@ async def corpus_update_event( "build_type": "sentence", "upload": True, "version": datetime.today().strftime("%Y%m%d"), - "encoder_model": latest_intel_model_encoder, + "encoder_model": str(latest_intel_model_encoder.value), "gpu": True, }, } From 6dd7fa9d957d1b50dc9334ede26832f43be8a06e Mon Sep 17 00:00:00 2001 From: 604840 Date: Fri, 2 Sep 2022 10:54:03 -0400 Subject: [PATCH 15/15] fixing sentence building actually --- gamechangerml/api/utils/mlscheduler.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/gamechangerml/api/utils/mlscheduler.py b/gamechangerml/api/utils/mlscheduler.py index 97064bab..4d73b2fa 100644 --- a/gamechangerml/api/utils/mlscheduler.py +++ b/gamechangerml/api/utils/mlscheduler.py @@ -90,7 +90,9 @@ async def corpus_update_event( "build_type": "sentence", "upload": True, "version": datetime.today().strftime("%Y%m%d"), - "encoder_model": str(latest_intel_model_encoder.value), + "encoder_model": str(latest_intel_model_encoder.value).split( + "/" + )[-1], "gpu": True, }, }