diff --git a/.github/workflows/run-checks.yml b/.github/workflows/checks.yml similarity index 66% rename from .github/workflows/run-checks.yml rename to .github/workflows/checks.yml index 9f7d2410..6d294de1 100644 --- a/.github/workflows/run-checks.yml +++ b/.github/workflows/checks.yml @@ -1,30 +1,26 @@ name: Linter, formatter, and docs checks -on: [pull_request] -permissions: - contents: read +on: pull_request jobs: - build: - runs-on: ubuntu-latest - if: "!contains(github.event.head_commit.message, 'CI Bot')" + checks: + runs-on: ubuntu-22.04 + if: "!contains(github.event.head_commit.message, 'CI Bot')" steps: - uses: actions/checkout@v4 - with: - fetch-depth: 1 - - name: Set up Python 3.10 + - name: Set up Python 3.12 uses: actions/setup-python@v5 with: - python-version: "3.10" + python-version: "3.12" cache: "pip" - name: Install package and dependencies run: | - python -m pip install --upgrade pip - python -m pip install ruff - python -m pip install .[docs] + pip install --upgrade pip + pip install ruff + pip install .[docs] - name: Run linter and formatter checks using ruff run: make checks diff --git a/.github/workflows/run-llm-tests.yml b/.github/workflows/run-llm-tests.yml new file mode 100644 index 00000000..b607e6bf --- /dev/null +++ b/.github/workflows/run-llm-tests.yml @@ -0,0 +1,52 @@ +name: LLM Tests +on: [pull_request] + +jobs: + + build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.10", "3.11", "3.12" ] + env: + MONGO_ENABLED: true + LMDB_ENABLED: false + timeout-minutes: 60 + if: "!contains(github.event.head_commit.message, 'CI Bot')" + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 1 + + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + cache: "pip" + + - name: Show OS Info + run: '[[ "$OSTYPE" == "linux-gnu"* ]] && { echo "OS Type: Linux"; (command -v lsb_release &> /dev/null && lsb_release -a) || cat /etc/os-release; uname -r; } || [[ "$OSTYPE" == "darwin"* ]] && { echo "OS Type: macOS"; sw_vers; uname -r; } || echo "Unsupported OS type: $OSTYPE"' + + - name: Start docker compose with MongoDB + run: make services-mongo + + - name: Upgrade pip + run: | + python -m pip install --upgrade pip + python --version + + - name: Test LLM + run: bash .github/workflows/run_examples.sh examples true llm_complex/llm_test_runner.py + + - name: Shut down docker compose + run: make services-stop-mongo + + - name: Clean up + run: | + make clean + find /home/runner/runners/ -type f -name "*.log" -exec sh -c 'echo {}; >"{}"' \; || true + docker image prune -a -f + + - name: List large files + run: find . -type f -exec du -h {} + | sort -h diff --git a/.github/workflows/run_examples.sh b/.github/workflows/run_examples.sh index 66b95fc5..0e6cddf8 100644 --- a/.github/workflows/run_examples.sh +++ b/.github/workflows/run_examples.sh @@ -25,7 +25,7 @@ fi # Function to run tests with common steps run_test() { - test_path="${EXAMPLES_DIR}/${1}_example.py" + test_path="${EXAMPLES_DIR}/${1}" test_type="$1" with_mongo="$2" echo "Test type=${test_type}" @@ -39,6 +39,8 @@ run_test() { pip install .[mongo] > /dev/null 2>&1 fi + + # The following block is only needed to install special dependencies.
if [[ "$test_type" =~ "mlflow" ]]; then echo "Installing mlflow" pip install .[mlflow] > /dev/null 2>&1 @@ -53,6 +55,7 @@ run_test() { pip install .[ml_dev] > /dev/null 2>&1 elif [[ "$test_type" =~ "llm_complex" ]]; then echo "Installing ml_dev dependencies" + pip install .[dask] > /dev/null 2>&1 pip install .[ml_dev] echo "Defining python path for llm_complex..." export PYTHONPATH=$PYTHONPATH:${EXAMPLES_DIR}/llm_complex @@ -62,7 +65,6 @@ run_test() { echo "Running $test_path ..." python "$test_path" | tee output.log echo "Ok, ran $test_path." - # Check for errors in the output if grep -iq "error" output.log; then echo "Test $test_path failed! See output.log for details." exit 1 @@ -70,7 +72,6 @@ run_test() { echo "Great, no errors to run $test_path." - # Clean up the log file rm output.log } @@ -81,7 +82,7 @@ echo "Using examples directory: $EXAMPLES_DIR" echo "With Mongo? ${WITH_MONGO}" # Define the test cases -default_tests=("instrumented_simple" "instrumented_loop" "dask" "mlflow" "tensorboard" "single_layer_perceptron" "llm_complex/llm_main") +default_tests=("instrumented_simple_example.py" "instrumented_loop_example.py" "distributed_consumer_example.py" "dask_example.py" "mlflow_example.py" "tensorboard_example.py" "single_layer_perceptron_example.py" "llm_complex/llm_main_example.py") # Use the third argument if provided, otherwise use default tests if [[ -n "$3" ]]; then diff --git a/.gitignore b/.gitignore index 8d6c88e2..5c8d44a9 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,7 @@ **/*build* **/*egg* **/*pycache* -**/*dist* +#**/*dist* **/*mlflow.db* **/*mnist* **/*tensorboard_events* @@ -23,3 +23,4 @@ deployment/data **/*output_data* examples/llm_complex/input_data tmp_tests/ +nohup.out diff --git a/Makefile b/Makefile index b9d622ee..7683c9f3 100644 --- a/Makefile +++ b/Makefile @@ -41,11 +41,11 @@ clean: find . -type f -name "*.pth" -exec rm -f {} \; || true find . -type f -name "mlflow.db" -exec rm -f {} \; || true find . -type d -name "mlruns" -exec rm -rf {} \; 2>/dev/null || true - find . -type d -name "mlruns" -exec rm -rf {} \; 2>/dev/null || true find . -type d -name "__pycache__" -exec rm -rf {} \; 2>/dev/null || true find . -type d -name "*tfevents*" -exec rm -rf {} \; 2>/dev/null || true find . -type d -name "*output_data*" -exec rm -rf {} \; 2>/dev/null || true - # sphinx-build -M clean docs docs/_build This needs to be fixed. + find . -type f -name "*nohup*" -exec rm -rf {} \; 2>/dev/null || true + sphinx-build -M clean docs docs/_build > /dev/null 2>&1 || true # Build the HTML documentation using Sphinx .PHONY: docs @@ -96,7 +96,3 @@ tests: .PHONY: tests-notebooks tests-notebooks: pytest --nbmake "notebooks/" --nbmake-timeout=600 --ignore=notebooks/dask_from_CLI.ipynb - -.PHONY: tests-all -tests-all: - pytest diff --git a/examples/distributed_consumer_example.py b/examples/distributed_consumer_example.py new file mode 100644 index 00000000..1f321b73 --- /dev/null +++ b/examples/distributed_consumer_example.py @@ -0,0 +1,76 @@ +import os +import subprocess +import uuid +from time import sleep +from flowcept import Flowcept, FlowceptTask + +def execute_cmd(command: str) -> int: + """ + Executes a command using nohup in the background and returns the process ID (PID). + + Parameters + ---------- + command : str + The command to be executed. + + Returns + ------- + int + The PID of the background process. + """ + try: + # Append nohup and redirect outputs to /dev/null for background execution + nohup_command = f"nohup {command} > /dev/null 2>&1 & echo $!" 
+ # Execute the command in a shell and capture the PID + print(f"Executing: {nohup_command}") + process = subprocess.run(nohup_command, shell=True, check=True, executable='/bin/bash', text=True, capture_output=True) + pid = int(process.stdout.strip()) # Get the PID from the output + print(f"Started process with PID: {pid}") + return pid + except subprocess.CalledProcessError as e: + print(f"Error executing command: {command}\n{e}") + return -1 + + +def kill_process(pid: int) -> None: + """ + Kills a process by its PID. + + Parameters + ---------- + pid : int + The PID of the process to be killed. + """ + try: + os.kill(pid, 9) # Send SIGKILL to the process + print(f"Process {pid} killed successfully.") + except ProcessLookupError: + print(f"No process found with PID: {pid}.") + except PermissionError: + print(f"Permission denied to kill PID: {pid}.") + + +def simple_flowcept_task(workflow_id): + + with Flowcept(start_persistence=False, workflow_id=workflow_id, bundle_exec_id=workflow_id): + with FlowceptTask(used={"a": 1}) as t: + t.end(generated={"b": 2}) + + +if __name__ == "__main__": + + workflow_id = str(uuid.uuid4()) + print(workflow_id) + + pid = execute_cmd(f"python -c 'from flowcept import Flowcept; Flowcept.start_consumption_services(\"{workflow_id}\")'") + sleep(1) + + simple_flowcept_task(workflow_id) + + sleep(15) # Give enough time for the consumer services to do their thing + + kill_process(pid) + + tasks = Flowcept.db.query({"workflow_id": workflow_id}) + assert len(tasks) == 1 + print(tasks) diff --git a/examples/llm_complex/llm_main_example.py b/examples/llm_complex/llm_main_example.py index 866ef59f..f0916f7d 100644 --- a/examples/llm_complex/llm_main_example.py +++ b/examples/llm_complex/llm_main_example.py @@ -18,8 +18,6 @@ from flowcept.flowceptor.adapters.dask.dask_plugins import FlowceptDaskSchedulerAdapter, \ FlowceptDaskWorkerAdapter, register_dask_workflow -TORCH_CAPTURE = INSTRUMENTATION.get("torch") - def _interpolate_values(start, end, step): return [start + i * step for i in range((end - start) // step + 1)] @@ -92,8 +90,8 @@ def search_workflow(ntokens, dataset_ref, train_data_path, val_data_path, test_d {**c, "ntokens": ntokens, "dataset_ref": dataset_ref, "train_data_path":train_data_path, - "val_data_path": val_data_path, - "test_data_path": test_data_path, + "val_data_path": val_data_path, + "test_data_path": test_data_path, "workflow_id": search_wf_id, "campaign_id": campaign_id} for c in configs @@ -112,7 +110,7 @@ def search_workflow(ntokens, dataset_ref, train_data_path, val_data_path, test_d return search_wf_id -def run_asserts_and_exports(campaign_id): +def run_asserts_and_exports(campaign_id, model_search_wf_id): from flowcept.commons.vocabulary import Status print("Now running all asserts...") """ @@ -137,6 +135,11 @@ def run_asserts_and_exports(campaign_id): Module Layer Forward Test Workflow . Parent module forward tasks Module Layer Forward Test Workflow . 
Children modules forward tasks """ + + if INSTRUMENTATION.get("torch").get("epoch_loop") is None or INSTRUMENTATION.get("torch").get("batch_loop") is None: + raise Exception("Cannot run these asserts: both epoch_loop and batch_loop must be enabled in the torch instrumentation settings.") + + at_every = INSTRUMENTATION.get("torch").get("capture_epochs_at_every", 1) campaign_workflows = Flowcept.db.query({"campaign_id": campaign_id}, collection="workflows") workflows_data = [] assert len(campaign_workflows) == 4 - 1 # dataprep + model_search + 2 subworkflows for the model_search @@ -166,10 +169,6 @@ def run_asserts_and_exports(campaign_id): n_tasks_expected += 1 assert t["status"] == Status.FINISHED.value - # epochs_loop = Flowcept.db.query( - # {"parent_task_id": t["task_id"], "activity_id": FlowceptEpochLoop.ACTIVITY_ID})[0] - # assert epochs_loop["status"] == Status.FINISHED.value - # n_tasks_expected += 1 epoch_iteration_tasks = Flowcept.db.query( {"parent_task_id": t["task_id"], "activity_id": "epochs_loop_iteration"}) assert len(epoch_iteration_tasks) == t["used"]["epochs"] @@ -180,21 +179,15 @@ def run_asserts_and_exports(campaign_id): epoch_iteration_ids.add(epoch_iteration_task["task_id"]) assert epoch_iteration_task["status"] == Status.FINISHED.value - # train_batches_loop = Flowcept.db.query( - # {"parent_task_id": epoch_iteration_task["task_id"], "activity_id": "train_batch"})[0] - # n_tasks_expected += 1 train_batch_iteration_tasks = Flowcept.db.query( {"parent_task_id": epoch_iteration_task["task_id"], "activity_id": "train_batch_iteration"}) + assert len(train_batch_iteration_tasks) > 0 # TODO: == number of train_batches - # eval_batches_loop = Flowcept.db.query( - # {"parent_task_id": epoch_iteration_task["task_id"], "activity_id": "eval_batch"})[ - # 0] - # n_tasks_expected += 1 eval_batch_iteration_tasks = Flowcept.db.query( {"parent_task_id": epoch_iteration_task["task_id"], "activity_id": "eval_batch_iteration"}) - assert len(eval_batch_iteration_tasks) > 0 # TODO: == number of train_batches + assert len(eval_batch_iteration_tasks) > 0 # TODO: == number of eval_batches batch_iteration_lst = [train_batch_iteration_tasks, eval_batch_iteration_tasks] for batch_iterations in batch_iteration_lst: @@ -202,13 +195,14 @@ def run_asserts_and_exports(campaign_id): for batch_iteration in batch_iterations: n_tasks_expected += 1 - if "parent" in TORCH_CAPTURE.get("what"): + if "parent" in INSTRUMENTATION.get("torch").get("what"): parent_forwards = Flowcept.db.query( {"workflow_id": parent_module_wf_id, "activity_id": "TransformerModel", "parent_task_id": batch_iteration["task_id"]}) if len(parent_forwards) == 0: continue + assert len(parent_forwards) == 1 parent_forward = parent_forwards[0] @@ -219,11 +213,16 @@ def run_asserts_and_exports(campaign_id): assert parent_forward[ "parent_task_id"] == batch_iteration["task_id"] - if "children" in TORCH_CAPTURE.get("what"): + if "children" in INSTRUMENTATION.get("torch").get("what"): children_forwards = Flowcept.db.query( {"parent_task_id": parent_forward["task_id"]}) - if "telemetry" in TORCH_CAPTURE.get("children_mode") or epoch_iteration_task["used"]["i"] == 0: + # We only have children_forward if: + # this is the first epoch (i == 0) or + # telemetry mode is on and epoch % at_every == 0 + curr_epoch = epoch_iteration_task["used"]["i"] + if (curr_epoch == 0) or \ + ("telemetry" in INSTRUMENTATION.get("torch").get("children_mode") and curr_epoch % at_every == 0): assert len(children_forwards) == 4 # there are four children submodules # TODO get dynamically for child_forward in children_forwards: n_tasks_expected += 1 @@ -236,7 +235,7 @@ def 
run_asserts_and_exports(campaign_id): return n_workflows_expected, n_tasks_expected -def save_files(campaign_id, model_search_wf_id, output_dir="output_data"): +def save_files(mongo_dao, campaign_id, model_search_wf_id, output_dir="output_data"): os.makedirs(output_dir, exist_ok=True) best_task = Flowcept.db.query({"workflow_id": model_search_wf_id, "activity_id": "model_train"}, limit=1, sort=[("generated.test_loss", Flowcept.db.ASCENDING)])[0] @@ -255,6 +254,10 @@ def save_files(campaign_id, model_search_wf_id, output_dir="output_data"): doc = Flowcept.db.load_torch_model(loaded_model, best_model_obj_id) torch.save(loaded_model.state_dict(), f"{output_dir}/wf_{model_search_wf_id}_transformer_wikitext2.pth") + + print("Deleting best model from the database.") + mongo_dao.delete_object_keys("object_id", [doc["object_id"]]) + workflows_file = f"{output_dir}/workflows_{uuid.uuid4()}.json" print(f"workflows_file = '{workflows_file}'") Flowcept.db.dump_to_file(filter={"campaign_id": campaign_id}, collection="workflows", @@ -271,11 +274,10 @@ def save_files(campaign_id, model_search_wf_id, output_dir="output_data"): return workflows_file, tasks_file -def main(): +def run_campaign(): _campaign_id = str(uuid.uuid4()) print(f"Campaign id={_campaign_id}") - input_data_dir = "input_data" tokenizer_type = "basic_english" subset_size = 10 max_runs = 1 @@ -300,13 +302,13 @@ def main(): batch_size=exp_param_settings["batch_size"][0], eval_batch_size=exp_param_settings["eval_batch_size"][0], subset_size=subset_size) + _search_wf_id = search_workflow(dataprep_generated["ntokens"], dataprep_generated["dataset_ref"], dataprep_generated["train_data_path"], dataprep_generated["val_data_path"], dataprep_generated["test_data_path"], exp_param_settings, max_runs, campaign_id=_campaign_id) return _campaign_id, _dataprep_wf_id, _search_wf_id, epochs, max_runs, dataprep_generated["train_n_batches"], dataprep_generated["val_n_batches"] -def asserts_on_saved_dfs(workflows_file, tasks_file, n_workflows_expected, n_tasks_expected, epoch_iterations, max_runs, n_batches_train, n_batches_eval, n_modules): - +def asserts_on_saved_dfs(mongo_dao, workflows_file, tasks_file, n_workflows_expected, n_tasks_expected, epoch_iterations, max_runs, n_batches_train, n_batches_eval, n_modules): workflows_df = pd.read_json(workflows_file) # Assert workflows dump assert len(workflows_df) == n_workflows_expected @@ -316,7 +318,7 @@ def asserts_on_saved_dfs(workflows_file, tasks_file, n_workflows_expected, n_tas # TODO: save #n_batches for train, test, val individually search_tasks = max_runs - at_every = INSTRUMENTATION.get("torch", {}).get("capture_epochs_at_every", 1) + at_every = INSTRUMENTATION.get("torch").get("capture_epochs_at_every", 1) batch_iteration_tasks = epoch_iterations * (n_batches_train + n_batches_eval) non_module_tasks = search_tasks + epoch_iterations + batch_iteration_tasks @@ -329,14 +331,14 @@ def asserts_on_saved_dfs(workflows_file, tasks_file, n_workflows_expected, n_tas number_of_captured_epochs = epoch_iterations / at_every - if "telemetry" in TORCH_CAPTURE.get("children_mode"): + if "telemetry" in INSTRUMENTATION.get("torch").get("children_mode"): expected_child_tasks = search_tasks * epoch_iterations * ( (n_batches_train * n_modules) + (n_batches_eval * n_modules)) expected_child_tasks = expected_child_tasks/at_every expected_child_tasks_per_epoch = expected_child_tasks / number_of_captured_epochs with_used = 1 * expected_child_tasks_per_epoch without_used = (number_of_captured_epochs - 1) * 
expected_child_tasks_per_epoch - elif "tensor_inspection" in TORCH_CAPTURE.get("children_mode"): + elif "tensor_inspection" in INSTRUMENTATION.get("torch").get("children_mode"): expected_child_tasks = search_tasks * 1 * ( (n_batches_train * n_modules) + (n_batches_eval * n_modules)) expected_child_tasks_per_epoch = expected_child_tasks @@ -345,14 +347,69 @@ else: raise NotImplementedError("Needs to implement for lightweight") - assert len(tasks_df[tasks_df.subtype == 'child_forward']) == expected_child_tasks - assert non_module_tasks + parent_module_tasks + expected_child_tasks == len(tasks_df) - # Testing if only the first epoch got the inspection assert len(tasks_df[(tasks_df.subtype == 'parent_forward') & (tasks_df.used.str.contains('tensor'))]) == n_batches_train + n_batches_eval - # Testing if capturing at every at_every epochs - assert len(tasks_df[(tasks_df.subtype == 'child_forward') & (tasks_df.used == 'NaN')]) == without_used - assert len(tasks_df[(tasks_df.subtype == 'child_forward') & (tasks_df.used != 'NaN')]) == with_used + + if "children" in INSTRUMENTATION.get("torch").get("what"): + assert len(tasks_df[tasks_df.subtype == 'child_forward']) == expected_child_tasks + assert non_module_tasks + parent_module_tasks + expected_child_tasks == len(tasks_df) + # Testing if capturing at every at_every epochs + assert len(tasks_df[(tasks_df.subtype == 'child_forward') & ( + tasks_df.used == 'NaN')]) == without_used + assert len( + tasks_df[(tasks_df.subtype == 'child_forward') & (tasks_df.used != 'NaN')]) == with_used + + task_ids = list(tasks_df["task_id"].unique()) + workflow_ids = list(workflows_df["workflow_id"].unique()) + print("Deleting generated data in MongoDB") + mongo_dao.delete_task_keys("task_id", task_ids) + mongo_dao.delete_workflow_keys("workflow_id", workflow_ids) + + +def verify_number_docs_in_db(mongo_dao, n_tasks=None, n_wfs=None, n_objects=None): + _n_tasks = mongo_dao.count_tasks() + _n_wfs = mongo_dao.count_workflows() + _n_objects = mongo_dao.count_objects() + + if n_tasks: + if n_tasks != _n_tasks: + raise Exception("Number of tasks now is different from when we started this campaign.") + else: + print("Good, #tasks is the same as at the beginning!") + + if n_wfs: + if n_wfs != _n_wfs: + raise Exception("Number of workflows now is different from when we started this campaign.") + else: + print("Good, #workflows is the same as at the beginning!") + + if n_objects: + if n_objects != _n_objects: + raise Exception("Number of objects now is different from when we started this campaign.") + else: + print("Good, #objects is the same as at the beginning!") + + return _n_tasks, _n_wfs, _n_objects + + + +def main(): + + print("TORCH SETTINGS: " + str(INSTRUMENTATION.get("torch"))) + + from flowcept.commons.daos.docdb_dao.mongodb_dao import MongoDBDAO + mongo_dao = MongoDBDAO(create_indices=False) + + n_tasks, n_wfs, n_objects = verify_number_docs_in_db(mongo_dao) + + campaign_id, dataprep_wf_id, model_search_wf_id, epochs, max_runs, n_batches_train, n_batches_eval = run_campaign() + + n_workflows_expected, n_tasks_expected = run_asserts_and_exports(campaign_id, model_search_wf_id) + workflows_file, tasks_file = save_files(mongo_dao, campaign_id, model_search_wf_id) + asserts_on_saved_dfs(mongo_dao, workflows_file, tasks_file, n_workflows_expected, n_tasks_expected, + epochs, max_runs, n_batches_train, n_batches_eval, n_modules=4) + verify_number_docs_in_db(mongo_dao, n_tasks, n_wfs, n_objects) + 
print("Alright! Congrats.") if __name__ == "__main__": @@ -361,9 +418,6 @@ def asserts_on_saved_dfs(workflows_file, tasks_file, n_workflows_expected, n_tas print("This test is only available if Mongo is enabled.") sys.exit(0) - campaign_id, dataprep_wf_id, model_search_wf_id, epochs, max_runs, n_batches_train, n_batches_eval = main() - n_workflows_expected, n_tasks_expected = run_asserts_and_exports(campaign_id) - workflows_file, tasks_file = save_files(campaign_id, model_search_wf_id) - asserts_on_saved_dfs(workflows_file, tasks_file, n_workflows_expected, n_tasks_expected, epochs, max_runs, n_batches_train, n_batches_eval, n_modules=4) - print("Alright! Congrats.") + main() + sys.exit(0) diff --git a/examples/llm_complex/llm_model.py b/examples/llm_complex/llm_model.py index 572f084d..b640fd7f 100644 --- a/examples/llm_complex/llm_model.py +++ b/examples/llm_complex/llm_model.py @@ -197,6 +197,7 @@ def model_train( criterion = nn.CrossEntropyLoss() optimizer = optim.Adam(model.parameters(), lr=lr) best_val_loss = float("inf") # Initialize the best validation loss to infinity + best_obj_id = None # Initialize with unknown best model # Iterate through the epochs epochs_loop = FlowceptEpochLoop(range(1, epochs + 1), parent_task_id=main_task_id, model=model) t0 = time() @@ -214,8 +215,9 @@ def model_train( # If the validation loss has improved, save the model's state if val_loss < best_val_loss: best_val_loss = val_loss - best_obj_id = Flowcept.db.save_torch_model( + best_obj_id = Flowcept.db.save_or_update_torch_model( model, + object_id=best_obj_id, task_id=epochs_loop.get_current_iteration_id(), workflow_id=workflow_id, custom_metadata={"best_val_loss": best_val_loss} diff --git a/examples/llm_complex/llm_test_runner.py b/examples/llm_complex/llm_test_runner.py new file mode 100644 index 00000000..16e5b173 --- /dev/null +++ b/examples/llm_complex/llm_test_runner.py @@ -0,0 +1,117 @@ +import os +import subprocess +import sys +import traceback +from itertools import product + +import yaml + +from flowcept.configs import SETTINGS_PATH + +LOG_FILE = "test_log.log" + + +def write_log(msg: str): + with open(LOG_FILE, "a") as log_file: + print(msg) + log_file.write(msg+"\n") + log_file.flush() + os.fsync(log_file.fileno()) + + +def write_exception(msg: str, exception: Exception): + print(msg) + write_log(msg) + with open(LOG_FILE, "a") as log_file: + traceback.print_exception(type(exception), exception, exception.__traceback__, file=log_file) + log_file.flush() + os.fsync(log_file.fileno()) + + +def run_test(config, max_runs=50): + + update_yaml_file(config) + + for i in range(0, max_runs): + success = run_process() + if not success: + return False + write_log(f"Done with {i}") + + return True + + +def one(): + config = {'what': 'parent_and_children', 'children_mode': 'tensor_inspection', + 'epoch_loop': 'default', 'batch_loop': 'default', 'capture_epochs_at_every': 1} + run_test(max_runs=1, config=config) + + +def all(): + configs = { + "what": ["parent_and_children", "parent_only"], + "children_mode": ["tensor_inspection", "telemetry_and_tensor_inspection"], + "epoch_loop": ["default", "lightweight"], + "batch_loop": ["default", "lightweight"], + "capture_epochs_at_every": [1, 2] + } + + keys = configs.keys() + values = configs.values() + + combinations = [dict(zip(keys, combination)) for combination in product(*values)] + + for i, config in enumerate(combinations, start=1): + write_log(f"\n\nStarting for combination {i}/{len(combinations)}: {config}") + + try: + success = run_test(max_runs=3, 
config=config) + if not success: + write_log(str(i) + "--> " + str(config)) + sys.exit(1) + except Exception as e: + write_exception(f"\n\n!!!!!!##### ERROR for combination {config}\n", e) + raise # Re-raise, preserving the original traceback. + + +def update_yaml_file(torch_config: dict) -> None: + with open(SETTINGS_PATH, 'r') as file: + data = yaml.safe_load(file) or {} # Load YAML or initialize as empty dict + + # Apply updates, creating the instrumentation section if it is missing + data.setdefault("instrumentation", {})["torch"] = torch_config + + # Save updated YAML back to file + with open(SETTINGS_PATH, 'w') as file: + yaml.safe_dump(data, file, default_flow_style=False) + print("Updated settings file") + + +def run_process(): + current_script_path = os.path.abspath(__file__) + parent_directory = os.path.dirname(current_script_path) + llm_main_script = os.path.join(parent_directory, "llm_main_example.py") + command = ["python", llm_main_script] + process = subprocess.Popen( + command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + ) + + # Print the output line by line in real-time + for line in process.stdout: + print(line, end="") # Print to console + + process.wait() + + if process.returncode != 0: + print(f"\nScript failed with return code {process.returncode}") + return False + else: + print("\nScript finished successfully!") + return True + + +if __name__ == "__main__": + all() diff --git a/pyproject.toml b/pyproject.toml index 330b3f07..02880a7a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -11,7 +11,7 @@ dependencies = [ "msgpack", "omegaconf", "pandas", - "psutil", + "psutil>=6.1.1", "py-cpuinfo", "redis", "requests", @@ -64,13 +64,13 @@ mlflow = ["mlflow-skinny", "SQLAlchemy", "alembic", "watchdog"] nvidia = ["nvidia-ml-py"] tensorboard = ["tensorboard", "tensorflow", "tbparse"] dev = [ + "flowcept[docs]", "jupyterlab", "nbmake", "pika", "pytest", "ruff", "pyyaml", - "psutil>=6.1.1" ] # Torch and some other ml-specific libs, only used for dev purposes, require the following specific versions.
ml_dev = [ diff --git a/src/flowcept/commons/autoflush_buffer.py b/src/flowcept/commons/autoflush_buffer.py index 432c9c26..e0e6587e 100644 --- a/src/flowcept/commons/autoflush_buffer.py +++ b/src/flowcept/commons/autoflush_buffer.py @@ -11,12 +11,10 @@ def __init__( self, max_size, flush_interval, - logger, flush_function: Callable, flush_function_args=[], flush_function_kwargs={}, ): - self.logger = logger self._max_size = max_size self._flush_interval = flush_interval self._buffers = [[], []] diff --git a/src/flowcept/commons/daos/docdb_dao/docdb_dao_base.py b/src/flowcept/commons/daos/docdb_dao/docdb_dao_base.py index 872d84e5..a9bc4ad0 100644 --- a/src/flowcept/commons/daos/docdb_dao/docdb_dao_base.py +++ b/src/flowcept/commons/daos/docdb_dao/docdb_dao_base.py @@ -338,7 +338,7 @@ def dump_tasks_to_file_recursive(self, workflow_id, output_file="tasks.parquet", raise NotImplementedError @abstractmethod - def save_object( + def save_or_update_object( self, object, object_id, diff --git a/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py b/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py index d69f42cb..4cc7daa0 100644 --- a/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py +++ b/src/flowcept/commons/daos/docdb_dao/lmdb_dao.py @@ -318,7 +318,7 @@ def dump_to_file(self, collection, filter, output_file, export_format, should_zi """Dump data to file.""" raise NotImplementedError - def save_object( + def save_or_update_object( self, object, object_id, diff --git a/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py b/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py index 9bbe86c1..821ace6d 100644 --- a/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py +++ b/src/flowcept/commons/daos/docdb_dao/mongodb_dao.py @@ -322,6 +322,66 @@ def delete_task_keys(self, key_name, keys_list: List[Any]) -> bool: self.logger.exception(e) return False + def delete_workflow_keys(self, key_name, keys_list: List[Any]) -> bool: + """ + Delete workflow documents based on a specific key and value from the workflows collection. + + Parameters + ---------- + key_name : str + The name of the key to be matched for deletion. + keys_list : list of any + The list of values for the specified key to delete the matching documents. + + Returns + ------- + bool + True if the deletion was successful, False otherwise. + + Raises + ------ + Exception + If an error occurs during the deletion operation. + """ + if type(keys_list) is not list: + keys_list = [keys_list] + try: + self._wfs_collection.delete_many({key_name: {"$in": keys_list}}) + return True + except Exception as e: + self.logger.exception(e) + return False + + def delete_object_keys(self, key_name, keys_list: List[Any]) -> bool: + """ + Delete object documents based on a specific key and value from the objects collection. + + Parameters + ---------- + key_name : str + The name of the key to be matched for deletion. + keys_list : list of any + The list of values for the specified key to delete the matching documents. + + Returns + ------- + bool + True if the deletion was successful, False otherwise. + + Raises + ------ + Exception + If an error occurs during the deletion operation. + """ + if type(keys_list) is not list: + keys_list = [keys_list] + try: + self._obj_collection.delete_many({key_name: {"$in": keys_list}}) + return True + except Exception as e: + self.logger.exception(e) + return False + def delete_tasks_with_filter(self, filter) -> bool: """ Delete task documents that match the specified filter.
@@ -356,6 +416,22 @@ def count_tasks(self) -> int: self.logger.exception(e) return -1 + def count_workflows(self) -> int: + """Count number of docs in workflows collection.""" + try: + return self._wfs_collection.count_documents({}) + except Exception as e: + self.logger.exception(e) + return -1 + + def count_objects(self) -> int: + """Count number of docs in objects collection.""" + try: + return self._obj_collection.count_documents({}) + except Exception as e: + self.logger.exception(e) + return -1 + def insert_or_update_workflow(self, workflow_obj: WorkflowObject) -> bool: """Insert or update workflow.""" _dict = workflow_obj.to_dict().copy() @@ -495,7 +571,7 @@ def liveness_test(self) -> bool: self.logger.exception(e) return False - def save_object( + def save_or_update_object( self, object, object_id=None, @@ -534,7 +610,11 @@ def save_object( if custom_metadata is not None: obj_doc["custom_metadata"] = custom_metadata - self._obj_collection.insert_one(obj_doc) + update_query = { + "$set": obj_doc, + } + + self._obj_collection.update_one({"object_id": object_id}, update_query, upsert=True) return object_id diff --git a/src/flowcept/commons/daos/mq_dao/mq_dao_base.py b/src/flowcept/commons/daos/mq_dao/mq_dao_base.py index 98b57a28..cadf5bfa 100644 --- a/src/flowcept/commons/daos/mq_dao/mq_dao_base.py +++ b/src/flowcept/commons/daos/mq_dao/mq_dao_base.py @@ -89,7 +89,7 @@ def _bulk_publish(self, buffer, channel=MQ_CHANNEL, serializer=msgpack.dumps): def bulk_publish(self, buffer): """Publish it.""" - self.logger.info(f"Going to flush {len(buffer)} to MQ...") + # self.logger.info(f"Going to flush {len(buffer)} to MQ...") if MQ_CHUNK_SIZE > 1: for chunk in chunked(buffer, MQ_CHUNK_SIZE): self._bulk_publish(chunk) @@ -99,9 +99,9 @@ def register_time_based_thread_init(self, interceptor_instance_id: str, exec_bundle_id=None): """Register the time.""" set_name = MQDao._get_set_name(exec_bundle_id) - self.logger.info( - f"Register start of time_based MQ flush thread {set_name}.{interceptor_instance_id}" - ) + # self.logger.info( + # f"Register start of time_based MQ flush thread {set_name}.{interceptor_instance_id}" + # ) self._keyvalue_dao.add_key_into_set(set_name, interceptor_instance_id) def register_time_based_thread_end(self, interceptor_instance_id: str, exec_bundle_id=None): @@ -123,12 +123,11 @@ def all_time_based_threads_ended(self, exec_bundle_id=None): def init_buffer(self, interceptor_instance_id: str, exec_bundle_id=None): """Create the buffer.""" if flowcept.configs.DB_FLUSH_MODE == "online": - msg = "Starting MQ time-based flushing! bundle: " - self.logger.info(msg + f"{exec_bundle_id}; interceptor id: {interceptor_instance_id}") + # msg = "Starting MQ time-based flushing! bundle: " + # self.logger.debug(msg+f"{exec_bundle_id}; interceptor id: {interceptor_instance_id}") self.buffer = AutoflushBuffer( max_size=MQ_BUFFER_SIZE, flush_interval=MQ_INSERTION_BUFFER_TIME, - logger=self.logger, flush_function=self.bulk_publish, ) self.register_time_based_thread_init(interceptor_instance_id, exec_bundle_id) @@ -150,10 +149,10 @@ def _close_buffer(self): def stop(self, interceptor_instance_id: str, bundle_exec_id: int = None): """Stop it.""" msg0 = "MQ publisher received stop signal! bundle: " - self.logger.info(msg0 + f"{bundle_exec_id}; interceptor id: {interceptor_instance_id}") + self.logger.debug(msg0 + f"{bundle_exec_id}; interceptor id: {interceptor_instance_id}") self._close_buffer() msg = "Flushed MQ for last time! Send stop msg.
bundle: " - self.logger.info(msg + f"{bundle_exec_id}; interceptor id: {interceptor_instance_id}") + self.logger.debug(msg + f"{bundle_exec_id}; interceptor id: {interceptor_instance_id}") self._send_mq_dao_time_thread_stop(interceptor_instance_id, bundle_exec_id) def _send_mq_dao_time_thread_stop(self, interceptor_instance_id, exec_bundle_id=None): @@ -165,7 +164,7 @@ def _send_mq_dao_time_thread_stop(self, interceptor_instance_id, exec_bundle_id= "interceptor_instance_id": interceptor_instance_id, "exec_bundle_id": exec_bundle_id, } - self.logger.info("Control msg sent: " + str(msg)) + # self.logger.info("Control msg sent: " + str(msg)) self.send_message(msg) def send_document_inserter_stop(self): diff --git a/src/flowcept/commons/daos/mq_dao/mq_dao_kafka.py b/src/flowcept/commons/daos/mq_dao/mq_dao_kafka.py index c64b1db6..b2e180db 100644 --- a/src/flowcept/commons/daos/mq_dao/mq_dao_kafka.py +++ b/src/flowcept/commons/daos/mq_dao/mq_dao_kafka.py @@ -72,9 +72,7 @@ def send_message(self, message: dict, channel=MQ_CHANNEL, serializer=msgpack.dum def _bulk_publish(self, buffer, channel=MQ_CHANNEL, serializer=msgpack.dumps): for message in buffer: try: - self.logger.debug( - f"Going to send Message:" f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t" - ) + self.logger.debug(f"Going to send Message:\n\t[BEGIN_MSG]{message}\n[END_MSG]\t") self._producer.produce(channel, key=channel, value=serializer(message)) except Exception as e: self.logger.exception(e) diff --git a/src/flowcept/commons/daos/mq_dao/mq_dao_redis.py b/src/flowcept/commons/daos/mq_dao/mq_dao_redis.py index 0f0fb798..d31d791c 100644 --- a/src/flowcept/commons/daos/mq_dao/mq_dao_redis.py +++ b/src/flowcept/commons/daos/mq_dao/mq_dao_redis.py @@ -50,9 +50,6 @@ def _bulk_publish(self, buffer, channel=MQ_CHANNEL, serializer=msgpack.dumps): pipe = self._producer.pipeline() for message in buffer: try: - self.logger.debug( - f"Going to send Message:" f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t" - ) pipe.publish(MQ_CHANNEL, serializer(message)) except Exception as e: self.logger.exception(e) @@ -65,7 +62,7 @@ def _bulk_publish(self, buffer, channel=MQ_CHANNEL, serializer=msgpack.dumps): t0 = time() try: pipe.execute() - self.logger.info(f"Flushed {len(buffer)} msgs to MQ!") + # self.logger.debug(f"Flushed {len(buffer)} msgs to MQ!") except Exception as e: self.logger.exception(e) perf_log("mq_pipe_execute", t0) diff --git a/src/flowcept/commons/flowcept_logger.py b/src/flowcept/commons/flowcept_logger.py index dbf0567f..ecbc97f0 100644 --- a/src/flowcept/commons/flowcept_logger.py +++ b/src/flowcept/commons/flowcept_logger.py @@ -37,7 +37,7 @@ def _build_logger(cls): logger.addHandler(stream_handler) if file_level <= logging.CRITICAL: - file_handler = logging.FileHandler(LOG_FILE_PATH, mode="a+") + file_handler = logging.FileHandler(LOG_FILE_PATH, delay=True, mode="a+") file_handler.setLevel(file_level) file_format = logging.Formatter(f"[%(asctime)s]{_BASE_FORMAT}") file_handler.setFormatter(file_format) diff --git a/src/flowcept/commons/settings_factory.py b/src/flowcept/commons/settings_factory.py index 9791d5b8..6801db9c 100644 --- a/src/flowcept/commons/settings_factory.py +++ b/src/flowcept/commons/settings_factory.py @@ -41,7 +41,7 @@ def get_settings(adapter_key: str) -> BaseSettings: settings_dict = settings[Vocabulary.Settings.ADAPTERS][adapter_key] if not settings_dict: raise Exception( - f"You must specify the adapter <<{adapter_key}>> in" f" the settings YAML file." 
+ f"You must specify the adapter <<{adapter_key}>> in the settings YAML file." ) settings_dict["key"] = adapter_key kind = settings_dict[Vocabulary.Settings.KIND] diff --git a/src/flowcept/configs.py b/src/flowcept/configs.py index 36bb96a9..4033b477 100644 --- a/src/flowcept/configs.py +++ b/src/flowcept/configs.py @@ -36,10 +36,11 @@ SETTINGS_PATH = os.getenv("FLOWCEPT_SETTINGS_PATH", f"{_SETTINGS_DIR}/settings.yaml") if not os.path.exists(SETTINGS_PATH): - SETTINGS_PATH = None from importlib import resources - with resources.files("resources").joinpath("sample_settings.yaml").open("r") as f: + SETTINGS_PATH = str(resources.files("resources").joinpath("sample_settings.yaml")) + + with open(SETTINGS_PATH) as f: settings = OmegaConf.load(f) else: settings = OmegaConf.load(SETTINGS_PATH) diff --git a/src/flowcept/flowcept_api/db_api.py b/src/flowcept/flowcept_api/db_api.py index 5ac0c09c..179021c2 100644 --- a/src/flowcept/flowcept_api/db_api.py +++ b/src/flowcept/flowcept_api/db_api.py @@ -197,7 +197,7 @@ def dump_to_file( self.logger.exception(e) return False - def save_object( + def save_or_update_object( self, object, object_id=None, @@ -209,7 +209,7 @@ def save_object( pickle=False, ): """Save the object.""" - return DBAPI._dao().save_object( + return DBAPI._dao().save_or_update_object( object, object_id, task_id, @@ -239,9 +239,10 @@ def query( filter, projection, limit, sort, aggregation, remove_json_unserializables, collection ) - def save_torch_model( + def save_or_update_torch_model( self, model, + object_id=None, task_id=None, workflow_id=None, custom_metadata: dict = {}, @@ -270,8 +271,9 @@ def save_torch_model( **custom_metadata, "class": model.__class__.__name__, } - obj_id = self.save_object( + obj_id = self.save_or_update_object( object=binary_data, + object_id=object_id, type="ml_model", task_id=task_id, workflow_id=workflow_id, diff --git a/src/flowcept/flowcept_api/flowcept_controller.py b/src/flowcept/flowcept_api/flowcept_controller.py index bb873b5d..d785b1fb 100644 --- a/src/flowcept/flowcept_api/flowcept_controller.py +++ b/src/flowcept/flowcept_api/flowcept_controller.py @@ -179,7 +179,30 @@ def __exit__(self, exc_type, exc_val, exc_tb): @staticmethod def services_alive() -> bool: - """Get alive services.""" + """ + Checks the liveness of the MQ (Message Queue) and, if enabled, the MongoDB service. + + Returns + ------- + bool + True if all services (MQ and optionally MongoDB) are alive, False otherwise. + + Notes + ----- + - The method tests the liveness of the MQ service using `MQDao`. + - If `MONGO_ENABLED` is True, it also checks the liveness of the MongoDB service + using `MongoDBDAO`. + - Logs errors if any service is not ready, and logs success when both services are + operational. + + Examples + -------- + >>> is_alive = services_alive() + >>> if is_alive: + ... print("All services are running.") + ... else: + ... print("One or more services are not ready.") + """ logger = FlowceptLogger() if not MQDao.build().liveness_test(): logger.error("MQ Not Ready!") @@ -192,3 +215,46 @@ def services_alive() -> bool: return False logger.info("MQ and DocDB are alive!") return True + + @staticmethod + def start_consumption_services( + bundle_exec_id: str = None, check_safe_stops: bool = False, consumers: List = None + ): + """ + Starts the document consumption services for processing. + + Parameters + ---------- + bundle_exec_id : str, optional + The execution ID of the bundle being processed. Defaults to None. 
+ check_safe_stops : bool, optional + Whether to enable safe stop checks for the service. Defaults to False. + consumers : List, optional + A list of consumer types to be started. Currently, only one type of consumer + is supported. Defaults to None. + + Raises + ------ + NotImplementedError + If a `consumers` list is provided, since only the default consumer is currently supported. + + Notes + ----- + - The method initializes the `DocumentInserter` service, which processes documents + based on the provided parameters. + - The `threaded` parameter for `DocumentInserter.start` is set to `False`. + + Examples + -------- + >>> Flowcept.start_consumption_services(bundle_exec_id="12345", check_safe_stops=True) + """ + if consumers is not None: + raise NotImplementedError("We currently only have one type of consumer.") + from flowcept.flowceptor.consumers.document_inserter import DocumentInserter + + logger = FlowceptLogger() + doc_inserter = DocumentInserter( + check_safe_stops=check_safe_stops, bundle_exec_id=bundle_exec_id + ) + logger.debug("Starting doc inserter service.") + doc_inserter.start(threaded=False) diff --git a/src/flowcept/flowceptor/adapters/base_interceptor.py b/src/flowcept/flowceptor/adapters/base_interceptor.py index 6ad541e6..01fa0a10 100644 --- a/src/flowcept/flowceptor/adapters/base_interceptor.py +++ b/src/flowcept/flowceptor/adapters/base_interceptor.py @@ -1,7 +1,6 @@ """Base Interceptor module.""" from abc import abstractmethod -from time import time from typing import Dict, List from uuid import uuid4 @@ -31,7 +30,7 @@ class BaseInterceptor(object): def __init__(self, plugin_key=None, kind=None): self.logger = FlowceptLogger() - self.logger.debug(f"Starting Interceptor{id(self)} at {time()}") + # self.logger.debug(f"Starting Interceptor{id(self)} at {time()}") if plugin_key is not None: # TODO :base-interceptor-refactor: :code-reorg: :usability: self.settings = get_settings(plugin_key) diff --git a/src/flowcept/flowceptor/adapters/tensorboard/tensorboard_interceptor.py b/src/flowcept/flowceptor/adapters/tensorboard/tensorboard_interceptor.py index ec6e5fc3..84980fb8 100644 --- a/src/flowcept/flowceptor/adapters/tensorboard/tensorboard_interceptor.py +++ b/src/flowcept/flowceptor/adapters/tensorboard/tensorboard_interceptor.py @@ -110,11 +110,10 @@ def observe(self): event_handler = InterceptionEventHandler(self, self.__class__.callback) while not os.path.isdir(self.settings.file_path): self.logger.debug( - f"I can't watch the file {self.settings.file_path}," f" as it does not exist." + f"I can't watch the file {self.settings.file_path}, as it does not exist." ) self.logger.debug( - f"\tI will sleep for {self.settings.watch_interval_sec} sec." - f" to see if it appears." + f"\tI will sleep for {self.settings.watch_interval_sec} s to see if it appears."
) sleep(self.settings.watch_interval_sec) diff --git a/src/flowcept/flowceptor/adapters/zambeze/zambeze_interceptor.py b/src/flowcept/flowceptor/adapters/zambeze/zambeze_interceptor.py index 05efc3ca..1062e390 100644 --- a/src/flowcept/flowceptor/adapters/zambeze/zambeze_interceptor.py +++ b/src/flowcept/flowceptor/adapters/zambeze/zambeze_interceptor.py @@ -54,7 +54,7 @@ def stop(self) -> bool: self._channel.stop_consuming() except Exception as e: self.logger.warning( - f"This exception is expected to occur after " f"channel.basic_cancel: {e}" + f"This exception is expected to occur after channel.basic_cancel: {e}" ) sleep(2) self._observer_thread.join() @@ -88,9 +88,7 @@ def observe(self): ) def _intercept(self, body_obj): - self.logger.debug( - f"Zambeze interceptor needs to intercept this:" f"\n\t{json.dumps(body_obj)}" - ) + self.logger.debug(f"Zambeze interceptor needs to intercept this:\n\t{json.dumps(body_obj)}") task_msg = self.prepare_task_msg(body_obj) self.intercept(task_msg.to_dict()) diff --git a/src/flowcept/flowceptor/consumers/consumer_utils.py b/src/flowcept/flowceptor/consumers/consumer_utils.py index aa46cbf2..491224c8 100644 --- a/src/flowcept/flowceptor/consumers/consumer_utils.py +++ b/src/flowcept/flowceptor/consumers/consumer_utils.py @@ -7,6 +7,7 @@ import pytz from flowcept.commons.flowcept_dataclasses.task_object import TaskObject +from flowcept.commons.vocabulary import Status def curate_task_msg(task_msg_dict: dict, convert_times=True): @@ -101,6 +102,8 @@ def curate_dict_task_messages( if "status" in doc: doc[doc["status"].lower()] = True # doc.pop("status") + if "finished" in doc and doc["finished"]: + doc["status"] = Status.FINISHED.value if utc_time_at_insertion > 0: doc["utc_time_at_insertion"] = utc_time_at_insertion diff --git a/src/flowcept/flowceptor/consumers/document_inserter.py b/src/flowcept/flowceptor/consumers/document_inserter.py index 325135ad..15118c66 100644 --- a/src/flowcept/flowceptor/consumers/document_inserter.py +++ b/src/flowcept/flowceptor/consumers/document_inserter.py @@ -1,6 +1,6 @@ """Document Inserter module.""" -from threading import Thread, Lock +from threading import Thread from time import time, sleep from typing import Dict from uuid import uuid4 @@ -54,7 +54,6 @@ def __init__( mq_port=None, bundle_exec_id=None, ): - self._task_dicts_buffer = list() self._mq_dao = MQDao.build(mq_host, mq_port) self._doc_daos = [] if MONGO_ENABLED: @@ -69,13 +68,11 @@ def __init__( self.logger = FlowceptLogger() self._main_thread: Thread = None self._curr_max_buffer_size = MAX_BUFFER_SIZE - self._lock = Lock() self._bundle_exec_id = bundle_exec_id self.check_safe_stops = check_safe_stops self.buffer: AutoflushBuffer = AutoflushBuffer( max_size=self._curr_max_buffer_size, flush_interval=INSERTION_BUFFER_TIME, - logger=self.logger, flush_function=DocumentInserter.flush_function, flush_function_kwargs={"logger": self.logger, "doc_daos": self._doc_daos}, ) @@ -84,38 +81,25 @@ def _set_buffer_size(self): if not ADAPTIVE_BUFFER_SIZE: return else: - # Adaptive buffer size to increase/decrease depending on the flow - # of messages (#messages/unit of time) - if len(self._task_dicts_buffer) >= MAX_BUFFER_SIZE: - self._curr_max_buffer_size = MAX_BUFFER_SIZE - elif len(self._task_dicts_buffer) < self._curr_max_buffer_size: - # decrease buffer size by 10%, lower-bounded by 10 - self._curr_max_buffer_size = max( - MIN_BUFFER_SIZE, - int(self._curr_max_buffer_size * 0.9), - ) - else: - # increase buffer size by 10%, - # upper-bounded by 
MONGO_INSERTION_BUFFER_SIZE - self._curr_max_buffer_size = max( - MIN_BUFFER_SIZE, - min( - MAX_BUFFER_SIZE, - int(self._curr_max_buffer_size * 1.1), - ), - ) + self._curr_max_buffer_size = max( + MIN_BUFFER_SIZE, + min( + MAX_BUFFER_SIZE, + int(self._curr_max_buffer_size * 1.1), + ), + ) @staticmethod def flush_function(buffer, doc_daos, logger): """Flush it.""" logger.info( - f"Current Doc buffer size: {len(buffer)}, " f"Gonna flush {len(buffer)} msgs to DocDBs!" + f"Current Doc buffer size: {len(buffer)}, Gonna flush {len(buffer)} msgs to DocDBs!" ) for dao in doc_daos: dao.insert_and_update_many_tasks(buffer, TaskObject.task_id_field()) logger.debug( - f"DocDao={id(dao)},DocDaoClass={dao.__class__.__name__};" - f" Flushed {len(buffer)} msgs to this DocDB!" + f"DocDao={id(dao)},DocDaoClass={dao.__class__.__name__};\ + Flushed {len(buffer)} msgs to this DocDB!" ) # TODO: add name def _handle_task_message(self, message: Dict): @@ -145,14 +129,14 @@ def _handle_task_message(self, message: Dict): remove_empty_fields_from_dict(message) self.logger.debug( - f"Received following Task msg in DocInserter:" f"\n\t[BEGIN_MSG]{message}\n[END_MSG]\t" + f"Received following Task msg in DocInserter:\n\t[BEGIN_MSG]{message}\n[END_MSG]\t" ) self.buffer.append(message) def _handle_workflow_message(self, message: Dict): message.pop("type") self.logger.debug( - f"Received following Workflow msg in DocInserter: \n\t[BEGIN_MSG]{message}\n[END_MSG]\t" + f"Received following Workflow msg in DocInserter:\n\t[BEGIN_MSG]{message}\n[END_MSG]\t" ) if REMOVE_EMPTY_FIELDS: remove_empty_fields_from_dict(message) @@ -186,23 +170,19 @@ def _handle_control_message(self, message): self.logger.info("Document Inserter is stopping...") return "stop" - def start(self) -> "DocumentInserter": + def start(self, threaded=True) -> "DocumentInserter": """Start it.""" self._mq_dao.subscribe() - self._main_thread = Thread(target=self._start) - self._main_thread.start() + if threaded: + self._main_thread = Thread(target=self._start) + self._main_thread.start() + else: + self._start() return self def _start(self): - while True: - try: - self._mq_dao.message_listener(self._message_handler) - self.buffer.stop() - break - except Exception as e: - self.logger.exception(e) - sleep(2) - self.logger.debug("Still in the doc insert. message listen loop") + self._mq_dao.message_listener(self._message_handler) + self.buffer.stop() self.logger.info("Ok, we broke the doc inserter message listen loop!") def _message_handler(self, msg_obj: dict): @@ -247,10 +227,10 @@ def stop(self, bundle_exec_id=None): ) sleep(sleep_time) if trial >= max_trials: - if len(self._task_dicts_buffer) == 0: # and len(self._mq_dao._buffer) == 0: - msg = f"DocInserter {id(self)} gave up waiting for signal. " - self.logger.critical(msg + "Safe to stop now.") - break + # if len(self._mq_dao._buffer) == 0: + msg = f"DocInserter {id(self)} gave up waiting for signal. 
" + self.logger.critical(msg + "Safe to stop now.") + break self.logger.info("Sending message to stop document inserter.") self._mq_dao.send_document_inserter_stop() self.logger.info(f"Doc Inserter {id(self)} Sent message to stop itself.") diff --git a/src/flowcept/instrumentation/flowcept_loop.py b/src/flowcept/instrumentation/flowcept_loop.py index 1a0b4b7c..fc7cb8e7 100644 --- a/src/flowcept/instrumentation/flowcept_loop.py +++ b/src/flowcept/instrumentation/flowcept_loop.py @@ -57,12 +57,14 @@ def __init__( items_length=0, capture_enabled=True, ): + self._current_iteration_task = {} if not (INSTRUMENTATION_ENABLED and capture_enabled): # These do_nothing functions help reduce overhead if no instrumentation is needed # because we do this if not enabled only here and never again. self._next_func = self._do_nothing_next self.end_iter = self._do_nothing_in_end_iter self._iterator = iter(items) + self.enabled = False return if hasattr(items, "__len__"): @@ -89,8 +91,9 @@ def __init__( else: raise Exception("Not supported iterator items type.") - self._current_iteration_task = {} - self._group_id = str(id(self)) + group_id = str(id(self) + id(self._iterator) + id(parent_task_id)) + self._group_id = group_id # str(id(self)) + self.enabled = True self.end_iter = self._end_iter self._next_func = self._our_next self._next_counter = 0 @@ -243,14 +246,16 @@ def __init__( # because we do this if not enabled only here and never again. self._next_func = self._do_nothing_next self.end_iter = self._do_nothing_in_end_iter + self.enabled = False return + self.enabled = True self._next_func = self._our_next self._next_counter = -1 self._current_item = None self._loop_name = loop_name self._item_name = item_name - self._group_id = str(id(self)) + self._group_id = str(id(self) + id(self._iterator) + id(parent_task_id)) self._act_id = loop_name + "_iteration" self.workflow_id = workflow_id or Flowcept.current_workflow_id or str(uuid.uuid4()) task_obj = { diff --git a/src/flowcept/instrumentation/flowcept_torch.py b/src/flowcept/instrumentation/flowcept_torch.py index d63147bb..1ab36e90 100644 --- a/src/flowcept/instrumentation/flowcept_torch.py +++ b/src/flowcept/instrumentation/flowcept_torch.py @@ -27,6 +27,8 @@ from flowcept.flowceptor.adapters.instrumentation_interceptor import InstrumentationInterceptor from flowcept.instrumentation.flowcept_task import get_current_context_task_id +TORCH_CONFIG = INSTRUMENTATION.get("torch") + def flowcept_torch(cls): """ @@ -113,20 +115,19 @@ def __init__(self, *args, **kwargs): capture_enabled = kwargs.get("capture_enabled", True) if not instrumentation_enabled or not capture_enabled: return - _what = INSTRUMENTATION.get("torch", {}).get("what") + _what = TORCH_CONFIG.get("what") self._parent_enabled = _what is not None and "parent" in _what self._children_enabled = _what is not None and "children" in _what if self._parent_enabled: self.forward = self._our_forward_parent - self._epochs_at_every = INSTRUMENTATION.get("torch", {}).get( - "capture_epochs_at_every", 1 - ) + self._epochs_at_every = TORCH_CONFIG.get("capture_epochs_at_every", 1) self._children_mode = None self._should_update_children_forward = False + self._children_tensor_inspection_enabled = False if self._children_enabled: - self._children_mode = INSTRUMENTATION.get("torch", {}).get("children_mode", None) + self._children_mode = TORCH_CONFIG.get("children_mode", None) self._children_tensor_inspection_enabled = "inspection" in self._children_mode if self._children_mode is None: raise Exception("You 
enabled children mode, but did not specify which mode.") @@ -200,13 +201,21 @@ def _our_forward_parent(self, *args, **kwargs): return y def _enable_children_forward(self): - if self._epochs_at_every > 1 and self._should_update_children_forward: - self._update_children_with_our_forward() + if ( + "children" in TORCH_CONFIG.get("what", "parent_only") + and "telemetry" in TORCH_CONFIG["children_mode"] + ): + if self._epochs_at_every > 1 and self._should_update_children_forward: + self._update_children_with_our_forward() def _disable_children_forward(self): - if self._epochs_at_every > 1 and self._should_update_children_forward: - self._update_children_with_original_forward() - self._should_update_children_forward = True + if ( + "children" in TORCH_CONFIG.get("what", "parent_only") + and "telemetry" in TORCH_CONFIG["children_mode"] + ): + if self._epochs_at_every > 1 and self._should_update_children_forward: + self._update_children_with_original_forward() + self._should_update_children_forward = True def _get_profile(self): nparams = 0 @@ -427,7 +436,7 @@ def _our_forward_tensor_inspection(self, *args, **kwargs): def _get_parent_loop_class(epoch_or_batch): - loop_mode = INSTRUMENTATION.get("torch", {}).get(f"{epoch_or_batch}_loop", "default") + loop_mode = TORCH_CONFIG.get(f"{epoch_or_batch}_loop", "default") if loop_mode == "lightweight": from flowcept.instrumentation.flowcept_loop import FlowceptLightweightLoop @@ -454,6 +463,9 @@ def __init__( parent_task_id=None, workflow_id=None, ): + if TORCH_CONFIG.get("epoch_loop", None) is None: + super().__init__(items=items, capture_enabled=False) + return super().__init__( items, loop_name=FlowceptEpochLoop.ACTIVITY_ID, @@ -520,17 +532,19 @@ def __init__( items_length=0, ): self._epochs_loop = epochs_loop - if self._epochs_loop is None: + if ( + (self._epochs_loop is None) + or (not self._epochs_loop.enabled) + or (TORCH_CONFIG.get("batch_loop", None) is None) + ): super().__init__(items=items, items_length=items_length, capture_enabled=False) return self.activity_id = f"{step}_batch" - if parent_task_id is None: - print() super().__init__( items, loop_name=self.activity_id, item_name="batch", - parent_task_id=parent_task_id or epochs_loop.get_current_iteration_id(), + parent_task_id=parent_task_id or self._epochs_loop.get_current_iteration_id(), workflow_id=workflow_id or epochs_loop.workflow_id, items_length=items_length, ) diff --git a/src/flowcept/main.py b/src/flowcept/main.py deleted file mode 100644 index 82c847ec..00000000 --- a/src/flowcept/main.py +++ /dev/null @@ -1,45 +0,0 @@ -"""Main driver module.""" - -import sys - -from flowcept import ( - Flowcept, - ZambezeInterceptor, - MLFlowInterceptor, - TensorboardInterceptor, -) -from flowcept.commons.vocabulary import Vocabulary -from flowcept.configs import settings - - -INTERCEPTORS = { - Vocabulary.Settings.ZAMBEZE_KIND: ZambezeInterceptor, - Vocabulary.Settings.MLFLOW_KIND: MLFlowInterceptor, - Vocabulary.Settings.TENSORBOARD_KIND: TensorboardInterceptor, - # Vocabulary.Settings.DASK_KIND: DaskInterceptor, -} - - -def main(): - """Run the main driver.""" - interceptors = [] - for plugin_key in settings["plugins"]: - plugin_settings_obj = settings["plugins"][plugin_key] - if "enabled" in plugin_settings_obj and not plugin_settings_obj["enabled"]: - continue - - kind = plugin_settings_obj["kind"] - - if kind in INTERCEPTORS: - interceptor = INTERCEPTORS[plugin_settings_obj["kind"]](plugin_key) - interceptors.append(interceptor) - - consumer = Flowcept(interceptors) - consumer.start() - - 
-if __name__ == "__main__": - try: - main() - except KeyboardInterrupt: - sys.exit(0) diff --git a/tests/api/db_api_test.py b/tests/api/db_api_test.py index 53882fb6..1b77bdf1 100644 --- a/tests/api/db_api_test.py +++ b/tests/api/db_api_test.py @@ -64,7 +64,7 @@ def test_save_blob(self): obj = pickle.dumps(OurObject()) - obj_id = Flowcept.db.save_object(object=obj, save_data_in_collection=True) + obj_id = Flowcept.db.save_or_update_object(object=obj, save_data_in_collection=True) print(obj_id) obj_docs = Flowcept.db.query(filter={"object_id": obj_id}, collection="objects") diff --git a/tests/instrumentation_tests/ml_tests/dl_trainer.py b/tests/instrumentation_tests/ml_tests/dl_trainer.py index a2aace15..a6a92a92 100644 --- a/tests/instrumentation_tests/ml_tests/dl_trainer.py +++ b/tests/instrumentation_tests/ml_tests/dl_trainer.py @@ -185,7 +185,7 @@ def model_fit( test_data, _ = batch result = test_info.copy() - best_obj_id = Flowcept.db.save_torch_model(model, task_id=task_id, workflow_id=workflow_id, custom_metadata=result) + best_obj_id = Flowcept.db.save_or_update_torch_model(model, task_id=task_id, workflow_id=workflow_id, custom_metadata=result) result.update( { "best_obj_id": best_obj_id, diff --git a/tests/instrumentation_tests/ml_tests/ml_decorator_test.py b/tests/instrumentation_tests/ml_tests/ml_decorator_test.py index 69ec0803..2aca2cde 100644 --- a/tests/instrumentation_tests/ml_tests/ml_decorator_test.py +++ b/tests/instrumentation_tests/ml_tests/ml_decorator_test.py @@ -11,7 +11,7 @@ class MLDecoratorTests(unittest.TestCase): @unittest.skipIf(not MONGO_ENABLED, "MongoDB is disabled") def test_torch_save_n_load(self): model = nn.Module() - model_id = Flowcept.db.save_torch_model(model) + model_id = Flowcept.db.save_or_update_torch_model(model) new_model = nn.Module() doc = Flowcept.db.load_torch_model(model=new_model, object_id=model_id) print(doc)