Skip to content

Commit

Permalink
Merge pull request #89 from ORNL/dev
Browse files Browse the repository at this point in the history
Main < Dev
  • Loading branch information
renan-souza authored Jan 16, 2024
2 parents 133ec4d + 626b241 commit 5100a5f
Show file tree
Hide file tree
Showing 21 changed files with 109 additions and 58 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/create-release-n-publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ jobs:
build:
name: Create Release and Publish
runs-on: ubuntu-latest
env:
FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
steps:
- name: Checkout code
uses: actions/checkout@v2
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/run-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ jobs:
build:
runs-on: ubuntu-latest
if: "!contains(github.event.head_commit.message, 'CI Bot')"
env:
FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.8
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-python-310-macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ jobs:
build:
runs-on: macos-latest
if: "!contains(github.event.head_commit.message, 'CI Bot')"
env:
FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-python-310.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ jobs:
build:
runs-on: ubuntu-latest
if: "!contains(github.event.head_commit.message, 'CI Bot')"
env:
FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.10
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-python-311.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ jobs:
build:
runs-on: ubuntu-latest
if: "!contains(github.event.head_commit.message, 'CI Bot')"
env:
FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.11
Expand Down
2 changes: 2 additions & 0 deletions .github/workflows/test-python-39.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ jobs:
build:
runs-on: ubuntu-latest
if: "!contains(github.event.head_commit.message, 'CI Bot')"
env:
FLOWCEPT_SETTINGS_PATH: 'resources/settings.yaml'
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.9
Expand Down
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,14 @@ And other variables depending on the Plugin. For instance, in Dask, timestamp cr
If you use FlowCept in your research, please consider citing our paper.
```
Towards Lightweight Data Integration using Multi-workflow Provenance and Data Observability
R. Souza, T. Skluzacek, S. Wilkinson, M. Ziatdinov, and R. da Silva
19th IEEE International Conference on e-Science, 2023.
```

**Bibtex:**

```latex
@inproceedings{souza2023towards,
author = {Souza, Renan and Skluzacek, Tyler J and Wilkinson, Sean R and Ziatdinov, Maxim and da Silva, Rafael Ferreira},
Expand Down
3 changes: 3 additions & 0 deletions extra_requirements/dev-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,6 @@ numpy==1.23.4
bokeh==2.4.2
jupyterlab==3.6.1
nbmake==1.4
torch==2.0.1
torchvision==0.15.2

1 change: 0 additions & 1 deletion extra_requirements/mongo-requirements.txt

This file was deleted.

4 changes: 0 additions & 4 deletions extra_requirements/webserver-requirements.txt

This file was deleted.

13 changes: 10 additions & 3 deletions flowcept/commons/daos/mq_dao.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,16 @@ def _flush(self):
if len(self._buffer):
pipe = self._redis.pipeline()
for message in self._buffer:
pipe.publish(
REDIS_CHANNEL, json.dumps(message, cls=MQDao.ENCODER)
)
try:
pipe.publish(
REDIS_CHANNEL,
json.dumps(message, cls=MQDao.ENCODER),
)
except Exception as e:
self.logger.error(
"Critical error as some messages couldn't be flushed! Check the messages' contents!"
)
self.logger.exception(e)
t0 = 0
if PERF_LOG:
t0 = time()
Expand Down
15 changes: 15 additions & 0 deletions flowcept/commons/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from datetime import datetime, timedelta
import json
from time import time

import numpy as np

from flowcept.configs import PERF_LOG
from flowcept.commons.flowcept_logger import FlowceptLogger
from flowcept.commons.flowcept_dataclasses.task_message import Status
Expand Down Expand Up @@ -56,6 +59,18 @@ def default(self, obj):
return str(obj)
except:
return None
elif (
isinstance(obj, np.int)
or isinstance(obj, np.int32)
or isinstance(obj, np.int64)
):
return int(obj)
elif (
isinstance(obj, np.float)
or isinstance(obj, np.float32)
or isinstance(obj, np.float64)
):
return float(obj)
return super().default(obj)


Expand Down
23 changes: 15 additions & 8 deletions flowcept/configs.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,21 @@
PROJECT_NAME = os.getenv("PROJECT_NAME", "flowcept")
SETTINGS_PATH = os.getenv("FLOWCEPT_SETTINGS_PATH", None)
if SETTINGS_PATH is None:
project_dir_path = os.path.abspath(
os.path.join(os.path.dirname(__file__), "..")
)
SETTINGS_PATH = os.path.join(
project_dir_path, "resources", "settings.yaml"
raise Exception(
"Please define an environment variable with the ABSOLUTE path to "
"the settings.yaml file. There is a sample file "
"in the resources directory under the project's root path."
)
# project_dir_path = os.path.abspath(
# os.path.join(os.path.dirname(__file__), "..")
# )
# SETTINGS_PATH = os.path.join(
# project_dir_path, "resources", "settings.yaml"
# )

if not os.path.isabs(SETTINGS_PATH):
# TODO: check if we really need abs path
raise Exception("Please use an absolute path for the settings.yaml")
# if not os.path.isabs(SETTINGS_PATH):
# # TODO: check if we really need abs path
# raise Exception("Please use an absolute path for the settings.yaml")


with open(SETTINGS_PATH) as f:
Expand Down Expand Up @@ -99,7 +104,9 @@
DEBUG_MODE = settings["project"].get("debug", False)
PERF_LOG = settings["project"].get("performance_logging", False)
JSON_SERIALIZER = settings["project"].get("json_serializer", "default")

TELEMETRY_CAPTURE = settings["project"].get("telemetry_capture", None)
RAI_CAPTURE = settings["project"].get("responsible_ai_capture", None)

######################
# SYS METADATA #
Expand Down
7 changes: 4 additions & 3 deletions flowcept/flowceptor/plugins/settings_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def get_settings(plugin_key: str) -> BaseSettings:

# Add any specific setting builder below
if kind == Vocabulary.Settings.ZAMBEZE_KIND:
settings_obj.key_values_to_filter = [
KeyValue(**item) for item in settings_obj.key_values_to_filter
]
if settings_obj.key_values_to_filter is not None:
settings_obj.key_values_to_filter = [
KeyValue(**item) for item in settings_obj.key_values_to_filter
]
return settings_obj
3 changes: 1 addition & 2 deletions flowcept/flowceptor/plugins/zambeze/zambeze_dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@ class ZambezeSettings(BaseSettings):
host: str
port: int
queue_name: str
key_values_to_filter: List[KeyValue]
keys_to_intercept: List[str]
key_values_to_filter: List[KeyValue] = None
kind = "zambeze"

def __post_init__(self):
Expand Down
53 changes: 30 additions & 23 deletions flowcept/flowceptor/plugins/zambeze/zambeze_interceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ def __init__(self, plugin_key="zambeze"):
def prepare_task_msg(self, zambeze_msg: Dict) -> TaskMessage:
task_msg = TaskMessage()
task_msg.utc_timestamp = get_utc_now()
task_msg.campaign_id = zambeze_msg.get("campaign_id")
task_msg.task_id = zambeze_msg.get("activity_id")
task_msg.activity_id = zambeze_msg.get("name")
task_msg.dependencies = zambeze_msg.get("depends_on")
task_msg.custom_metadata = {"command": zambeze_msg.get("command")}
task_msg.campaign_id = zambeze_msg.get("campaign_id", None)
task_msg.task_id = zambeze_msg.get("activity_id", None)
task_msg.activity_id = zambeze_msg.get("name", None)
task_msg.dependencies = zambeze_msg.get("depends_on", None)
task_msg.custom_metadata = {
"command": zambeze_msg.get("command", None)
}
task_msg.status = get_status_from_str(
zambeze_msg.get("activity_status")
zambeze_msg.get("activity_status", None)
)
task_msg.used = {
"args": zambeze_msg["arguments"],
"kwargs": zambeze_msg["kwargs"],
"files": zambeze_msg["files"],
"args": zambeze_msg.get("arguments", None),
"kwargs": zambeze_msg.get("kwargs", None),
"files": zambeze_msg.get("files", None),
}
return task_msg

Expand Down Expand Up @@ -74,21 +76,26 @@ def observe(self):
try:
self._channel.start_consuming()
except Exception as e:
self.logger.warn(
f"This exception is expected to occur after "
f"channel.start_consuming finishes: {e}"
self.logger.warning(
f"If this exception happens after "
f"channel.start_consuming finishes, it is expected:\n {e}"
)

def _intercept(self, body_obj):
self.logger.debug(
f"I'm a Zambeze interceptor and I need to intercept this:"
f"\n\t{json.dumps(body_obj)}"
)
task_msg = self.prepare_task_msg(body_obj)
self.intercept(task_msg)

def callback(self, ch, method, properties, body):
body_obj = json.loads(body)

for key_value in self.settings.key_values_to_filter:
if key_value.key in body_obj:
if body_obj[key_value.key] == key_value.value:
self.logger.debug(
f"I'm an interceptor and I need to intercept this:"
f"\n\t{json.dumps(body_obj)}"
)
task_msg = self.prepare_task_msg(body_obj)
self.intercept(task_msg)
break
if self.settings.key_values_to_filter is not None:
for key_value in self.settings.key_values_to_filter:
if key_value.key in body_obj:
if body_obj[key_value.key] == key_value.value:
self._intercept(body_obj)
break
else:
self._intercept(body_obj)
2 changes: 1 addition & 1 deletion flowcept/flowceptor/telemetry_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def _capture_process_info(self):
p.cpu_number = psutil_p.cpu_num()
except:
pass
p.memory = psutil_p.memory_info()
p.memory = psutil_p.memory_info()._asdict()
p.memory_percent = psutil_p.memory_percent()
p.cpu_times = psutil_p.cpu_times()._asdict()
p.cpu_percent = psutil_p.cpu_percent()
Expand Down
5 changes: 5 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
PyYAML==6.0.1
redis==4.4.2
psutil==5.9.5
pymongo==4.3.3
nvidia-ml-py==11.525.131
Werkzeug==2.2.2
flask==2.2.2
requests==2.28.2
flask_restful==0.3.9
13 changes: 4 additions & 9 deletions resources/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ project:
mem: true
disk: true
network: true


log:
log_path: flowcept.log
Expand Down Expand Up @@ -62,15 +63,9 @@ plugins:
host: localhost
port: 5672
queue_name: hello
key_values_to_filter:
- key: activity_status
value: CREATED
keys_to_intercept:
- arguments
- kwargs
- files
- name
- activity_id
# key_values_to_filter:
# - key: activity_status
# value: CREATED
enrich_messages: true

mlflow:
Expand Down
3 changes: 0 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from sys import platform
import os
import re
from setuptools import setup, find_packages
Expand Down Expand Up @@ -43,9 +42,7 @@ def get_requirements(file_path):
"zambeze",
"mlflow",
"tensorboard",
"mongo",
"dask",
"webserver",
]

extras_require = dict()
Expand Down
2 changes: 1 addition & 1 deletion tests/plugins/test_zambeze.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ def test_send_message(self):
routing_key=self._queue_name,
body=json.dumps(msg.__dict__),
)

print("Zambeze Activity_id", act_id)
self.logger.debug(" [x] Sent msg")
sleep(5)
self._connection.close()
Expand Down

0 comments on commit 5100a5f

Please sign in to comment.