diff --git a/flowcept/flowcept_consumer/doc_db/document_db_dao.py b/flowcept/flowcept_consumer/doc_db/document_db_dao.py
deleted file mode 100755
index 5a2ed53e..00000000
--- a/flowcept/flowcept_consumer/doc_db/document_db_dao.py
+++ /dev/null
@@ -1,56 +0,0 @@
-from typing import List, Dict
-from bson import ObjectId
-from pymongo import MongoClient
-
-from flowcept.configs import (
-    MONGO_HOST,
-    MONGO_PORT,
-    MONGO_DB,
-    MONGO_TASK_COLLECTION,
-)
-
-
-class DocumentDBDao(object):
-    def __init__(self):
-        client = MongoClient(MONGO_HOST, MONGO_PORT)
-        db = client[MONGO_DB]
-        self._collection = db[MONGO_TASK_COLLECTION]
-
-    def find(self, filter_: Dict) -> List[Dict]:
-        try:
-            lst = list()
-            for doc in self._collection.find(filter_):
-                lst.append(doc)
-            return lst
-        except Exception as e:
-            print("Error when querying", e)
-            return None
-
-    def insert_one(self, doc: Dict) -> ObjectId:
-        try:
-            r = self._collection.insert_one(doc)
-            return r.inserted_id
-        except Exception as e:
-            print("Error when inserting", doc, e)
-            return None
-
-    def insert_many(self, doc_list: List[Dict]) -> List[ObjectId]:
-        try:
-            r = self._collection.insert_many(doc_list)
-            return r.inserted_ids
-        except Exception as e:
-            print("Error when inserting many docs", e, str(doc_list))
-            return None
-
-    def delete(self, doc_list: List[ObjectId]):
-        try:
-            self._collection.delete_many({"_id": {"$in": doc_list}})
-        except Exception as e:
-            print("Error when deleting documents.", e)
-
-    def count(self) -> int:
-        try:
-            return self._collection.count_documents({})
-        except Exception as e:
-            print("Error when counting documents.", e)
-            return -1
diff --git a/flowcept/flowcept_consumer/doc_db/document_inserter.py b/flowcept/flowcept_consumer/doc_db/document_inserter.py
deleted file mode 100755
index d871ecc5..00000000
--- a/flowcept/flowcept_consumer/doc_db/document_inserter.py
+++ /dev/null
@@ -1,61 +0,0 @@
-import sys
-import json
-from time import time, sleep
-from threading import Thread
-from typing import Dict
-from datetime import datetime
-
-from flowcept.configs import (
-    MONGO_INSERTION_BUFFER_TIME,
-    MONGO_INSERTION_BUFFER_SIZE,
-)
-from flowcept.commons.mq_dao import MQDao
-from flowcept.flowcept_consumer.doc_db.document_db_dao import DocumentDBDao
-
-
-class DocumentInserter:
-    def __init__(self):
-        self._buffer = list()
-        self._mq_dao = MQDao()
-        self._doc_dao = DocumentDBDao()
-        self._previous_time = time()
-
-    def _flush(self):
-        self._doc_dao.insert_many(self._buffer)
-        self._buffer = list()
-
-    def handle_message(self, intercepted_message: Dict):
-        dt = datetime.fromtimestamp(intercepted_message["utc_timestamp"])
-        intercepted_message["timestamp"] = dt.utcnow()
-        self._buffer.append(intercepted_message)
-        print("An intercepted message was received.")
-        if len(self._buffer) >= MONGO_INSERTION_BUFFER_SIZE:
-            print("Buffer exceeded, flushing...")
-            self._flush()
-
-    def time_based_flushing(self):
-        while True:
-            if len(self._buffer):
-                now = time()
-                timediff = now - self._previous_time
-                if timediff >= MONGO_INSERTION_BUFFER_TIME:
-                    print("Time to flush!")
-                    self._previous_time = now
-                    self._flush()
-            sleep(MONGO_INSERTION_BUFFER_TIME)
-
-    def main(self):
-        Thread(target=self.time_based_flushing).start()
-        pubsub = self._mq_dao.subscribe()
-        for message in pubsub.listen():
-            if message["type"] not in {"psubscribe"}:
-                _dict_obj = json.loads(json.loads(message["data"]))
-                self.handle_message(_dict_obj)
-
-
-if __name__ == "__main__":
-    try:
-        DocumentInserter().main()
-    except KeyboardInterrupt:
-        print("Interrupted")
-        sys.exit(0)
diff --git a/flowcept/flowceptor/consumers/consumer_utils.py b/flowcept/flowceptor/consumers/consumer_utils.py
index d57c3576..8e2cdd50 100644
--- a/flowcept/flowceptor/consumers/consumer_utils.py
+++ b/flowcept/flowceptor/consumers/consumer_utils.py
@@ -9,13 +9,16 @@ def curate_task_msg(task_msg_dict: dict):
         if field not in task_msg_dict:
             continue
         field_val = task_msg_dict[field]
-        if not field_val:
+        if type(field_val) == dict and not field_val:
             task_msg_dict.pop(field)  # removing empty fields
             continue
         if type(field_val) == dict:
             original_field_val = field_val.copy()
             for k in original_field_val:
-                if not original_field_val[k]:
+                if (
+                    type(original_field_val[k]) == dict
+                    and not original_field_val[k]
+                ):
                     field_val.pop(k)  # removing inner empty fields
             task_msg_dict[field] = field_val
         else:
@@ -35,7 +38,7 @@ def remove_empty_fields_from_dict(obj: dict):
     for key, value in list(obj.items()):
         if isinstance(value, dict):
             remove_empty_fields_from_dict(value)
-            if not value:
+            if value is None:
                 del obj[key]
         elif value in (None, ""):
             del obj[key]
diff --git a/tests/doc_db_inserter/doc_db_inserter_test.py b/tests/doc_db_inserter/doc_db_inserter_test.py
index cb6d5edd..5d426dda 100644
--- a/tests/doc_db_inserter/doc_db_inserter_test.py
+++ b/tests/doc_db_inserter/doc_db_inserter_test.py
@@ -49,7 +49,8 @@ def test_db_insert_and_update_many(self):
                 "myid": uid,
                 "debug": True,
                 "name": "Renan2",
-                "used": {"bla": 2},
+                "empty_string": "",
+                "used": {"bla": 2, "lala": False},
             },
         ]
         self.doc_dao.insert_and_update_many("myid", docs)
diff --git a/tests/plugins/test_dask.py b/tests/plugins/test_dask.py
index 211bc639..373ba9ac 100644
--- a/tests/plugins/test_dask.py
+++ b/tests/plugins/test_dask.py
@@ -112,6 +112,8 @@ def test_map_workflow_kwargs(self):
         i1 = [
             {"x": np.random.random(), "y": np.random.random()},
             {"x": np.random.random()},
+            {"x": 4, "batch_norm": False},
+            {"x": 6, "batch_norm": True, "empty_string": ""},
         ]
         wf_id = f"wf_{uuid4()}"
         o1 = TestDask.client.map(dummy_func4, i1, workflow_id=wf_id)