diff --git a/.github/workflows/create-release-n-publish.yml b/.github/workflows/create-release-n-publish.yml
index 687994aa..e6033646 100644
--- a/.github/workflows/create-release-n-publish.yml
+++ b/.github/workflows/create-release-n-publish.yml
@@ -7,6 +7,8 @@ jobs:
   build:
     name: Create Release and Publish
     runs-on: ubuntu-latest
+    env:
+      FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
     steps:
       - name: Checkout code
        uses: actions/checkout@v2
diff --git a/.github/workflows/run-tests.yml b/.github/workflows/run-tests.yml
index 04be656a..5c3222f8 100644
--- a/.github/workflows/run-tests.yml
+++ b/.github/workflows/run-tests.yml
@@ -7,6 +7,8 @@ jobs:
   build:
     runs-on: ubuntu-latest
     if: "!contains(github.event.head_commit.message, 'CI Bot')"
+    env:
+      FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
     steps:
     - uses: actions/checkout@v3
     - name: Set up Python 3.8
diff --git a/.github/workflows/test-python-310-macos.yml b/.github/workflows/test-python-310-macos.yml
index 80251c60..600d1688 100644
--- a/.github/workflows/test-python-310-macos.yml
+++ b/.github/workflows/test-python-310-macos.yml
@@ -6,6 +6,8 @@ jobs:
   build:
     runs-on: macos-latest
     if: "!contains(github.event.head_commit.message, 'CI Bot')"
+    env:
+      FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
     steps:
     - uses: actions/checkout@v3
     - name: Set up Python 3.10
diff --git a/.github/workflows/test-python-310.yml b/.github/workflows/test-python-310.yml
index b9cb2ea1..0f0bb937 100644
--- a/.github/workflows/test-python-310.yml
+++ b/.github/workflows/test-python-310.yml
@@ -9,6 +9,8 @@ jobs:
   build:
     runs-on: ubuntu-latest
     if: "!contains(github.event.head_commit.message, 'CI Bot')"
+    env:
+      FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
     steps:
     - uses: actions/checkout@v3
     - name: Set up Python 3.10
diff --git a/.github/workflows/test-python-311.yml b/.github/workflows/test-python-311.yml
index f7c74df5..b3c7b3f4 100644
--- a/.github/workflows/test-python-311.yml
+++ b/.github/workflows/test-python-311.yml
@@ -11,6 +11,8 @@ jobs:
   build:
     runs-on: ubuntu-latest
     if: "!contains(github.event.head_commit.message, 'CI Bot')"
+    env:
+      FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
     steps:
     - uses: actions/checkout@v3
     - name: Set up Python 3.11
diff --git a/.github/workflows/test-python-39.yml b/.github/workflows/test-python-39.yml
index e8ea34d5..d8106d13 100644
--- a/.github/workflows/test-python-39.yml
+++ b/.github/workflows/test-python-39.yml
@@ -9,6 +9,8 @@ jobs:
   build:
     runs-on: ubuntu-latest
     if: "!contains(github.event.head_commit.message, 'CI Bot')"
+    env:
+      FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
     steps:
     - uses: actions/checkout@v3
     - name: Set up Python 3.9
diff --git a/README.md b/README.md
index 1cea9ef8..093ff436 100644
--- a/README.md
+++ b/README.md
@@ -74,6 +74,14 @@ And other variables depending on the Plugin. For instance, in Dask, timestamp cr
 
 If you used FlowCept for your research, consider citing our paper.
 
+```
+Towards Lightweight Data Integration using Multi-workflow Provenance and Data Observability
+R. Souza, T. Skluzacek, S. Wilkinson, M. Ziatdinov, and R. da Silva
+19th IEEE International Conference on e-Science, 2023.
+```
+
+**Bibtex:**
+
 ```latex
 @inproceedings{souza2023towards,
   author = {Souza, Renan and Skluzacek, Tyler J and Wilkinson, Sean R and Ziatdinov, Maxim and da Silva, Rafael Ferreira},
diff --git a/extra_requirements/dev-requirements.txt b/extra_requirements/dev-requirements.txt
index af23708b..d387464e 100644
--- a/extra_requirements/dev-requirements.txt
+++ b/extra_requirements/dev-requirements.txt
@@ -5,3 +5,6 @@ numpy==1.23.4
 bokeh==2.4.2
 jupyterlab==3.6.1
 nbmake==1.4
+torch==2.0.1
+torchvision==0.15.2
+
diff --git a/extra_requirements/mongo-requirements.txt b/extra_requirements/mongo-requirements.txt
deleted file mode 100644
index 39843a87..00000000
--- a/extra_requirements/mongo-requirements.txt
+++ /dev/null
@@ -1 +0,0 @@
-pymongo==4.3.3
diff --git a/extra_requirements/webserver-requirements.txt b/extra_requirements/webserver-requirements.txt
deleted file mode 100644
index 2676f049..00000000
--- a/extra_requirements/webserver-requirements.txt
+++ /dev/null
@@ -1,4 +0,0 @@
-Werkzeug==2.2.2
-flask==2.2.2
-requests==2.28.2
-flask_restful==0.3.9
diff --git a/flowcept/commons/daos/mq_dao.py b/flowcept/commons/daos/mq_dao.py
index e6cb6fc4..2a933d5f 100644
--- a/flowcept/commons/daos/mq_dao.py
+++ b/flowcept/commons/daos/mq_dao.py
@@ -78,9 +78,16 @@ def _flush(self):
         if len(self._buffer):
             pipe = self._redis.pipeline()
             for message in self._buffer:
-                pipe.publish(
-                    REDIS_CHANNEL, json.dumps(message, cls=MQDao.ENCODER)
-                )
+                try:
+                    pipe.publish(
+                        REDIS_CHANNEL,
+                        json.dumps(message, cls=MQDao.ENCODER),
+                    )
+                except Exception as e:
+                    self.logger.error(
+                        "Critical error as some messages couldn't be flushed! Check the messages' contents!"
+                    )
+                    self.logger.exception(e)
             t0 = 0
             if PERF_LOG:
                 t0 = time()
diff --git a/flowcept/commons/utils.py b/flowcept/commons/utils.py
index 60f82200..4148a6e4 100644
--- a/flowcept/commons/utils.py
+++ b/flowcept/commons/utils.py
@@ -1,6 +1,9 @@
 from datetime import datetime, timedelta
 import json
 from time import time
+
+import numpy as np
+
 from flowcept.configs import PERF_LOG
 from flowcept.commons.flowcept_logger import FlowceptLogger
 from flowcept.commons.flowcept_dataclasses.task_message import Status
@@ -56,6 +59,18 @@ def default(self, obj):
                 return str(obj)
             except:
                 return None
+        elif (
+            isinstance(obj, np.int)
+            or isinstance(obj, np.int32)
+            or isinstance(obj, np.int64)
+        ):
+            return int(obj)
+        elif (
+            isinstance(obj, np.float)
+            or isinstance(obj, np.float32)
+            or isinstance(obj, np.float64)
+        ):
+            return float(obj)
         return super().default(obj)
 
 
diff --git a/flowcept/configs.py b/flowcept/configs.py
index beca3340..0e51c2cc 100644
--- a/flowcept/configs.py
+++ b/flowcept/configs.py
@@ -11,16 +11,21 @@ PROJECT_NAME = os.getenv("PROJECT_NAME", "flowcept")
 
 SETTINGS_PATH = os.getenv("FLOWCEPT_SETTINGS_PATH", None)
 if SETTINGS_PATH is None:
-    project_dir_path = os.path.abspath(
-        os.path.join(os.path.dirname(__file__), "..")
-    )
-    SETTINGS_PATH = os.path.join(
-        project_dir_path, "resources", "settings.yaml"
+    raise Exception(
+        "Please define an environment variable with the ABSOLUTE path to "
+        "the settings.yaml file. There is a sample file "
+        "in the resources directory under the project's root path."
     )
+    # project_dir_path = os.path.abspath(
+    #     os.path.join(os.path.dirname(__file__), "..")
+    # )
+    # SETTINGS_PATH = os.path.join(
+    #     project_dir_path, "resources", "settings.yaml"
+    # )
 
-if not os.path.isabs(SETTINGS_PATH):
-    # TODO: check if we really need abs path
-    raise Exception("Please use an absolute path for the settings.yaml")
+# if not os.path.isabs(SETTINGS_PATH):
+#     # TODO: check if we really need abs path
+#     raise Exception("Please use an absolute path for the settings.yaml")
 
 
 with open(SETTINGS_PATH) as f:
@@ -99,7 +104,9 @@ DEBUG_MODE = settings["project"].get("debug", False)
 PERF_LOG = settings["project"].get("performance_logging", False)
 JSON_SERIALIZER = settings["project"].get("json_serializer", "default")
+
 TELEMETRY_CAPTURE = settings["project"].get("telemetry_capture", None)
+RAI_CAPTURE = settings["project"].get("responsible_ai_capture", None)
 
 
 ######################
 # SYS METADATA #
diff --git a/flowcept/flowceptor/plugins/settings_factory.py b/flowcept/flowceptor/plugins/settings_factory.py
index c8263dd5..bd54fa96 100644
--- a/flowcept/flowceptor/plugins/settings_factory.py
+++ b/flowcept/flowceptor/plugins/settings_factory.py
@@ -60,7 +60,8 @@ def get_settings(plugin_key: str) -> BaseSettings:
 
     # Add any specific setting builder below
     if kind == Vocabulary.Settings.ZAMBEZE_KIND:
-        settings_obj.key_values_to_filter = [
-            KeyValue(**item) for item in settings_obj.key_values_to_filter
-        ]
+        if settings_obj.key_values_to_filter is not None:
+            settings_obj.key_values_to_filter = [
+                KeyValue(**item) for item in settings_obj.key_values_to_filter
+            ]
     return settings_obj
diff --git a/flowcept/flowceptor/plugins/zambeze/zambeze_dataclasses.py b/flowcept/flowceptor/plugins/zambeze/zambeze_dataclasses.py
index 3aedeca1..44c99f5c 100644
--- a/flowcept/flowceptor/plugins/zambeze/zambeze_dataclasses.py
+++ b/flowcept/flowceptor/plugins/zambeze/zambeze_dataclasses.py
@@ -26,8 +26,7 @@ class ZambezeSettings(BaseSettings):
     host: str
     port: int
     queue_name: str
-    key_values_to_filter: List[KeyValue]
-    keys_to_intercept: List[str]
+    key_values_to_filter: List[KeyValue] = None
     kind = "zambeze"
 
     def __post_init__(self):
diff --git a/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py b/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py
index 56fa3e4e..49dc1d06 100644
--- a/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py
+++ b/flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py
@@ -21,18 +21,20 @@ def __init__(self, plugin_key="zambeze"):
 
     def prepare_task_msg(self, zambeze_msg: Dict) -> TaskMessage:
         task_msg = TaskMessage()
         task_msg.utc_timestamp = get_utc_now()
-        task_msg.campaign_id = zambeze_msg.get("campaign_id")
-        task_msg.task_id = zambeze_msg.get("activity_id")
-        task_msg.activity_id = zambeze_msg.get("name")
-        task_msg.dependencies = zambeze_msg.get("depends_on")
-        task_msg.custom_metadata = {"command": zambeze_msg.get("command")}
+        task_msg.campaign_id = zambeze_msg.get("campaign_id", None)
+        task_msg.task_id = zambeze_msg.get("activity_id", None)
+        task_msg.activity_id = zambeze_msg.get("name", None)
+        task_msg.dependencies = zambeze_msg.get("depends_on", None)
+        task_msg.custom_metadata = {
+            "command": zambeze_msg.get("command", None)
+        }
         task_msg.status = get_status_from_str(
-            zambeze_msg.get("activity_status")
+            zambeze_msg.get("activity_status", None)
         )
         task_msg.used = {
-            "args": zambeze_msg["arguments"],
-            "kwargs": zambeze_msg["kwargs"],
-            "files": zambeze_msg["files"],
+            "args": zambeze_msg.get("arguments", None),
+            "kwargs": zambeze_msg.get("kwargs", None),
+            "files": zambeze_msg.get("files", None),
         }
         return task_msg
@@ -74,21 +76,26 @@ def observe(self):
         try:
             self._channel.start_consuming()
         except Exception as e:
-            self.logger.warn(
-                f"This exception is expected to occur after "
-                f"channel.start_consuming finishes: {e}"
+            self.logger.warning(
+                f"If this exception happens after "
+                f"channel.start_consuming finishes, it is expected:\n {e}"
             )
 
+    def _intercept(self, body_obj):
+        self.logger.debug(
+            f"I'm a Zambeze interceptor and I need to intercept this:"
+            f"\n\t{json.dumps(body_obj)}"
+        )
+        task_msg = self.prepare_task_msg(body_obj)
+        self.intercept(task_msg)
+
     def callback(self, ch, method, properties, body):
         body_obj = json.loads(body)
-
-        for key_value in self.settings.key_values_to_filter:
-            if key_value.key in body_obj:
-                if body_obj[key_value.key] == key_value.value:
-                    self.logger.debug(
-                        f"I'm an interceptor and I need to intercept this:"
-                        f"\n\t{json.dumps(body_obj)}"
-                    )
-                    task_msg = self.prepare_task_msg(body_obj)
-                    self.intercept(task_msg)
-                    break
+        if self.settings.key_values_to_filter is not None:
+            for key_value in self.settings.key_values_to_filter:
+                if key_value.key in body_obj:
+                    if body_obj[key_value.key] == key_value.value:
+                        self._intercept(body_obj)
+                        break
+        else:
+            self._intercept(body_obj)
diff --git a/flowcept/flowceptor/telemetry_capture.py b/flowcept/flowceptor/telemetry_capture.py
index b0d7b221..23b912da 100644
--- a/flowcept/flowceptor/telemetry_capture.py
+++ b/flowcept/flowceptor/telemetry_capture.py
@@ -94,7 +94,7 @@ def _capture_process_info(self):
             p.cpu_number = psutil_p.cpu_num()
         except:
             pass
-        p.memory = psutil_p.memory_info()
+        p.memory = psutil_p.memory_info()._asdict()
         p.memory_percent = psutil_p.memory_percent()
         p.cpu_times = psutil_p.cpu_times()._asdict()
         p.cpu_percent = psutil_p.cpu_percent()
diff --git a/requirements.txt b/requirements.txt
index f778fccb..382e88b3 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,4 +1,9 @@
 PyYAML==6.0.1
 redis==4.4.2
 psutil==5.9.5
+pymongo==4.3.3
 nvidia-ml-py==11.525.131
+Werkzeug==2.2.2
+flask==2.2.2
+requests==2.28.2
+flask_restful==0.3.9
diff --git a/resources/settings.yaml b/resources/settings.yaml
index 396e70cf..b0f56577 100644
--- a/resources/settings.yaml
+++ b/resources/settings.yaml
@@ -10,6 +10,7 @@ project:
     mem: true
     disk: true
     network: true
+
 
 log:
   log_path: flowcept.log
@@ -62,15 +63,9 @@ plugins:
     host: localhost
     port: 5672
     queue_name: hello
-    key_values_to_filter:
-      - key: activity_status
-        value: CREATED
-    keys_to_intercept:
-      - arguments
-      - kwargs
-      - files
-      - name
-      - activity_id
+#    key_values_to_filter:
+#      - key: activity_status
+#        value: CREATED
     enrich_messages: true
 
   mlflow:
diff --git a/setup.py b/setup.py
index 2a9d9632..7b9f9032 100644
--- a/setup.py
+++ b/setup.py
@@ -1,4 +1,3 @@
-from sys import platform
 import os
 import re
 from setuptools import setup, find_packages
@@ -43,9 +42,7 @@ def get_requirements(file_path):
     "zambeze",
     "mlflow",
     "tensorboard",
-    "mongo",
     "dask",
-    "webserver",
 ]
 
 extras_require = dict()
diff --git a/tests/plugins/test_zambeze.py b/tests/plugins/test_zambeze.py
index 82453810..cf6f2600 100644
--- a/tests/plugins/test_zambeze.py
+++ b/tests/plugins/test_zambeze.py
@@ -61,7 +61,7 @@ def test_send_message(self):
             routing_key=self._queue_name,
             body=json.dumps(msg.__dict__),
         )
-
+        print("Zambeze Activity_id", act_id)
         self.logger.debug(" [x] Sent msg")
         sleep(5)
         self._connection.close()