Skip to content

Commit

Permalink
Allow opening a RW transaction inside a RO one.
Browse files Browse the repository at this point in the history
  • Loading branch information
scossu committed Mar 28, 2019
1 parent 4abca97 commit 11b18a0
Show file tree
Hide file tree
Showing 6 changed files with 159 additions and 42 deletions.
46 changes: 41 additions & 5 deletions lakesuperior/store/base_lmdb_store.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -344,22 +344,46 @@ cdef class BaseLmdbStore:
"""
Transaction context manager.
Open and close a transaction for the duration of the functions in the
context. If a transaction has already been opened in the store, a new
one is opened only if the current transaction is read-only and the new
requested transaction is read-write.
If a new write transaction is opened, the old one is kept on hold until
the new transaction is closed, then restored. All cursors are
invalidated and must be restored as well if one needs to reuse them.
:param bool write: Whether a write transaction is to be opened.
:rtype: lmdb.Transaction
"""
cdef lmdb.MDB_txn* hold_txn

will_open = False

if not self.is_open:
raise LmdbError('Store is not open.')

# If another transaction is open, only open the new transaction if
# the current one is RO and the new one RW.
if self.is_txn_open:
logger.debug(
'Transaction is already active. Not opening another one.')
#logger.debug('before yield')
yield
#logger.debug('after yield')
if write:
will_open = not self.is_txn_rw
else:
will_open = True

# If a new transaction needs to be opened and replace the old one,
# the old one must be put on hold and swapped out when the new txn
# is closed.
if will_open:
will_reset = self.is_txn_open

if will_open:
#logger.debug('Beginning {} transaction.'.format(
# 'RW' if write else 'RO'))
if will_reset:
hold_txn = self.txn

try:
self._txn_begin(write=write)
self.is_txn_rw = write
Expand All @@ -368,9 +392,21 @@ cdef class BaseLmdbStore:
#logger.debug('In txn_ctx, after yield')
self._txn_commit()
#logger.debug('after _txn_commit')
if will_reset:
lmdb.mdb_txn_reset(hold_txn)
self.txn = hold_txn
_check(lmdb.mdb_txn_renew(self.txn))
self.is_txn_rw = False
except:
self._txn_abort()
raise
else:
logger.info(
'Transaction is already active. Not opening another one.'
)
#logger.debug('before yield')
yield
#logger.debug('after yield')


def begin(self, write=False):
Expand Down
17 changes: 0 additions & 17 deletions lakesuperior/store/ldp_rs/lmdb_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,20 +199,3 @@ def remove_graph(self, graph):


## PRIVATE METHODS ##

def _normalize_context(self, context):
"""
Normalize a context parameter to conform to the model expectations.
:param context: Context URI or graph.
:type context: URIRef or Graph or None
"""
if isinstance(context, Graph):
if context == self or isinstance(context.identifier, Variable):
context = None
else:
context = context.identifier
elif isinstance(context, str):
context = URIRef(context)

return context
23 changes: 21 additions & 2 deletions lakesuperior/store/ldp_rs/lmdb_triplestore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -288,8 +288,7 @@ cdef class LmdbTriplestore(BaseLmdbStore):
Buffer _sc
Key ck

if isinstance(c, rdflib.Graph):
c = c.identifier
c = self._normalize_context(c)

ck = self.to_key(c)
if not self._key_exists(<unsigned char*>&ck, KLEN, b'c:'):
Expand Down Expand Up @@ -1332,3 +1331,23 @@ cdef class LmdbTriplestore(BaseLmdbStore):
flags | lmdb.MDB_APPEND)

return new_idx


def _normalize_context(self, context):
"""
Normalize a context parameter to conform to the model expectations.
:param context: Context URI or graph.
:type context: URIRef or Graph or None
"""
if isinstance(context, rdflib.Graph):
if context == self or isinstance(
context.identifier, rdflib.Variable
):
context = None
else:
context = context.identifier
elif isinstance(context, str):
context = rdflib.URIRef(context)

return context
15 changes: 15 additions & 0 deletions sandbox/NOTES
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
Uses for a graph:

1. Create a graph from RDF input, manipulate or evaluate it, and output it as
serialized RDF (always detached) [NO USE CASE]
2. Create a graph from RDF input, optionally manipulate it with other data from
the store or external RDF and store it (start detached, then convert keys;
or, start attached)
3. Retrieve a graph from the store, optionally manipulate it, and output it as
serialized RDF (start attached, then detach)
4. Retrieve a graph from the store, manipulate it, and put the changed graph
back in the store (always attached)

Initially we might try to render the graph read-only when detached; this
avoids implementing more complex operations such as add, remove and booleans.

10 changes: 10 additions & 0 deletions sandbox/txn_openLogic.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
txn_open write txn_rw Open?
n - - y
y n - n
y y y n
y y n y

txn_open Open Reset?
n y n
y y y

90 changes: 72 additions & 18 deletions tests/1_store/test_lmdb_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from rdflib.graph import DATASET_DEFAULT_GRAPH_ID as RDFLIB_DEFAULT_GRAPH_URI
from rdflib.namespace import RDF, RDFS

from lakesuperior.store.ldp_rs.lmdb_store import LmdbStore
from lakesuperior.model.rdf.graph import Graph
from lakesuperior.store.base_lmdb_store import LmdbError
from lakesuperior.store.ldp_rs.lmdb_store import LmdbStore


@pytest.fixture(scope='class')
Expand Down Expand Up @@ -69,6 +70,12 @@ def test_open_close(self):
assert not path.exists(env_path + '-lock')



@pytest.mark.usefixtures('store')
class TestTransactionContext:
'''
Tests for intializing and shutting down store and transactions.
'''
def test_txn(self, store):
'''
Test opening and closing the main transaction.
Expand Down Expand Up @@ -108,20 +115,80 @@ def test_rollback(self, store):
'''
Test rolling back a transaction.
'''
trp = (
URIRef('urn:nogo:s'), URIRef('urn:nogo:p'), URIRef('urn:nogo:o')
)
try:
with store.txn_ctx(True):
store.add((
URIRef('urn:nogo:s'), URIRef('urn:nogo:p'),
URIRef('urn:nogo:o')))
store.add(trp)
raise RuntimeError() # This should roll back the transaction.
except RuntimeError:
pass

with store.txn_ctx():
res = set(store.triples((None, None, None)))
res = set(store.triples(trp))
assert len(res) == 0


def test_nested_ro_txn(self, store):
"""
Test two nested RO transactions.
"""
trp = (URIRef('urn:s:0'), URIRef('urn:p:0'), URIRef('urn:o:0'))
with store.txn_ctx(True):
store.add(trp)
with store.txn_ctx():
with store.txn_ctx():
res = {*store.triples(trp)}
assert trp in {q[0] for q in res}
assert trp in {q[0] for q in res}


def test_nested_ro_txn_nowrite(self, store):
"""
Test two nested RO transactions.
"""
trp = (URIRef('urn:s:0'), URIRef('urn:p:0'), URIRef('urn:o:0'))
with pytest.raises(LmdbError):
with store.txn_ctx():
with store.txn_ctx():
store.add(trp)


def test_nested_ro_rw_txn(self, store):
"""
Test a RO transaction nested into a RW one.
"""
trp = (URIRef('urn:s:1'), URIRef('urn:p:1'), URIRef('urn:o:1'))
with store.txn_ctx():
with store.txn_ctx(True):
store.add(trp)
# Outer txn should now see the new triple.
assert trp in {q[0] for q in store.triples(trp)}


def test_nested_rw_ro_txn(self, store):
"""
Test that a RO transaction nested in a RW transaction can write.
"""
trp = (URIRef('urn:s:2'), URIRef('urn:p:2'), URIRef('urn:o:2'))
with store.txn_ctx(True):
with store.txn_ctx():
store.add(trp)
assert trp in {q[0] for q in store.triples(trp)}


def test_nested_rw_rw_txn(self, store):
"""
Test that a RW transaction nested in a RW transaction can write.
"""
trp = (URIRef('urn:s:3'), URIRef('urn:p:3'), URIRef('urn:o:3'))
with store.txn_ctx(True):
with store.txn_ctx():
store.add(trp)
assert trp in {q[0] for q in store.triples(trp)}


@pytest.mark.usefixtures('store')
class TestBasicOps:
'''
Expand Down Expand Up @@ -863,19 +930,6 @@ def test_remove_shared_ctx(self, store):
assert len(set(store.triples(trp3))) == 1






@pytest.mark.usefixtures('store')
class TestTransactions:
'''
Tests for transaction handling.
'''
# @TODO Test concurrent reads and writes.
pass


#@pytest.mark.usefixtures('store')
#class TestRdflib:
# '''
Expand Down

0 comments on commit 11b18a0

Please sign in to comment.