From fdc727ba4b99947211d7038114ce02f92ae7b726 Mon Sep 17 00:00:00 2001 From: waleedalzarooni <131400134+waleedalzarooni@users.noreply.github.com> Date: Thu, 2 Jan 2025 11:54:30 +0400 Subject: [PATCH] feat: nebula graph add time label (#1383) Co-authored-by: Wendong Co-authored-by: Wendong-Fan <133094783+Wendong-Fan@users.noreply.github.com> --- camel/storages/graph_storages/nebula_graph.py | 109 ++++++++++++++---- .../graph_storages/test_nebula_graph.py | 95 ++++++++++++--- 2 files changed, 165 insertions(+), 39 deletions(-) diff --git a/camel/storages/graph_storages/nebula_graph.py b/camel/storages/graph_storages/nebula_graph.py index 659c92b7f8..14e8a48caa 100644 --- a/camel/storages/graph_storages/nebula_graph.py +++ b/camel/storages/graph_storages/nebula_graph.py @@ -15,7 +15,7 @@ import logging import re import time -from typing import TYPE_CHECKING, Any, Dict, List, Tuple +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple from camel.storages.graph_storages.base import BaseGraphStorage from camel.storages.graph_storages.graph_element import ( @@ -203,46 +203,62 @@ def add_graph_elements( def ensure_edge_type_exists( self, edge_type: str, + time_label: Optional[str] = None, ) -> None: r"""Ensures that a specified edge type exists in the NebulaGraph database. If the edge type already exists, this method does nothing. Args: edge_type (str): The name of the edge type to be created. + time_label (str, optional): A specific timestamp to set as the + default value for the time label property. If not + provided, no timestamp will be added. (default: :obj:`None`) Raises: Exception: If the edge type creation fails after multiple retry attempts, an exception is raised with the error message. """ - create_edge_stmt = f'CREATE EDGE IF NOT EXISTS {edge_type}()' + create_edge_stmt = f"CREATE EDGE IF NOT EXISTS {edge_type} ()" + if time_label is not None: + time_label = self._validate_time_label(time_label) + create_edge_stmt = f"""CREATE EDGE IF NOT EXISTS {edge_type} + (time_label DATETIME DEFAULT {time_label})""" for attempt in range(MAX_RETRIES): res = self.query(create_edge_stmt) if res.is_succeeded(): - return # Tag creation succeeded, exit the method + return # Edge type creation succeeded if attempt < MAX_RETRIES - 1: time.sleep(RETRY_DELAY) else: # Final attempt failed, raise an exception raise Exception( - f"Failed to create tag `{edge_type}` after " + f"Failed to create edge type `{edge_type}` after " f"{MAX_RETRIES} attempts: {res.error_msg()}" ) - def ensure_tag_exists(self, tag_name: str) -> None: + def ensure_tag_exists( + self, tag_name: str, time_label: Optional[str] = None + ) -> None: r"""Ensures a tag is created in the NebulaGraph database. If the tag already exists, it does nothing. Args: tag_name (str): The name of the tag to be created. + time_label (str, optional): A specific timestamp to set as the + default value for the time label property. If not provided, + no timestamp will be added. (default: :obj:`None`) Raises: Exception: If the tag creation fails after retries, an exception is raised with the error message. """ - - create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name}()' + create_tag_stmt = f"CREATE TAG IF NOT EXISTS {tag_name} ()" + if time_label is not None: + time_label = self._validate_time_label(time_label) + create_tag_stmt = f"""CREATE TAG IF NOT EXISTS {tag_name} + (time_label DATETIME DEFAULT {time_label})""" for attempt in range(MAX_RETRIES): res = self.query(create_tag_stmt) @@ -262,27 +278,39 @@ def add_node( self, node_id: str, tag_name: str, + time_label: Optional[str] = None, ) -> None: r"""Add a node with the specified tag and properties. Args: node_id (str): The ID of the node. tag_name (str): The tag name of the node. + time_label (str, optional): A specific timestamp to set for + the node's time label property. If not provided, no timestamp + will be added. (default: :obj:`None`) """ node_id = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', node_id) tag_name = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', tag_name) - self.ensure_tag_exists(tag_name) + self.ensure_tag_exists(tag_name, time_label) - # Insert node without properties - insert_stmt = ( - f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES "{node_id}":()' - ) + # Insert node with or without time_label property + if time_label is not None: + time_label = self._validate_time_label(time_label) + insert_stmt = ( + f'INSERT VERTEX IF NOT EXISTS {tag_name}(time_label) VALUES ' + f'"{node_id}":("{time_label}")' + ) + else: + insert_stmt = ( + f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES ' + f'"{node_id}":()' + ) for attempt in range(MAX_RETRIES): res = self.query(insert_stmt) if res.is_succeeded(): - return # Tag creation succeeded, exit the method + return # Node creation succeeded, exit the method if attempt < MAX_RETRIES - 1: time.sleep(RETRY_DELAY) @@ -348,7 +376,7 @@ def refresh_schema(self) -> None: @property def get_structured_schema(self) -> Dict[str, Any]: r"""Generates a structured schema consisting of node and relationship - properties, relationships, and metadata. + properties, relationships, and metadata, including timestamps. Returns: Dict[str, Any]: A dictionary representing the structured schema. @@ -419,6 +447,7 @@ def add_triplet( subj: str, obj: str, rel: str, + time_label: Optional[str] = None, ) -> None: r"""Adds a relationship (triplet) between two entities in the Nebula Graph database. @@ -427,6 +456,13 @@ def add_triplet( subj (str): The identifier for the subject entity. obj (str): The identifier for the object entity. rel (str): The relationship between the subject and object. + time_label (str, optional): A specific timestamp to set for the + time label property of the relationship. If not provided, + no timestamp will be added. (default: :obj:`None`) + + Raises: + ValueError: If the time_label format is invalid. + Exception: If creating the relationship fails. """ subj = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', subj) obj = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', obj) @@ -434,21 +470,30 @@ def add_triplet( self.ensure_tag_exists(subj) self.ensure_tag_exists(obj) - self.ensure_edge_type_exists(rel) + self.ensure_edge_type_exists(rel, time_label) self.add_node(node_id=subj, tag_name=subj) self.add_node(node_id=obj, tag_name=obj) - # Avoid latenicy + # Avoid latency time.sleep(1) - insert_stmt = ( - f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":();' - ) + # Create edge with or without time_label property + if time_label is not None: + time_label = self._validate_time_label(time_label) + insert_stmt = ( + f'INSERT EDGE IF NOT EXISTS {rel}(time_label) VALUES ' + f'"{subj}"->"{obj}":("{time_label}")' + ) + else: + insert_stmt = ( + f'INSERT EDGE IF NOT EXISTS {rel}() VALUES ' + f'"{subj}"->"{obj}":()' + ) res = self.query(insert_stmt) if not res.is_succeeded(): raise Exception( - f'create relationship `]{subj}` -> `{obj}`' + f'create relationship `{subj}` -> `{obj}`' + f'failed: {res.error_msg()}' ) @@ -568,3 +613,27 @@ def get_relationship_properties( ) return rel_schema_props, rel_structure_props + + def _validate_time_label(self, time_label: str) -> str: + r"""Validates the format of a time label string. + + Args: + time_label (str): The time label string to validate. + Should be in format 'YYYY-MM-DDThh:mm:ss'. + + Returns: + str: The validated time label. + + Raises: + ValueError: If the time label format is invalid. + """ + try: + # Check if the format matches YYYY-MM-DDThh:mm:ss + pattern = r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$' + if not re.match(pattern, time_label): + raise ValueError( + "Time label must be in format 'YYYY-MM-DDThh:mm:ss'" + ) + return time_label + except Exception as e: + raise ValueError(f"Invalid time label format: {e!s}") diff --git a/test/storages/graph_storages/test_nebula_graph.py b/test/storages/graph_storages/test_nebula_graph.py index e55e461a08..32c03cb24e 100644 --- a/test/storages/graph_storages/test_nebula_graph.py +++ b/test/storages/graph_storages/test_nebula_graph.py @@ -12,7 +12,7 @@ # limitations under the License. # ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. ========= import unittest -from unittest.mock import Mock, patch +from unittest.mock import Mock, call, patch from unstructured.documents.elements import Element @@ -106,7 +106,7 @@ def test_add_node(self): self.graph.add_node(node_id, tag_name) - self.graph.ensure_tag_exists.assert_called_with(tag_name) + self.graph.ensure_tag_exists.assert_has_calls([call(tag_name, None)]) insert_stmt = ( f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES "{node_id}":()' ) @@ -121,7 +121,7 @@ def test_ensure_tag_exists_success(self): self.graph.ensure_tag_exists(tag_name) - create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name}()' + create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name} ()' self.graph.query.assert_called_with(create_tag_stmt) @patch('time.sleep', return_value=None) @@ -153,13 +153,12 @@ def test_add_triplet(self): self.graph.add_triplet(subj, obj, rel) - self.graph.ensure_tag_exists.assert_any_call(subj) - self.graph.ensure_tag_exists.assert_any_call(obj) - self.graph.ensure_edge_type_exists.assert_called_with(rel) + self.graph.ensure_tag_exists.assert_has_calls([call(subj), call(obj)]) + self.graph.ensure_edge_type_exists.assert_has_calls([call(rel, None)]) self.graph.add_node.assert_any_call(node_id=subj, tag_name=subj) self.graph.add_node.assert_any_call(node_id=obj, tag_name=obj) insert_stmt = ( - f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":();' + f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":()' ) self.graph.query.assert_called_with(insert_stmt) @@ -401,24 +400,24 @@ def test_get_schema(self): def test_get_structured_schema(self): self.graph.get_node_properties = Mock( return_value=( - ['Node.prop'], - [{'labels': 'Node', 'properties': ['prop']}], + ['Person.name', 'Person.age'], + [{'labels': 'Person', 'properties': ['name', 'age']}], ) ) self.graph.get_relationship_properties = Mock( return_value=( - ['Rel.prop'], - [{'type': 'Rel', 'properties': ['prop']}], + ['KNOWS.since'], + [{'type': 'KNOWS', 'properties': ['since']}], ) ) - self.graph.get_relationship_types = Mock(return_value=['RELATES_TO']) - self.graph.get_indexes = Mock(return_value=['index1']) + self.graph.get_relationship_types = Mock(return_value=['KNOWS']) + self.graph.get_indexes = Mock(return_value=[]) structured_schema = self.graph.get_structured_schema expected_schema = { - "node_props": {'Node': ['prop']}, - "rel_props": {'Rel': ['prop']}, - "relationships": ['RELATES_TO'], - "metadata": {"index": ['index1']}, + "node_props": {"Person": ["name", "age"]}, + "rel_props": {"KNOWS": ["since"]}, + "relationships": ["KNOWS"], + "metadata": {"index": []}, } self.assertEqual(structured_schema, expected_schema) @@ -465,6 +464,64 @@ def test_add_graph_elements(self): 'node1', 'node2', 'RELATES_TO' ) + def test_validate_time_label_valid(self): + valid_time = "2024-12-31T21:45:22" + result = self.graph._validate_time_label(valid_time) + self.assertEqual(result, valid_time) + + def test_validate_time_label_none(self): + with self.assertRaises(ValueError): + self.graph._validate_time_label(None) + + def test_add_triplet_with_time_label(self): + subj = 'node1' + obj = 'node2' + rel = 'RELATESTO' + time_label = '2024-12-31T21:45:22' + + self.graph.ensure_tag_exists = Mock() + self.graph.ensure_edge_type_exists = Mock() + self.graph.add_node = Mock() + mock_result = Mock() + mock_result.is_succeeded.return_value = True + self.graph.query = Mock(return_value=mock_result) + + self.graph.add_triplet(subj, obj, rel, time_label) + + self.graph.ensure_tag_exists.assert_has_calls( + [call('node1'), call('node2')] + ) + self.graph.ensure_edge_type_exists.assert_called_with(rel, time_label) + self.graph.add_node.assert_any_call(node_id=subj, tag_name=subj) + self.graph.add_node.assert_any_call(node_id=obj, tag_name=obj) + + expected_stmt = ( + f'INSERT EDGE IF NOT EXISTS {rel}(time_label) VALUES ' + f'"{subj}"->"{obj}":("{time_label}")' + ) + self.graph.query.assert_called_with(expected_stmt) + + def test_add_triplet_with_invalid_time_label(self): + subj = 'node1' + obj = 'node2' + rel = 'RELATESTO' + invalid_time = '2024/12/31 21:45:22' # wrong format + + with self.assertRaises(ValueError) as context: + self.graph.add_triplet(subj, obj, rel, invalid_time) + + self.assertIn("Invalid time label format", str(context.exception)) + + def test_ensure_tag_exists_with_time_label(self): + tag_name = 'Tag1' + time_label = '2024-12-31T21:45:22' + + mock_result = Mock() + mock_result.is_succeeded.return_value = True + self.graph.query = Mock(return_value=mock_result) + + self.graph.ensure_tag_exists(tag_name, time_label) -if __name__ == '__main__': - unittest.main() + expected_stmt = f"""CREATE TAG IF NOT EXISTS {tag_name} + (time_label DATETIME DEFAULT {time_label})""" + self.graph.query.assert_called_with(expected_stmt)