
Merge branch 'dev' into Add_Hierarchy_JSON_File
Mishoe Austin committed Sep 7, 2022
2 parents 4bbbf70 + 0992af2 commit 8956e3d
Showing 14 changed files with 333 additions and 97 deletions.
24 changes: 21 additions & 3 deletions gamechangerml/api/fastapi/routers/controls.py
@@ -7,6 +7,7 @@
import shutil
import threading
import pandas as pd
import redis

from datetime import datetime
from gamechangerml import DATA_PATH
@@ -37,6 +38,8 @@
)
from gamechangerml.src.data_transfer import download_corpus_s3
from gamechangerml.api.utils.threaddriver import MlThread
from gamechangerml.api.utils.redisdriver import RedisPool

from gamechangerml.train.pipeline import Pipeline
from gamechangerml.api.utils import processmanager
from gamechangerml.api.fastapi.model_loader import ModelLoader
@@ -85,6 +88,22 @@ async def get_process_status():
"completed_process": processmanager.COMPLETED_PROCESS.value,
}

@router.post("/clearCache")
async def clear_cache(body: dict, response: Response):
    _connection = redis.Redis(connection_pool=RedisPool().getPool())

    if body.get('clear'):
        for key in body['clear']:
            _connection.delete(f'search: {key}')
    else:
        for key in _connection.scan_iter("search:*"):
            # delete the key
            _connection.delete(key)

@router.get("/getCache")
async def get_cache():
    _connection = redis.Redis(connection_pool=RedisPool().getPool())
    return [key.split('search: ')[1] for key in _connection.scan_iter("search:*")]

@router.get("/getDataList")
def get_downloaded_data_list():
@@ -753,10 +772,9 @@ def reload_thread(model_dict):

@router.post("/downloadCorpus", status_code=200)
async def download_corpus(corpus_dict: dict, response: Response):
"""load_latest_models - endpoint for updating the transformer model
"""download_corpus - endpoint for downloading corpus
Args:
model_dict: dict; {"sentence": "bert...",
"qexp": "bert...", "transformer": "bert..."}
corpus_dict: dict; {"corpus": "bronze/gamechanger/json"}
Response: Response class; for status codes (part of fastapi, no need to pass param)
Returns:
"""
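For reference, a hedged sketch of how a client might exercise the new cache endpoints; the base URL is an assumption, not taken from the diff, and the payload shape follows the handler logic above (an omitted or empty "clear" list wipes every search:* key):

    import requests

    BASE = "http://localhost:5000"  # assumed ML API address, not from the diff

    cached = requests.get(f"{BASE}/getCache").json()                 # list cached queries
    requests.post(f"{BASE}/clearCache", json={"clear": cached[:5]})  # targeted clear
    requests.post(f"{BASE}/clearCache", json={"clear": []})          # clear all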
2 changes: 1 addition & 1 deletion gamechangerml/api/fastapi/routers/search.py
@@ -106,7 +106,7 @@ async def trans_sentence_infer(
results = {}
try:
query_text = body["text"]
cache = CacheVariable(query_text, True)
cache = CacheVariable(f'search: {query_text}', True)
cached_value = cache.get_value()
if cached_value:
logger.info("Searched was found in cache")
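The f'search: {query_text}' prefix is what ties this file to the new /clearCache and /getCache endpoints: every cached search now lives in a namespace that scan_iter("search:*") can enumerate or purge. A minimal get-or-compute sketch of that namespacing in plain redis-py (CacheVariable's full API is not shown in this diff):

    import json
    import redis

    conn = redis.Redis(decode_responses=True)

    def cached_infer(query_text, compute):
        key = f"search: {query_text}"      # same namespace the endpoints manage
        hit = conn.get(key)
        if hit is not None:
            return json.loads(hit)         # cache hit
        result = compute(query_text)
        conn.set(key, json.dumps(result))  # cache miss: store for next time
        return result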
22 changes: 21 additions & 1 deletion gamechangerml/api/fastapi/routers/startup.py
@@ -1,6 +1,8 @@
from fastapi import APIRouter
from fastapi_utils.tasks import repeat_every
import os
from typing import Tuple

from gamechangerml.api.fastapi.settings import (
DOC_COMPARE_SENT_INDEX_PATH,
logger,
@@ -16,8 +18,13 @@
latest_doc_compare_sim,
latest_doc_compare_encoder,
MEMORY_LOAD_LIMIT,
CORPUS_EVENT_TRIGGER,
)
from gamechangerml.api.fastapi.model_loader import ModelLoader
from gamechangerml.api.utils.mlscheduler import corpus_update_event
from gamechangerml.api.utils.threaddriver import MlThread
from gamechangerml.api.utils import processmanager
from gamechangerml.api.fastapi.routers.controls import get_process_status
import psutil

router = APIRouter()
@@ -92,7 +99,20 @@ async def check_health():
logger.info(f"RAM % used: {ram_used}")


def get_hw_usage(threshold: int = MEMORY_LOAD_LIMIT) -> (float, bool, float):
@router.on_event("startup")
@repeat_every(seconds=60 * 60, wait_first=False)
async def corpus_event_trigger():
if CORPUS_EVENT_TRIGGER:
logger.info("Checking Corpus Staleness")
args = {
"s3_corpus_dir": "bronze/gamechanger/json",
"logger": logger,
}
await corpus_update_event(**args)
await get_process_status()


def get_hw_usage(threshold: int = MEMORY_LOAD_LIMIT) -> Tuple[float, bool, float]:
surpassed = False
ram_used = psutil.virtual_memory()[2]
if threshold:
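The new corpus_event_trigger task leans on fastapi_utils' repeat_every, which re-runs the coroutine on a fixed interval for the life of the process; wait_first=False means the first staleness check fires at startup rather than an hour later. A stripped-down sketch of the scheduling shape (only the decorator stack and the hourly interval come from the diff above):

    from fastapi import FastAPI
    from fastapi_utils.tasks import repeat_every

    app = FastAPI()

    @app.on_event("startup")
    @repeat_every(seconds=60 * 60, wait_first=False)  # hourly, first run at boot
    async def hourly_corpus_check() -> None:
        # the real router awaits corpus_update_event(...) here
        print("checking corpus staleness")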
8 changes: 4 additions & 4 deletions gamechangerml/api/fastapi/settings.py
@@ -24,6 +24,8 @@

CORPUS_DIR = CORPUS_PATH
S3_CORPUS_PATH = os.environ.get("S3_CORPUS_PATH")
CORPUS_EVENT_TRIGGER_VAL = 0.5
CORPUS_EVENT_TRIGGER = bool(os.environ.get("CORPUS_EVENT_TRIGGER", default=True))

# Redis Cache Variables
latest_intel_model_sent = CacheVariable("latest_intel_model_sent", True)
@@ -32,8 +34,7 @@
)
latest_intel_model_encoder = CacheVariable("latest encoder model", True)
latest_intel_model_trans = CacheVariable("latest_intel_model_trans")
latest_doc_compare_encoder = CacheVariable(
"latest doc compare encoder model", True)
latest_doc_compare_encoder = CacheVariable("latest doc compare encoder model", True)
latest_doc_compare_sim = CacheVariable(
"latest doc compare searcher (similarity model + sent index)", True
)
@@ -72,7 +73,6 @@
# validate correct configurations
logger.info(f"API TRANSFORMERS DIRECTORY is: {LOCAL_TRANSFORMERS_DIR.value}")
logger.info(f"API INDEX PATH is: {SENT_INDEX_PATH.value}")
logger.info(
f"API DOC COMPARE INDEX PATH is: {DOC_COMPARE_SENT_INDEX_PATH.value}")
logger.info(f"API DOC COMPARE INDEX PATH is: {DOC_COMPARE_SENT_INDEX_PATH.value}")
logger.info(f"API REDIS HOST is: {REDIS_HOST}")
logger.info(f"API REDIS PORT is: {REDIS_PORT}")
2 changes: 1 addition & 1 deletion gamechangerml/api/fastapi/version.py
@@ -1 +1 @@
__version__ = "1.7"
__version__ = "1.9"
14 changes: 4 additions & 10 deletions gamechangerml/api/utils/logger.py
@@ -1,27 +1,21 @@
import logging
from logging import handlers
import sys
from gamechangerml.src.utilities import configure_logger

# set loggers
logger = logging.getLogger()
logger = configure_logger()
glogger = logging.getLogger("gunicorn.error")
logger.setLevel(logging.DEBUG)

log_formatter = logging.Formatter(
"%(asctime)s [%(levelname)s][PID:%(process)d]: %(message)s"
)
try:
ch = logging.StreamHandler(sys.stdout)
ch.setFormatter(log_formatter)
logger.addHandler(ch)
glogger.addHandler(ch)
# glogger.addHandler(ch)
log_file_path = "gamechangerml/api/logs/gc_ml_logs.txt"
fh = logging.handlers.RotatingFileHandler(
log_file_path, maxBytes=2000000, backupCount=1, mode="a"
)
logger.info(f"ML API is logging to {log_file_path}")

fh.setFormatter(log_formatter)
# fh.setFormatter(log_formatter)
logger.addHandler(fh)
glogger.addHandler(fh)
except Exception as e:
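The handler wiring deleted here moves behind configure_logger from gamechangerml.src.utilities. Its source isn't part of this diff, but from the removed lines it presumably does roughly the following (a reconstruction, not the actual utility):

    import logging
    import sys
    from logging.handlers import RotatingFileHandler

    def configure_logger(log_file="gamechangerml/api/logs/gc_ml_logs.txt"):
        logger = logging.getLogger()
        logger.setLevel(logging.DEBUG)
        fmt = logging.Formatter(
            "%(asctime)s [%(levelname)s][PID:%(process)d]: %(message)s"
        )
        ch = logging.StreamHandler(sys.stdout)  # console handler
        ch.setFormatter(fmt)
        logger.addHandler(ch)
        fh = RotatingFileHandler(log_file, maxBytes=2000000, backupCount=1, mode="a")
        fh.setFormatter(fmt)                    # rotating file handler
        logger.addHandler(fh)
        return logger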
144 changes: 144 additions & 0 deletions gamechangerml/api/utils/mlscheduler.py
@@ -0,0 +1,144 @@
"""Utility functions for scheduling ml builds based on events
Also see gamechangerml.src.services.s3_service.py
"""

from threading import current_thread
from os import makedirs
from os.path import join, exists, basename
from datetime import datetime, timezone
from gamechangerml.src.services.s3_service import S3Service
from gamechangerml.src.utilities import configure_logger
from gamechangerml.configs import S3Config
from gamechangerml.api.utils import processmanager
from gamechangerml.api.fastapi.routers.controls import (
train_qexp,
train_sentence,
)
from gamechangerml.api.utils.threaddriver import MlThread
import os
from queue import Queue
from gamechangerml.src.data_transfer import download_corpus_s3

from fastapi import APIRouter, Response

from gamechangerml.api.fastapi.settings import (
CORPUS_DIR,
S3_CORPUS_PATH,
CORPUS_EVENT_TRIGGER_VAL,
latest_intel_model_encoder,
)


async def corpus_update_event(
s3_corpus_dir: str,
corpus_dir: str = "gamechangerml/corpus",
bucket=None,
logger=None,
) -> bool:
if logger is None:
logger = configure_logger()

if bucket is None:
bucket = S3Service.connect_to_bucket(S3Config.BUCKET_NAME, logger)

process = processmanager.ml_change_event

try:
logger.info("ML EVENT - Checking corpus staleness")

s3_filter = bucket.objects.filter(Prefix=f"{s3_corpus_dir}/")
last_mod_list = []
if os.path.isdir(corpus_dir):
local_corpus_size = len(os.listdir(corpus_dir))
if local_corpus_size > 0:
local_corpus_last_updated = datetime.fromtimestamp(
os.stat(corpus_dir).st_mtime
).astimezone(timezone.utc)
for obj in s3_filter:
last_mod_list.append(obj.last_modified)

last_mod_list = [
dates
for dates in last_mod_list
if dates > local_corpus_last_updated
]
ratio = len(last_mod_list) / local_corpus_size
else:
ratio = 1
else:
ratio = 1
if ratio > CORPUS_EVENT_TRIGGER_VAL:
logger.info("ML EVENT - Corpus is stale - downloading data")
# trigger a thread to update corpus and build selected models
logger.info("Attempting to download corpus from S3")

thread_args = {
"args": {
"logger": logger,
"s3_args": {
"s3_corpus_dir": s3_corpus_dir,
"output_dir": CORPUS_DIR,
"logger": logger,
},
"qexp_model_dict": {
"build_type": "qexp",
"upload": True,
"version": datetime.today().strftime("%Y%m%d"),
},
"sent_model_dict": {
"build_type": "sentence",
"upload": True,
"version": datetime.today().strftime("%Y%m%d"),
"encoder_model": str(latest_intel_model_encoder.value).split(
"/"
)[-1],
"gpu": True,
},
}
}

logger.info(thread_args)
ml_event_thread = MlThread(run_update, thread_args)
ml_event_thread.start()
processmanager.running_threads[ml_event_thread.ident] = ml_event_thread
processmanager.update_status(
processmanager.ml_change_event, 0, 3, thread_id=ml_event_thread.ident
)

except Exception:
logger.exception("Failed to update corpus or train models")
processmanager.update_status(
process, failed=True, thread_id=current_thread().ident
)


def run_update(args):
logger = args["logger"]
logger.info("Attempting to download corpus from S3")
download_corpus_s3(**args["s3_args"])
processmanager.update_status(
processmanager.ml_change_event,
1,
3,
thread_id=current_thread().ident,
)
logger.info("Attempting to build Qexp")
model_dict = args["qexp_model_dict"]
train_qexp(model_dict)
processmanager.update_status(
processmanager.ml_change_event,
2,
3,
thread_id=current_thread().ident,
)
logger.info("Attempting to build Sentence Index")
model_dict = args["sent_model_dict"]

train_sentence(model_dict)
processmanager.update_status(
processmanager.ml_change_event,
3,
3,
thread_id=current_thread().ident,
)
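To make the trigger arithmetic concrete: with 1,000 files in the local corpus and 600 S3 objects modified after the local directory's mtime, ratio = 600 / 1000 = 0.6, which exceeds CORPUS_EVENT_TRIGGER_VAL (0.5) and kicks off the download-and-retrain thread; a missing or empty local corpus forces ratio = 1 and always triggers. Note that os.stat(corpus_dir).st_mtime tracks directory-entry changes (files added or removed), not edits inside existing files, which is presumably acceptable for a bulk-downloaded corpus.

    # Illustrative numbers only; see corpus_update_event above for the real logic.
    local_corpus_size = 1000  # files in gamechangerml/corpus
    newer_s3_objects = 600    # S3 objects newer than the local directory's mtime

    ratio = newer_s3_objects / local_corpus_size if local_corpus_size else 1
    assert ratio > 0.5        # stale: trigger corpus download and model retrain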
24 changes: 17 additions & 7 deletions gamechangerml/api/utils/processmanager.py
@@ -1,7 +1,8 @@
import threading
from datetime import datetime
from gamechangerml.api.utils.redisdriver import CacheVariable
#from gamechangerml.api.fastapi.settings import logger # commenting out because of API calls failing for gamechanger-data

# from gamechangerml.api.fastapi.settings import logger # commenting out because of API calls failing for gamechanger-data
# Process Keys
clear_corpus = "corpus: corpus_download"
corpus_download = "corpus: corpus_download"
@@ -14,6 +15,7 @@
reloading = "models: reloading_models "
ltr_creation = "training: ltr_creation"
topics_creation = "models: topics_creation"
ml_change_event = "training: corpus_download_training_models"

running_threads = {}

@@ -32,8 +34,8 @@
topics_creation: False,
s3_file_download: False,
s3_dependency: False,
loading_data: False

loading_data: False,
ml_change_event: False,
}

except Exception as e:
@@ -49,7 +51,15 @@
print(e)


def update_status(name, progress=0, total=100, message="", failed=False, thread_id="", completed_max=20):
def update_status(
name,
progress=0,
total=100,
message="",
failed=False,
thread_id="",
completed_max=20,
):

thread_id = str(thread_id)
try:
@@ -72,7 +82,7 @@ def update_status(name, progress=0, total=100, message="", failed=False, thread_
if thread_id in running_threads:
del running_threads[thread_id]
if failed:
completed['date'] = 'Failed'
completed["date"] = "Failed"

completed_list = COMPLETED_PROCESS.value
completed_list.append(completed)
@@ -87,8 +97,8 @@ def update_status(name, progress=0, total=100, message="", failed=False, thread_
status_dict = PROCESS_STATUS.value

if thread_id not in status_dict:
status['process'] = name
status['category'] = name.split(':')[0]
status["process"] = name
status["category"] = name.split(":")[0]
status_dict[thread_id] = status
else:
status_dict[thread_id].update(status)
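For orientation, the "category: process" key convention is what status["category"] = name.split(":")[0] relies on; the new ml_change_event key files its updates under the training category. A usage sketch matching the calls made from mlscheduler.py above:

    from threading import current_thread
    from gamechangerml.api.utils import processmanager

    # Report step 2 of 3 of the corpus-driven retrain for this thread.
    processmanager.update_status(
        processmanager.ml_change_event,
        progress=2,
        total=3,
        thread_id=current_thread().ident,
    )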
(diffs for the remaining 6 changed files not loaded)
