
Commit fcacf55: Batch inserts (#56)

* feat: add batch mode for improved bulk insert performance. Batch mode defers flush() operations during bulk inserts, significantly improving performance when inserting large graphs.
* docs: update README, add benchmarks and roadmap, bump to v3.1.0
* ci: add GitHub Actions workflow for PR tests
* fix: use deterministic keys in test_indexer to avoid flaky hash collisions
* fix: use deterministic keys in test_index to avoid flaky hash collisions

1 parent: 19f79c4

File tree: 11 files changed, +832 −10 lines

.github/workflows/python-tests.yml (38 additions, 0 deletions)

```yaml
# This workflow runs Python tests on pull requests and pushes
name: Python Tests

on:
  push:
    branches: [ master, main ]
  pull_request:
    branches: [ master, main ]

jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ['3.9', '3.10', '3.11', '3.12']

    steps:
    - uses: actions/checkout@v4

    - name: Set up Python ${{ matrix.python-version }}
      uses: actions/setup-python@v5
      with:
        python-version: ${{ matrix.python-version }}

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install xxhash==3.2.0
        pip install pytest

    - name: Run tests
      run: |
        python -m pytest test/ -v --ignore=test/bench.py --ignore=test/benchmark.py

    - name: Run quick benchmark (optional)
      run: |
        python test/benchmark.py --quick --skip-individual
      continue-on-error: true
```

PERFORMANCE_ROADMAP.md (76 additions, 0 deletions)

````markdown
# CogDB Performance Roadmap

> Weekly release cadence - tracking performance issues and optimizations

## 🔴 High Priority

### Star Graph / High-Degree Vertex Performance Degradation
**Discovered:** 2024-12-10 via benchmark
**Issue:** When inserting many edges from/to the same vertex, performance degrades severely.

| Edges | Speed | Degradation |
|-------|-------|-------------|
| 100 | 569 edges/s | baseline |
| 500 | 633 edges/s | - |
| 1,000 | 338 edges/s | 47% slower |
| 5,000 | 83 edges/s | **87% slower** |

**Root Cause:** `put_set()` in `database.py` traverses linked lists to check for duplicates. This is O(n) per insert, making high-degree vertices O(n²) overall.

**Location:** `database.py:241-277` (put_set method)

**Potential Fix:**
1. Use a hash-based set for duplicate checking instead of linked-list traversal
2. Consider a Bloom filter for faster "definitely not present" checks
3. Or maintain an in-memory index of vertex adjacencies

---

## 🟡 Medium Priority

### Unbounded Cache Growth
**Issue:** Cache in `cache.py` grows unboundedly - no eviction policy.
**Fix:** Implement an LRU cache with `collections.OrderedDict`
**Effort:** Low

### Redundant Table Switches in put_node
**Issue:** `put_node()` calls `use_table()` 5 times per edge insert.
**Fix:** Cache table references within the method
**Effort:** Low

---

## 🟢 Low Priority / Nice to Have

### Efficient Serialization
**Issue:** `Record.marshal()` uses string concatenation with `+`
**Fix:** Use a bytearray for efficient concatenation
**Effort:** Low, ~5% improvement

### Configurable Auto-Flush
**Issue:** Currently binary (batch mode on/off)
**Fix:** Add a config option for flush frequency (every N records)
**Effort:** Low

---

## ✅ Completed

### v3.1.0 (2024-12-10)
- [x] Batch flush mode - defer flush() during bulk inserts
- [x] `Graph.put_batch()` method for efficient bulk loading
- [x] Comprehensive benchmark suite (`test/benchmark.py`)
- [x] ~1.6x speedup for large batch inserts

---

## Benchmark Baselines (v3.1.0)

```
Chain graph (batch, 5000 edges): 4,377 edges/s
Social graph (12,492 edges):     3,233 edges/s
Dense graph (985 edges):         2,585 edges/s
Read performance:                20,000+ ops/s
```

Run benchmarks: `python3 test/benchmark.py`
````
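The roadmap's medium-priority cache fix can be sketched with `collections.OrderedDict`. This is a hypothetical illustration of the technique, not CogDB's actual cache interface; the real class in `cache.py` may expose different method names:

```python
from collections import OrderedDict

class LRUCache:
    """Bounded cache with least-recently-used eviction.

    Illustrative sketch of the roadmap item for cache.py,
    not the actual CogDB cache implementation.
    """

    def __init__(self, capacity=1024):
        self.capacity = capacity
        self._data = OrderedDict()

    def get(self, key, default=None):
        if key not in self._data:
            return default
        self._data.move_to_end(key)  # mark as most recently used
        return self._data[key]

    def put(self, key, value):
        if key in self._data:
            self._data.move_to_end(key)
        self._data[key] = value
        if len(self._data) > self.capacity:
            self._data.popitem(last=False)  # evict least recently used
```

Calling `move_to_end()` on every hit keeps insertion order equal to recency order, so eviction is a single `popitem(last=False)`.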

README.md (36 additions, 2 deletions)

````diff
@@ -5,7 +5,10 @@
 # CogDB - Micro Graph Database for Python Applications
 > Documents and examples at [cogdb.io](https://cogdb.io)
 
-> New release: 3.0.5
+> New release: 3.1.0
+> - **Batch insert mode** for significantly faster bulk graph loading
+> - New `put_batch()` method for efficient triple insertion
+> - Performance improvements: up to 1.6x faster inserts at scale
 > - New word embeddings API
 > - Similarity filtering using word embeddings
 > - Filter step
@@ -45,6 +48,22 @@ g.put("alice","score","7")
 g.put("dani","score","100")
 ```
 
+#### Using `put_batch` for bulk inserts (faster)
+
+```python
+from cog.torque import Graph
+g = Graph("people")
+
+# Insert multiple triples at once - significantly faster for large graphs
+g.put_batch([
+    ("alice", "follows", "bob"),
+    ("bob", "follows", "charlie"),
+    ("charlie", "follows", "alice"),
+    ("alice", "likes", "pizza"),
+    ("bob", "likes", "tacos"),
+])
+```
+
 #### Drop Edge ###
 
 ```python
@@ -292,4 +311,19 @@ for r in scanner:
 
 ## Benchmark
 
-# ![Put Perf](notes/bench.png)
+![Put Perf](notes/bench.png)
+
+### Performance Results
+
+Run benchmarks with: `python3 test/benchmark.py`
+
+| Graph Type | Edges | Speed (edges/s) |
+|------------|-------|----------------|
+| Chain (batch) | 5,000 | 4,377 |
+| Social network | 12,492 | 3,233 |
+| Dense graph | 985 | 2,585 |
+| Chain (individual) | 5,000 | 2,712 |
+
+**Batch vs Individual Insert:**
+- 1.6x faster at 5,000 edges
+- Read performance: 20,000+ ops/second
````

cog/core.py (21 additions, 2 deletions)

```diff
@@ -367,6 +367,7 @@ class Store:
 
     def __init__(self, tablemeta, config, logger, caching_enabled=True, shared_cache=None):
         self.caching_enabled = caching_enabled
+        self.batch_mode = False  # When True, defers flush() until end_batch()
         self.logger = logging.getLogger('store')
         self.tablemeta = tablemeta
         self.config = config
@@ -380,8 +381,24 @@ def __init__(self, tablemeta, config, logger, caching_enabled=True, shared_cache
         logger.info("Store for file init: " + self.store)
 
     def close(self):
+        if self.batch_mode:
+            self.store_file.flush()  # Ensure pending writes are flushed on close
         self.store_file.close()
 
+    def begin_batch(self):
+        """
+        Enable batch mode - defers flush() until end_batch() is called.
+        Use this when inserting many records for significantly better performance.
+        """
+        self.batch_mode = True
+
+    def end_batch(self):
+        """
+        End batch mode and flush all pending writes to disk.
+        """
+        self.store_file.flush()
+        self.batch_mode = False
+
     def save(self, record):
         """
         Store data
@@ -391,7 +408,8 @@ def save(self, record):
         record.set_store_position(store_position)
         marshalled_record = record.marshal()
         self.store_file.write(marshalled_record)
-        self.store_file.flush()
+        if not self.batch_mode:
+            self.store_file.flush()
         if self.caching_enabled:
             self.store_cache.put(store_position, marshalled_record)
         return store_position
@@ -408,7 +426,8 @@ def update_record_link_inplace(self, start_pos, int_value):
 
         if self.caching_enabled:
             self.store_cache.partial_update_from_zero_index(start_pos, byte_value)
-        self.store_file.flush()
+        if not self.batch_mode:
+            self.store_file.flush()
 
     # @profile
     def read(self, position):
```
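The core change above is a deferred-flush pattern: the per-write `flush()` is skipped while `batch_mode` is set, and a single flush happens at `end_batch()`. A minimal standalone sketch of the pattern; the `FileStore` class and its `flush_count` counter are illustrative stand-ins, not CogDB code:

```python
import io

class FileStore:
    """Toy writer demonstrating deferred flushing, as in cog.core.Store."""

    def __init__(self, fileobj):
        self.store_file = fileobj
        self.batch_mode = False
        self.flush_count = 0  # instrumentation for illustration only

    def save(self, payload: bytes) -> int:
        pos = self.store_file.tell()
        self.store_file.write(payload)
        if not self.batch_mode:  # batch mode defers the flush
            self.store_file.flush()
            self.flush_count += 1
        return pos

    def begin_batch(self):
        self.batch_mode = True

    def end_batch(self):
        # One flush covers every write made since begin_batch()
        self.store_file.flush()
        self.flush_count += 1
        self.batch_mode = False
```

The speedup comes from collapsing N flush syscalls into one; the trade-off is that a crash mid-batch can lose unflushed writes, which is why `close()` in the diff flushes when batch mode is still active.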

cog/database.py (19 additions, 0 deletions)

```diff
@@ -182,6 +182,25 @@ def print_cache_info(self):
         print("::: cache info ::: {}, {}, {}".format(self.current_namespace, self.current_table.table_meta.name,
                                                      str(self.current_table.store.store_cache.size_list())))
 
+    def begin_batch(self):
+        """
+        Enable batch mode on all tables in the current namespace.
+        Defers flush() until end_batch() is called for better bulk insert performance.
+        """
+        if self.current_namespace in self.namespaces and self.namespaces[self.current_namespace]:
+            for table in self.namespaces[self.current_namespace].values():
+                if table:
+                    table.store.begin_batch()
+
+    def end_batch(self):
+        """
+        End batch mode and flush all pending writes to disk.
+        """
+        if self.current_namespace in self.namespaces and self.namespaces[self.current_namespace]:
+            for table in self.namespaces[self.current_namespace].values():
+                if table:
+                    table.store.end_batch()
+
     def close(self):
         for name, space in self.namespaces.items():
             if space is None:
```

cog/torque.py (26 additions, 0 deletions)

```diff
@@ -221,6 +221,32 @@ def put(self, vertex1, predicate, vertex2, update=False, create_new_edge=False):
         self.all_predicates = self.cog.list_tables()
         return self
 
+    def put_batch(self, triples):
+        """
+        Insert multiple triples efficiently using batch mode.
+        Significantly faster than calling put() in a loop for large datasets.
+
+        :param triples: List of (vertex1, predicate, vertex2) tuples
+        :return: self for method chaining
+
+        Example:
+            g.put_batch([
+                ("alice", "follows", "bob"),
+                ("bob", "follows", "charlie"),
+                ("charlie", "follows", "alice")
+            ])
+        """
+        self.cog.use_namespace(self.graph_name)
+        self.cog.begin_batch()
+        try:
+            for v1, pred, v2 in triples:
+                self.cog.use_table(pred)
+                self.cog.put_node(v1, pred, v2)
+        finally:
+            self.cog.end_batch()
+        self.all_predicates = self.cog.list_tables()
+        return self
+
     def drop(self, vertex1, predicate, vertex2):
         """
         Drops edge between vertex1 and vertex2 for the given predicate.
```
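The `try/finally` in `put_batch` guarantees that batch mode is exited and pending writes are flushed even when a malformed triple raises mid-loop. A toy sketch of that guarantee; the `BatchTable` class here is hypothetical and not part of CogDB:

```python
class BatchTable:
    """Toy table illustrating the try/finally guarantee in put_batch."""

    def __init__(self):
        self.batch_mode = False
        self.rows = []
        self.flushed = 0  # number of rows made durable, for illustration

    def begin_batch(self):
        self.batch_mode = True

    def end_batch(self):
        self.flushed = len(self.rows)  # stand-in for flushing to disk
        self.batch_mode = False

    def put_batch(self, triples):
        self.begin_batch()
        try:
            for v1, pred, v2 in triples:  # raises on malformed tuples
                self.rows.append((v1, pred, v2))
        finally:
            self.end_batch()  # always restores per-write flushing
```

Without the `finally`, an exception would leave `batch_mode` set, silently disabling per-write flushing for every later insert on the same store.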

setup.py (1 addition, 1 deletion)

```diff
@@ -2,7 +2,7 @@
 
 
 setup(name='cogdb',
-      version='3.0.9',
+      version='3.1.0',
       description='Persistent Embedded Graph Database',
       url='http://github.com/arun1729/cog',
       author='Arun Mahendra',
```
