Skip to content

Commit

Permalink
Merge pull request #50 from bellofreedom/key_iterator
Browse files Browse the repository at this point in the history
Add key iterator for HaloDB
  • Loading branch information
bellofreedom committed Apr 29, 2020
2 parents 4e617b4 + 550a042 commit 741b4f7
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 0 deletions.
4 changes: 4 additions & 0 deletions src/main/java/com/oath/halodb/HaloDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public HaloDBIterator newIterator() throws HaloDBException {
return new HaloDBIterator(dbInternal);
}

public HaloDBKeyIterator newKeyIterator() {
return new HaloDBKeyIterator(dbInternal);
}

public void pauseCompaction() throws HaloDBException {
try {
dbInternal.pauseCompaction();
Expand Down
116 changes: 116 additions & 0 deletions src/main/java/com/oath/halodb/HaloDBKeyIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package com.oath.halodb;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HaloDBKeyIterator implements Iterator<RecordKey>{
private static final Logger logger = LoggerFactory.getLogger(HaloDBIterator.class);

private Iterator<Integer> outer;
private Iterator<IndexFileEntry> inner;
private HaloDBFile currentFile;

private RecordKey next;

private final HaloDBInternal dbInternal;

HaloDBKeyIterator(HaloDBInternal dbInternal) {
this.dbInternal = dbInternal;
outer = dbInternal.listDataFileIds().iterator();
}

@Override
public boolean hasNext() {
if (next != null) {
return true;
}

try {
// inner == null means this is the first time hasNext() is called.
// use moveToNextFile() to move to the first file.
if (inner == null && !moveToNextFile()) {
return false;
}

do {
if (readNextRecord()) {
return true;
}
} while (moveToNextFile());

return false;

} catch (IOException e) {
logger.error("Error in Iterator", e);
return false;
}
}

@Override
public RecordKey next() {
if (hasNext()) {
RecordKey key = next;
next = null;
return key;
}
throw new NoSuchElementException();
}

private boolean moveToNextFile() throws IOException {
while (outer.hasNext()) {
int fileId = outer.next();
currentFile = dbInternal.getHaloDBFile(fileId);
if (currentFile != null) {
try {
inner = currentFile.getIndexFile().newIterator();
return true;
} catch (ClosedChannelException e) {
if (dbInternal.isClosing()) {
//TODO: define custom Exception classes for HaloDB.
throw new RuntimeException("DB is closing");
}
logger.debug("Index file {} closed, probably by compaction thread. Skipping to next one", fileId);
}
}
logger.debug("Data file {} deleted, probably by compaction thread. Skipping to next one", fileId);
}

return false;
}

private boolean readNextRecord() {
while (inner.hasNext()) {
IndexFileEntry entry = inner.next();
try {
try {
next = readValidRecordKey(entry);
if (next != null) {
return true;
}
} catch (ClosedChannelException e) {
if (dbInternal.isClosing()) {
throw new RuntimeException("DB is closing");
}
logger.debug("Data file {} closed, probably by compaction thread. Skipping to next one", currentFile.getFileId());
break;
}
} catch (IOException e) {
logger.info("Error in iterator", e);
break;
}
}
return false;
}

private RecordKey readValidRecordKey(IndexFileEntry entry) throws IOException {
InMemoryIndexMetaData meta = Utils.getMetaData(entry, currentFile.getFileId());
RecordKey key = null;
if (dbInternal.isRecordFresh(entry.getKey(), meta)) {
key = new RecordKey(entry.getKey());
}
return key;
}
}
31 changes: 31 additions & 0 deletions src/main/java/com/oath/halodb/RecordKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.oath.halodb;

import java.util.*;

public class RecordKey {
final byte[] key;
public RecordKey(byte[] key) {
this.key = key;
}

public byte[] getBytes() {
return key;
}

@Override
public boolean equals(Object obj) {
// to be used in tests as we don't check if the headers are the same.

if (this == obj) {
return true;
}
if (!(obj instanceof RecordKey)) {
return false;
}

RecordKey recordKey = (RecordKey)obj;
return Arrays.equals(this.key, recordKey.getBytes());
}


}
97 changes: 97 additions & 0 deletions src/test/java/com/oath/halodb/HaloDBKeyIteratorTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package com.oath.halodb;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.util.*;
import mockit.Invocation;
import mockit.Mock;
import mockit.MockUp;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.testng.Assert;
import org.testng.annotations.Test;

public class HaloDBKeyIteratorTest extends TestBase {

@Test(expectedExceptions = NoSuchElementException.class, dataProvider = "Options")
public void testWithEmptyDB(HaloDBOptions options) throws HaloDBException {
String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testWithEmptyDB");

HaloDB db = getTestDB(directory, options);
HaloDBKeyIterator iterator = db.newKeyIterator();
Assert.assertFalse(iterator.hasNext());
iterator.next();
}

@Test(dataProvider = "Options")
public void testWithDelete(HaloDBOptions options) throws HaloDBException {
String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testWithEmptyDB");

options.setCompactionDisabled(true);

HaloDB db = getTestDB(directory, options);
int noOfRecords = 10_000;
List<Record> records = TestUtils.insertRandomRecords(db, noOfRecords);

// delete all records.
for (Record r : records) {
db.delete(r.getKey());
}

HaloDBKeyIterator iterator = db.newKeyIterator();
Assert.assertFalse(iterator.hasNext());

// close and open the db again.
db.close();
db = getTestDBWithoutDeletingFiles(directory, options);
iterator = db.newKeyIterator();
Assert.assertFalse(iterator.hasNext());
}

@Test(dataProvider = "Options")
public void testPutAndGetDB(HaloDBOptions options) throws HaloDBException {
String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testPutAndGetDB");

options.setCompactionDisabled(true);
options.setMaxFileSize(10 * 1024);

HaloDB db = getTestDB(directory, options);

int noOfRecords = 10_000;
List<Record> records = TestUtils.insertRandomRecords(db, noOfRecords);

List<RecordKey> keys = new LinkedList<>();
for (Record record : records) {
keys.add(new RecordKey(record.getKey()));
}

List<RecordKey> actual = new ArrayList<>();
db.newKeyIterator().forEachRemaining(actual::add);

MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder(keys.toArray()));
}

@Test(dataProvider = "Options")
public void testPutUpdateAndGetDB(HaloDBOptions options) throws HaloDBException {
String directory = TestUtils.getTestDirectory("HaloDBKeyIteratorTest", "testPutUpdateAndGetDB");

options.setCompactionDisabled(true);
options.setMaxFileSize(10 * 1024);

HaloDB db = getTestDB(directory, options);

int noOfRecords = 10_000;
List<Record> records = TestUtils.insertRandomRecords(db, noOfRecords);

List<Record> updated = TestUtils.updateRecords(db, records);

List<RecordKey> keys = new LinkedList<>();
for (Record record : updated) {
keys.add(new RecordKey(record.getKey()));
}

List<RecordKey> actual = new ArrayList<>();
db.newKeyIterator().forEachRemaining(actual::add);
MatcherAssert.assertThat(actual, Matchers.containsInAnyOrder(keys.toArray()));
}
}

0 comments on commit 741b4f7

Please sign in to comment.