Skip to content

Commit

Permalink
[ issue #10 ] Integration test framework improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
agazzarini committed Aug 31, 2014
1 parent 04c2836 commit 7ff0cb7
Show file tree
Hide file tree
Showing 32 changed files with 399 additions and 304 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* @since 1.0
*/
public class CassandraClientShutdownHook implements ClientShutdownHook {
private final static Log LOGGER = new Log(LoggerFactory.getLogger(ClientShutdownHook.class));
private static final Log LOGGER = new Log(LoggerFactory.getLogger(ClientShutdownHook.class));
private final Session session;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.gazzax.labs.jena.nosql.fwk.configuration.Configuration;
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.dictionary.node.TransientNodeDictionary;
import org.gazzax.labs.jena.nosql.fwk.ds.MapDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.GraphDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.MapDAO;
import org.gazzax.labs.jena.nosql.fwk.factory.ClientShutdownHook;
import org.gazzax.labs.jena.nosql.fwk.factory.StorageLayerFactory;

Expand Down Expand Up @@ -51,7 +51,8 @@ public class CassandraStorageLayerFactory extends StorageLayerFactory {
private Session session;
private Cluster cluster;
private TopLevelDictionary dictionary;

private int deletionBatchSize;

@Override
public <K, V> MapDAO<K, V> getMapDAO(
final Class<K> keyClass,
Expand All @@ -77,12 +78,12 @@ public <K, V> MapDAO<K, V> getMapDAO(

@Override
public GraphDAO<byte[][], byte[][]> getGraphDAO(final Node name) {
return new CassandraTripleIndexDAO(session, dictionary);
return new CassandraTripleIndexDAO(session, deletionBatchSize);
}

@Override
public GraphDAO<byte[][], byte[][]> getGraphDAO() {
return new CassandraTripleIndexDAO(session, dictionary);
return new CassandraTripleIndexDAO(session, deletionBatchSize);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
import java.util.List;

import org.gazzax.labs.jena.nosql.fwk.StorageLayerException;
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.ds.GraphDAO;

import com.datastax.driver.core.BatchStatement;
Expand Down Expand Up @@ -56,17 +55,20 @@ protected BatchStatement initialValue() {

private PreparedStatement[] queries;

protected final TopLevelDictionary dictionary;
private int deletionBatchSize;

/**
* Buils a new {@link CassandraTripleIndexDAO} with the given data.
*
* @param deletionBatchSize the batch size used in deletions.
* @param session The connection to Cassandra.
* @param dictionary the dictionary currently used.
*/
public CassandraTripleIndexDAO(final Session session, final TopLevelDictionary dictionary) {
public CassandraTripleIndexDAO(
final Session session,
final int deletionBatchSize) {
this.session = session;
this.dictionary = dictionary;
this.deletionBatchSize = deletionBatchSize;

prepareStatements();
}

Expand Down Expand Up @@ -124,14 +126,12 @@ public void deleteTriple(final byte[][] ids) throws StorageLayerException {
}

@Override
public List<byte[][]> deleteTriples(
final Iterator<byte[][]> nodes,
final int batchSize) throws StorageLayerException {
public List<byte[][]> deleteTriples(final Iterator<byte[][]> nodes) throws StorageLayerException {

final List<byte[][]> deleted = new ArrayList<byte[][]>(batchSize);
final List<byte[][]> deleted = new ArrayList<byte[][]>(deletionBatchSize);

while (nodes.hasNext()) {
for (int i = 0; i < batchSize && nodes.hasNext(); i++) {
for (int i = 0; i < deletionBatchSize && nodes.hasNext(); i++) {

byte[][] ids = nodes.next();
if (ids == null || ids.length < 3) {
Expand Down Expand Up @@ -176,11 +176,17 @@ public Iterator<byte[][]> query(final byte[][] query) throws StorageLayerExcepti
return new AbstractIterator<byte[][]>() {
@Override
protected byte[][] computeNext() {
return iterator.hasNext()? asByteArray(iterator.next()) : endOfData();
return iterator.hasNext() ? asByteArray(iterator.next()) : endOfData();
}
};
}

/**
* Transforms the given row in a byte array containing term identifiers.
*
* @param row the row.
* @return a byte array containing term identifiers.
*/
private byte[][] asByteArray(final Row row) {
final byte[] s = Bytes.getArray(row.getBytesUnsafe(0));
final byte[] p = Bytes.getArray(row.getBytesUnsafe(1));
Expand Down Expand Up @@ -272,4 +278,10 @@ void internalDelete(final byte [][]ids) throws StorageLayerException {

batchStatements.get().add(ospcStatement);
}

@Override
public long countTriples() throws StorageLayerException {
// TODO Auto-generated method stub
return 0;
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,15 @@
package org.gazzax.labs.jena.nosql.solr;

/**
* Enumerative interface for field names used in SOLR schema.
*
* @author Andrea Gazzarini
* @since 1.0
*/
public interface Field {
String ID = "id";
String S = "s";
String P = "p";
String O = "o";
String C = "c";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import java.util.Iterator;

import org.gazzax.labs.jena.nosql.fwk.StorageLayerException;
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.factory.StorageLayerFactory;

Expand All @@ -24,19 +23,19 @@ public void close() {
}

@Override
public byte[] getID(Node node, boolean p) throws StorageLayerException {
public byte[] getID(final Node node, final boolean p) {
// Nothing to be done here...
return null;
}

@Override
public Node getValue(byte[] id, boolean p) throws StorageLayerException {
public Node getValue(final byte[] id, final boolean p) {
// Nothing to be done here...
return null;
}

@Override
public void removeValue(Node value, boolean p) throws StorageLayerException {
public void removeValue(final Node value, final boolean p) {
// Nothing to be done here...
}

Expand Down Expand Up @@ -134,4 +133,4 @@ public byte[][] decompose(final byte[] compositeId) {
// Nothing to be done here...
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* @since 1.0
*/
public class SolrClientShutdownHook implements ClientShutdownHook {
private final static Log LOGGER = new Log(LoggerFactory.getLogger(ClientShutdownHook.class));
private static final Log LOGGER = new Log(LoggerFactory.getLogger(ClientShutdownHook.class));
private final SolrServer connection;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.gazzax.labs.jena.nosql.fwk.configuration.Configuration;
import org.gazzax.labs.jena.nosql.fwk.dictionary.TopLevelDictionary;
import org.gazzax.labs.jena.nosql.fwk.ds.MapDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.GraphDAO;
import org.gazzax.labs.jena.nosql.fwk.ds.MapDAO;
import org.gazzax.labs.jena.nosql.fwk.factory.ClientShutdownHook;
import org.gazzax.labs.jena.nosql.fwk.factory.StorageLayerFactory;
import org.gazzax.labs.jena.nosql.solr.dao.SolrGraphDAO;
Expand All @@ -26,12 +26,17 @@
*/
public class SolrStorageLayerFactory extends StorageLayerFactory {
private final TopLevelDictionary dictionary = new NoOpDictionary();

private SolrServer solr;

private int addCommitWithinMsecs;
private int deleteCommitWithinMsecs;

@Override
public void accept(final Configuration<Map<String, Object>> configuration) {
deletionBatchSize = configuration.getParameter("delete-batch-size", Integer.valueOf(1000));

addCommitWithinMsecs = configuration.getParameter("add-commit-within-msecs", Integer.valueOf(1));
deleteCommitWithinMsecs = configuration.getParameter("delete-commit-within-msecs", Integer.valueOf(1));

final String address = configuration.getParameter("solr-address", "http://127.0.0.1:8080/solr/store");
try {
solr = (SolrServer) Class.forName(configuration.getParameter("solr-server-class", HttpSolrServer.class.getName()))
Expand All @@ -54,22 +59,22 @@ public <K, V> MapDAO<K, V> getMapDAO(

@Override
public Graph getGraph() {
return new SolrGraph(this, deletionBatchSize);
return new SolrGraph(this);
}

@Override
public Graph getGraph(Node graphNode) {
return new SolrGraph(graphNode, this, deletionBatchSize);
public Graph getGraph(final Node graphNode) {
return new SolrGraph(graphNode, this);
}

@Override
public GraphDAO<Triple, TripleMatch> getGraphDAO(final Node name) {
return new SolrGraphDAO(solr, name);
return new SolrGraphDAO(solr, name, addCommitWithinMsecs, deleteCommitWithinMsecs);
}

@Override
public GraphDAO<Triple, TripleMatch> getGraphDAO() {
return new SolrGraphDAO(solr);
return new SolrGraphDAO(solr, addCommitWithinMsecs, deleteCommitWithinMsecs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ public class SolrDeepPagingIterator extends UnmodifiableIterator<Triple> {
public boolean hasNext() {
try {
final QueryResponse response = solr.query(query);
sentCursorMark = query.get(CursorMarkParams.CURSOR_MARK_PARAM);
nextCursorMark = response.getNextCursorMark();
page = response.getResults();
return page.getNumFound() > 0;
return !page.isEmpty();
} catch (final Exception exception) {
throw new RuntimeException(exception);
}
Expand All @@ -69,7 +70,7 @@ public boolean hasNext() {
if (iterator().hasNext()) {
return true;
} else {
currentState = checkForIterationCompleteness;
currentState = checkForConsumptionCompleteness;
return currentState.hasNext();
}
}
Expand All @@ -95,16 +96,20 @@ Iterator<SolrDocument> iterator() {
/**
* Iteration state: once a page has been consumed we need to determine if another query should be issued or not.
*/
private final Iterator<Triple> checkForIterationCompleteness = new UnmodifiableIterator<Triple>() {
private final Iterator<Triple> checkForConsumptionCompleteness = new UnmodifiableIterator<Triple>() {
@Override
public boolean hasNext() {
return !(page.size() < query.getRows() || sentCursorMark.equals(nextCursorMark));
final boolean hasNext = (page.size() == query.getRows() && !sentCursorMark.equals(nextCursorMark));
if (hasNext) {
query.set(CursorMarkParams.CURSOR_MARK_PARAM, nextCursorMark);
currentState = executeQuery;
return currentState.hasNext();
}
return false;
}

@Override
public Triple next() {
query.set(CursorMarkParams.CURSOR_MARK_PARAM, nextCursorMark);
currentState = executeQuery;
return currentState.next();
}
};
Expand Down
Loading

0 comments on commit 7ff0cb7

Please sign in to comment.