Skip to content

Commit

Permalink
feat: nebula graph add time label (#1383)
Browse files Browse the repository at this point in the history
Co-authored-by: Wendong <[email protected]>
Co-authored-by: Wendong-Fan <[email protected]>
  • Loading branch information
3 people authored Jan 2, 2025
1 parent 3ea2b05 commit fdc727b
Show file tree
Hide file tree
Showing 2 changed files with 165 additions and 39 deletions.
109 changes: 89 additions & 20 deletions camel/storages/graph_storages/nebula_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import logging
import re
import time
from typing import TYPE_CHECKING, Any, Dict, List, Tuple
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple

from camel.storages.graph_storages.base import BaseGraphStorage
from camel.storages.graph_storages.graph_element import (
Expand Down Expand Up @@ -203,46 +203,62 @@ def add_graph_elements(
def ensure_edge_type_exists(
self,
edge_type: str,
time_label: Optional[str] = None,
) -> None:
r"""Ensures that a specified edge type exists in the NebulaGraph
database. If the edge type already exists, this method does nothing.
Args:
edge_type (str): The name of the edge type to be created.
time_label (str, optional): A specific timestamp to set as the
default value for the time label property. If not
provided, no timestamp will be added. (default: :obj:`None`)
Raises:
Exception: If the edge type creation fails after multiple retry
attempts, an exception is raised with the error message.
"""
create_edge_stmt = f'CREATE EDGE IF NOT EXISTS {edge_type}()'
create_edge_stmt = f"CREATE EDGE IF NOT EXISTS {edge_type} ()"
if time_label is not None:
time_label = self._validate_time_label(time_label)
create_edge_stmt = f"""CREATE EDGE IF NOT EXISTS {edge_type}
(time_label DATETIME DEFAULT {time_label})"""

for attempt in range(MAX_RETRIES):
res = self.query(create_edge_stmt)
if res.is_succeeded():
return # Tag creation succeeded, exit the method
return # Edge type creation succeeded

if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_DELAY)
else:
# Final attempt failed, raise an exception
raise Exception(
f"Failed to create tag `{edge_type}` after "
f"Failed to create edge type `{edge_type}` after "
f"{MAX_RETRIES} attempts: {res.error_msg()}"
)

def ensure_tag_exists(self, tag_name: str) -> None:
def ensure_tag_exists(
self, tag_name: str, time_label: Optional[str] = None
) -> None:
r"""Ensures a tag is created in the NebulaGraph database. If the tag
already exists, it does nothing.
Args:
tag_name (str): The name of the tag to be created.
time_label (str, optional): A specific timestamp to set as the
default value for the time label property. If not provided,
no timestamp will be added. (default: :obj:`None`)
Raises:
Exception: If the tag creation fails after retries, an exception
is raised with the error message.
"""

create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name}()'
create_tag_stmt = f"CREATE TAG IF NOT EXISTS {tag_name} ()"
if time_label is not None:
time_label = self._validate_time_label(time_label)
create_tag_stmt = f"""CREATE TAG IF NOT EXISTS {tag_name}
(time_label DATETIME DEFAULT {time_label})"""

for attempt in range(MAX_RETRIES):
res = self.query(create_tag_stmt)
Expand All @@ -262,27 +278,39 @@ def add_node(
self,
node_id: str,
tag_name: str,
time_label: Optional[str] = None,
) -> None:
r"""Add a node with the specified tag and properties.
Args:
node_id (str): The ID of the node.
tag_name (str): The tag name of the node.
time_label (str, optional): A specific timestamp to set for
the node's time label property. If not provided, no timestamp
will be added. (default: :obj:`None`)
"""
node_id = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', node_id)
tag_name = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', tag_name)

self.ensure_tag_exists(tag_name)
self.ensure_tag_exists(tag_name, time_label)

# Insert node without properties
insert_stmt = (
f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES "{node_id}":()'
)
# Insert node with or without time_label property
if time_label is not None:
time_label = self._validate_time_label(time_label)
insert_stmt = (
f'INSERT VERTEX IF NOT EXISTS {tag_name}(time_label) VALUES '
f'"{node_id}":("{time_label}")'
)
else:
insert_stmt = (
f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES '
f'"{node_id}":()'
)

for attempt in range(MAX_RETRIES):
res = self.query(insert_stmt)
if res.is_succeeded():
return # Tag creation succeeded, exit the method
return # Node creation succeeded, exit the method

if attempt < MAX_RETRIES - 1:
time.sleep(RETRY_DELAY)
Expand Down Expand Up @@ -348,7 +376,7 @@ def refresh_schema(self) -> None:
@property
def get_structured_schema(self) -> Dict[str, Any]:
r"""Generates a structured schema consisting of node and relationship
properties, relationships, and metadata.
properties, relationships, and metadata, including timestamps.
Returns:
Dict[str, Any]: A dictionary representing the structured schema.
Expand Down Expand Up @@ -419,6 +447,7 @@ def add_triplet(
subj: str,
obj: str,
rel: str,
time_label: Optional[str] = None,
) -> None:
r"""Adds a relationship (triplet) between two entities in the Nebula
Graph database.
Expand All @@ -427,28 +456,44 @@ def add_triplet(
subj (str): The identifier for the subject entity.
obj (str): The identifier for the object entity.
rel (str): The relationship between the subject and object.
time_label (str, optional): A specific timestamp to set for the
time label property of the relationship. If not provided,
no timestamp will be added. (default: :obj:`None`)
Raises:
ValueError: If the time_label format is invalid.
Exception: If creating the relationship fails.
"""
subj = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', subj)
obj = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', obj)
rel = re.sub(r'[^a-zA-Z0-9\u4e00-\u9fa5]', '', rel)

self.ensure_tag_exists(subj)
self.ensure_tag_exists(obj)
self.ensure_edge_type_exists(rel)
self.ensure_edge_type_exists(rel, time_label)
self.add_node(node_id=subj, tag_name=subj)
self.add_node(node_id=obj, tag_name=obj)

# Avoid latenicy
# Avoid latency
time.sleep(1)

insert_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":();'
)
# Create edge with or without time_label property
if time_label is not None:
time_label = self._validate_time_label(time_label)
insert_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}(time_label) VALUES '
f'"{subj}"->"{obj}":("{time_label}")'
)
else:
insert_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}() VALUES '
f'"{subj}"->"{obj}":()'
)

res = self.query(insert_stmt)
if not res.is_succeeded():
raise Exception(
f'create relationship `]{subj}` -> `{obj}`'
f'create relationship `{subj}` -> `{obj}`'
+ f'failed: {res.error_msg()}'
)

Expand Down Expand Up @@ -568,3 +613,27 @@ def get_relationship_properties(
)

return rel_schema_props, rel_structure_props

def _validate_time_label(self, time_label: str) -> str:
r"""Validates the format of a time label string.
Args:
time_label (str): The time label string to validate.
Should be in format 'YYYY-MM-DDThh:mm:ss'.
Returns:
str: The validated time label.
Raises:
ValueError: If the time label format is invalid.
"""
try:
# Check if the format matches YYYY-MM-DDThh:mm:ss
pattern = r'^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}$'
if not re.match(pattern, time_label):
raise ValueError(
"Time label must be in format 'YYYY-MM-DDThh:mm:ss'"
)
return time_label
except Exception as e:
raise ValueError(f"Invalid time label format: {e!s}")
95 changes: 76 additions & 19 deletions test/storages/graph_storages/test_nebula_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# limitations under the License.
# ========= Copyright 2023-2024 @ CAMEL-AI.org. All Rights Reserved. =========
import unittest
from unittest.mock import Mock, patch
from unittest.mock import Mock, call, patch

from unstructured.documents.elements import Element

Expand Down Expand Up @@ -106,7 +106,7 @@ def test_add_node(self):

self.graph.add_node(node_id, tag_name)

self.graph.ensure_tag_exists.assert_called_with(tag_name)
self.graph.ensure_tag_exists.assert_has_calls([call(tag_name, None)])
insert_stmt = (
f'INSERT VERTEX IF NOT EXISTS {tag_name}() VALUES "{node_id}":()'
)
Expand All @@ -121,7 +121,7 @@ def test_ensure_tag_exists_success(self):

self.graph.ensure_tag_exists(tag_name)

create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name}()'
create_tag_stmt = f'CREATE TAG IF NOT EXISTS {tag_name} ()'
self.graph.query.assert_called_with(create_tag_stmt)

@patch('time.sleep', return_value=None)
Expand Down Expand Up @@ -153,13 +153,12 @@ def test_add_triplet(self):

self.graph.add_triplet(subj, obj, rel)

self.graph.ensure_tag_exists.assert_any_call(subj)
self.graph.ensure_tag_exists.assert_any_call(obj)
self.graph.ensure_edge_type_exists.assert_called_with(rel)
self.graph.ensure_tag_exists.assert_has_calls([call(subj), call(obj)])
self.graph.ensure_edge_type_exists.assert_has_calls([call(rel, None)])
self.graph.add_node.assert_any_call(node_id=subj, tag_name=subj)
self.graph.add_node.assert_any_call(node_id=obj, tag_name=obj)
insert_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":();'
f'INSERT EDGE IF NOT EXISTS {rel}() VALUES "{subj}"->"{obj}":()'
)
self.graph.query.assert_called_with(insert_stmt)

Expand Down Expand Up @@ -401,24 +400,24 @@ def test_get_schema(self):
def test_get_structured_schema(self):
self.graph.get_node_properties = Mock(
return_value=(
['Node.prop'],
[{'labels': 'Node', 'properties': ['prop']}],
['Person.name', 'Person.age'],
[{'labels': 'Person', 'properties': ['name', 'age']}],
)
)
self.graph.get_relationship_properties = Mock(
return_value=(
['Rel.prop'],
[{'type': 'Rel', 'properties': ['prop']}],
['KNOWS.since'],
[{'type': 'KNOWS', 'properties': ['since']}],
)
)
self.graph.get_relationship_types = Mock(return_value=['RELATES_TO'])
self.graph.get_indexes = Mock(return_value=['index1'])
self.graph.get_relationship_types = Mock(return_value=['KNOWS'])
self.graph.get_indexes = Mock(return_value=[])
structured_schema = self.graph.get_structured_schema
expected_schema = {
"node_props": {'Node': ['prop']},
"rel_props": {'Rel': ['prop']},
"relationships": ['RELATES_TO'],
"metadata": {"index": ['index1']},
"node_props": {"Person": ["name", "age"]},
"rel_props": {"KNOWS": ["since"]},
"relationships": ["KNOWS"],
"metadata": {"index": []},
}
self.assertEqual(structured_schema, expected_schema)

Expand Down Expand Up @@ -465,6 +464,64 @@ def test_add_graph_elements(self):
'node1', 'node2', 'RELATES_TO'
)

def test_validate_time_label_valid(self):
valid_time = "2024-12-31T21:45:22"
result = self.graph._validate_time_label(valid_time)
self.assertEqual(result, valid_time)

def test_validate_time_label_none(self):
with self.assertRaises(ValueError):
self.graph._validate_time_label(None)

def test_add_triplet_with_time_label(self):
subj = 'node1'
obj = 'node2'
rel = 'RELATESTO'
time_label = '2024-12-31T21:45:22'

self.graph.ensure_tag_exists = Mock()
self.graph.ensure_edge_type_exists = Mock()
self.graph.add_node = Mock()
mock_result = Mock()
mock_result.is_succeeded.return_value = True
self.graph.query = Mock(return_value=mock_result)

self.graph.add_triplet(subj, obj, rel, time_label)

self.graph.ensure_tag_exists.assert_has_calls(
[call('node1'), call('node2')]
)
self.graph.ensure_edge_type_exists.assert_called_with(rel, time_label)
self.graph.add_node.assert_any_call(node_id=subj, tag_name=subj)
self.graph.add_node.assert_any_call(node_id=obj, tag_name=obj)

expected_stmt = (
f'INSERT EDGE IF NOT EXISTS {rel}(time_label) VALUES '
f'"{subj}"->"{obj}":("{time_label}")'
)
self.graph.query.assert_called_with(expected_stmt)

def test_add_triplet_with_invalid_time_label(self):
subj = 'node1'
obj = 'node2'
rel = 'RELATESTO'
invalid_time = '2024/12/31 21:45:22' # wrong format

with self.assertRaises(ValueError) as context:
self.graph.add_triplet(subj, obj, rel, invalid_time)

self.assertIn("Invalid time label format", str(context.exception))

def test_ensure_tag_exists_with_time_label(self):
tag_name = 'Tag1'
time_label = '2024-12-31T21:45:22'

mock_result = Mock()
mock_result.is_succeeded.return_value = True
self.graph.query = Mock(return_value=mock_result)

self.graph.ensure_tag_exists(tag_name, time_label)

if __name__ == '__main__':
unittest.main()
expected_stmt = f"""CREATE TAG IF NOT EXISTS {tag_name}
(time_label DATETIME DEFAULT {time_label})"""
self.graph.query.assert_called_with(expected_stmt)

0 comments on commit fdc727b

Please sign in to comment.