Skip to content

Commit

Permalink
Introduce a SeekIter
Browse files Browse the repository at this point in the history
  • Loading branch information
openinx committed Jan 27, 2019
1 parent 5b644e1 commit 6874582
Show file tree
Hide file tree
Showing 10 changed files with 347 additions and 44 deletions.
18 changes: 18 additions & 0 deletions src/main/java/org/apache/minibase/Bytes.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
public class Bytes {

public final static byte[] EMPTY_BYTES = new byte[0];
public final static String HEX_TMP = "0123456789ABCDEF";

public static byte[] toBytes(byte b) {
return new byte[] { b };
Expand Down Expand Up @@ -33,6 +34,23 @@ public static byte[] toBytes(long x) {
return b;
}

/**
 * Renders a whole byte array as a readable string by delegating to
 * {@link #toHex(byte[], int, int)} with the full range of the array.
 *
 * @param buf the bytes to render; must not be null
 * @return the rendered string
 */
public static String toHex(byte[] buf) {
  final int length = buf.length;
  return toHex(buf, 0, length);
}

/**
 * Renders {@code len} bytes of {@code buf}, starting at {@code offset}, as a readable string.
 *
 * Bytes whose value is strictly between 32 and 127 are appended as the matching character. Every
 * other byte (including the space character and negative values) is written as a backslash,
 * the letter x and two digits taken from {@code HEX_TMP}.
 *
 * @param buf    the source bytes
 * @param offset index of the first byte to render
 * @param len    number of bytes to render
 * @return the rendered string
 */
public static String toHex(byte[] buf, int offset, int len) {
  final int end = offset + len;
  StringBuilder out = new StringBuilder();
  for (int pos = offset; pos < end; pos++) {
    final byte b = buf[pos];
    final boolean printable = b > 32 && b < 127;
    if (printable) {
      out.append((char) b);
      continue;
    }
    // High nibble first, then low nibble; the masks discard the sign extension of the byte.
    out.append("\\x");
    out.append(HEX_TMP.charAt((b >> 4) & 0x0F));
    out.append(HEX_TMP.charAt(b & 0x0F));
  }
  return out.toString();
}

public static byte[] toBytes(byte[] a, byte[] b) {
if (a == null) return b;
if (b == null) return a;
Expand Down
42 changes: 39 additions & 3 deletions src/main/java/org/apache/minibase/DiskFile.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package org.apache.minibase;

import org.apache.minibase.MiniBase.Iter;
import org.apache.minibase.MStore.SeekIter;

import java.io.Closeable;
import java.io.File;
Expand Down Expand Up @@ -47,6 +47,17 @@ public static class BlockMeta implements Comparable<BlockMeta> {
private long blockSize;
private byte[] bloomFilter;

/**
 * Builds a placeholder block meta that exists only to be used as a search key when a
 * {@link SeekIter} looks up the block to seek into. Only {@code lastKV} carries meaning; the
 * offset, the size and the bloom filter are filled with neutral values (zero, zero and an empty
 * byte array).
 *
 * @param lastKV the key value to use as the last key value of the placeholder block meta.
 * @return the placeholder block meta.
 */
private static BlockMeta createSeekDummy(KeyValue lastKV) {
  final long unusedOffset = 0L;
  final long unusedSize = 0L;
  return new BlockMeta(lastKV, unusedOffset, unusedSize, Bytes.EMPTY_BYTES);
}

public BlockMeta(KeyValue lastKV, long offset, long size, byte[] bloomFilter) {
this.lastKV = lastKV;
this.blockOffset = offset;
Expand Down Expand Up @@ -453,7 +464,7 @@ private BlockReader load(BlockMeta meta) throws IOException {
return BlockReader.parseFrom(buffer, 0, buffer.length);
}

private class InternalIterator implements Iter<KeyValue> {
private class InternalIterator implements SeekIter<KeyValue> {

private int currentKVIndex = 0;
private BlockReader currentReader;
Expand Down Expand Up @@ -491,9 +502,34 @@ public boolean hasNext() throws IOException {
/**
 * Returns the key value at the current position of the current block reader and moves the
 * position forward by one.
 *
 * NOTE(review): there is no bounds or null check here; presumably callers are expected to call
 * hasNext() first. Confirm against the callers.
 *
 * @return the key value at the current position.
 */
public KeyValue next() throws IOException {
// Post-increment: the entry at currentKVIndex is fetched, and the index then points to the next one.
return currentReader.getKeyValues().get(currentKVIndex++);
}

/**
 * Positions this iterator on the smallest key value that is greater than or equal to
 * {@code target}. When no block qualifies, the current reader is left as {@code null}.
 *
 * @param target the key value to seek to.
 * @throws IOException if the block cannot be loaded, or if the selected block contains no key
 *                     value greater than or equal to the target.
 */
@Override
public void seekTo(KeyValue target) throws IOException {
  // Blocks are looked up by their lastKV: take the first block whose lastKV >= target.
  blockMetaIter = blockMetaSet.tailSet(BlockMeta.createSeekDummy(target)).iterator();
  currentReader = null;
  if (!blockMetaIter.hasNext()) {
    // No candidate block at all: nothing to position on.
    return;
  }
  currentReader = load(blockMetaIter.next());

  // Walk the block until the first key value that is not smaller than the target. The block was
  // chosen because its lastKV >= target, so such an entry is expected to exist.
  for (currentKVIndex = 0; currentKVIndex < currentReader.getKeyValues().size();
      currentKVIndex++) {
    if (currentReader.getKeyValues().get(currentKVIndex).compareTo(target) >= 0) {
      return;
    }
  }

  // Every entry was smaller than the target: the block contradicts its own meta.
  throw new IOException("Data block mis-encoded, lastKV of the currentReader >= kv, but " +
      "we found all kv < target");
}
}

public Iter<KeyValue> iterator() {
/**
 * Creates a new seekable iterator over the key values of this disk file.
 *
 * @return a fresh {@link InternalIterator}.
 */
public SeekIter<KeyValue> iterator() {
  final InternalIterator fileIter = new InternalIterator();
  return fileIter;
}

Expand Down
42 changes: 26 additions & 16 deletions src/main/java/org/apache/minibase/DiskStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.log4j.Logger;
import org.apache.minibase.DiskFile.DiskFileWriter;
import org.apache.minibase.MStore.SeekIter;
import org.apache.minibase.MiniBase.Compactor;
import org.apache.minibase.MiniBase.Flusher;
import org.apache.minibase.MiniBase.Iter;
Expand Down Expand Up @@ -115,13 +116,13 @@ public void close() throws IOException {
}
}

public Iter<KeyValue> createIterator(List<DiskFile> diskFiles) throws IOException {
List<Iter<KeyValue>> iters = new ArrayList<>();
/**
 * Creates one seekable iterator that merges the iterators of all the given disk files.
 *
 * @param diskFiles the disk files to iterate over.
 * @return a {@link MultiIter} over the iterators of the given disk files.
 * @throws IOException if building the merged iterator fails.
 */
public SeekIter<KeyValue> createIterator(List<DiskFile> diskFiles) throws IOException {
  List<SeekIter<KeyValue>> fileIters = new ArrayList<>();
  for (DiskFile diskFile : diskFiles) {
    fileIters.add(diskFile.iterator());
  }
  return new MultiIter(fileIters);
}

public Iter<KeyValue> createIterator() throws IOException {
/**
 * Creates one seekable iterator over the disk files returned by {@code getDiskFiles()}.
 *
 * @return the merged iterator.
 * @throws IOException if building the merged iterator fails.
 */
public SeekIter<KeyValue> createIterator() throws IOException {
  final List<DiskFile> currentFiles = getDiskFiles();
  return createIterator(currentFiles);
}

Expand Down Expand Up @@ -240,38 +241,35 @@ public void stopRunning() {
}
}

public static class MultiIter implements Iter<KeyValue> {
public static class MultiIter implements SeekIter<KeyValue> {

/**
 * Entry of the merge queue: a key value that has already been read from an iterator, together
 * with the iterator it was read from.
 */
private class IterNode {
KeyValue kv;
Iter<KeyValue> iter;
SeekIter<KeyValue> iter;

public IterNode(KeyValue kv, Iter<KeyValue> it) {
public IterNode(KeyValue kv, SeekIter<KeyValue> it) {
this.kv = kv;
this.iter = it;
}
}

private SeekIter<KeyValue> iters[];
private PriorityQueue<IterNode> queue;

/**
 * Builds a merging iterator over the given iterators. The first key value of every iterator that
 * is non-null and has at least one element is put into a priority queue ordered by
 * {@code KeyValue.compareTo}.
 *
 * @param iters the iterators to merge; null entries are skipped.
 * @throws IOException if reading the first element of an iterator fails.
 */
public MultiIter(Iter<KeyValue> iters[]) throws IOException {
public MultiIter(SeekIter<KeyValue> iters[]) throws IOException {
assert iters != null;
queue = new PriorityQueue<>(((o1, o2) -> o1.kv.compareTo(o2.kv)));
this.iters = iters; // Used for seekTo
this.queue = new PriorityQueue<>(((o1, o2) -> o1.kv.compareTo(o2.kv)));
// Prime the queue with the head element of each usable iterator.
for (int i = 0; i < iters.length; i++) {
if (iters[i] != null && iters[i].hasNext()) {
queue.add(new IterNode(iters[i].next(), iters[i]));
}
}
}

public MultiIter(List<Iter<KeyValue>> iters) throws IOException {
assert iters != null;
queue = new PriorityQueue<>(((o1, o2) -> o1.kv.compareTo(o2.kv)));
for (Iter<KeyValue> iter : iters) {
if (iter != null && iter.hasNext()) {
queue.add(new IterNode(iter.next(), iter));
}
}
/**
 * Convenience constructor that accepts the iterators as a list and delegates to the array-based
 * constructor.
 *
 * @param iters the iterators to merge.
 * @throws IOException if the array-based constructor fails.
 */
@SuppressWarnings("unchecked")
public MultiIter(List<SeekIter<KeyValue>> iters) throws IOException {
// A generic array cannot be created directly, so a raw SeekIter[] is used; this is the reason
// for the unchecked suppression above.
this(iters.toArray(new SeekIter[0]));
}

@Override
Expand All @@ -292,5 +290,17 @@ public KeyValue next() throws IOException {
}
return null;
}

/**
 * Repositions every underlying iterator on the smallest key value that is greater than or equal
 * to {@code kv}, and rebuilds the merge queue from the new head element of each iterator.
 *
 * @param kv the key value to seek to.
 * @throws IOException if seeking or reading from an underlying iterator fails.
 */
@Override
public void seekTo(KeyValue kv) throws IOException {
  // Everything queued so far belongs to the old position.
  queue.clear();
  for (SeekIter<KeyValue> it : iters) {
    // The constructor accepts null entries in the array and skips them, so they must be skipped
    // here as well instead of failing with a NullPointerException.
    if (it == null) {
      continue;
    }
    it.seekTo(kv);
    if (it.hasNext()) {
      // Only the iterator which has some elements should be enqueued.
      queue.add(new IterNode(it.next(), it));
    }
  }
}
}
}
9 changes: 9 additions & 0 deletions src/main/java/org/apache/minibase/KeyValue.java
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,15 @@ public int getSerializeSize() {
return RAW_KEY_LEN_SIZE + VAL_LEN_SIZE + getRawKeyLen() + value.length;
}

/**
 * Returns a readable one-line description of this key value in the form
 * {@code key=.../op=.../sequenceId=.../value=...}, with key and value rendered through
 * {@code Bytes.toHex}.
 */
@Override
public String toString() {
  return "key=" + Bytes.toHex(this.key)
      + "/op=" + op
      + "/sequenceId=" + this.sequenceId
      + "/value=" + Bytes.toHex(this.value);
}

public static KeyValue parseFrom(byte[] bytes, int offset) throws IOException {
if (bytes == null) {
throw new IOException("buffer is null");
Expand Down
112 changes: 107 additions & 5 deletions src/main/java/org/apache/minibase/MStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import org.apache.minibase.DiskStore.DefaultCompactor;
import org.apache.minibase.DiskStore.DefaultFlusher;
import org.apache.minibase.DiskStore.MultiIter;
import org.apache.minibase.KeyValue.Op;

import java.io.IOException;
import java.util.ArrayList;
Expand Down Expand Up @@ -59,9 +60,16 @@ public void put(byte[] key, byte[] value) throws IOException {
}

@Override
public byte[] get(byte[] key) throws IOException {
// TODO
return new byte[0];
/**
 * Looks up a single key by scanning from {@code key} with no upper bound and inspecting the
 * first key value the scan yields.
 *
 * @param key the key to look up.
 * @return the first scanned key value if its key equals {@code key}; otherwise {@code null}.
 * @throws IOException if the scan fails.
 */
public KeyValue get(byte[] key) throws IOException {
  Iter<KeyValue> scanner = scan(key, Bytes.EMPTY_BYTES);
  if (!scanner.hasNext()) {
    return null;
  }
  // The scan starts at the first entry >= key, so a different key here means the key is absent.
  KeyValue first = scanner.next();
  return Bytes.compare(first.getKey(), key) == 0 ? first : null;
}

@Override
Expand All @@ -71,10 +79,94 @@ public void delete(byte[] key) throws IOException {

/**
 * Scans the key values between {@code start} and {@code stop} by merging the iterators of the
 * mem store and the disk store and wrapping the result in a {@link ScanIter}.
 *
 * @param start the first key to scan from; an empty array means no lower bound.
 * @param stop  the key to stop at; an empty array means no upper bound. ScanIter stops at the
 *              first key value whose key is greater than or equal to this key.
 * @return an iterator over the scanned key values.
 * @throws IOException if creating or seeking the underlying iterators fails.
 */
@Override
public Iter<KeyValue> scan(byte[] start, byte[] stop) throws IOException {
List<Iter<KeyValue>> iterList = new ArrayList<>();
List<SeekIter<KeyValue>> iterList = new ArrayList<>();
iterList.add(memStore.createIterator());
iterList.add(diskStore.createIterator());
return new MultiIter(iterList);
MultiIter it = new MultiIter(iterList);

// An empty start means minus infinity: there is nothing to seek to, so the seek is skipped.
if (Bytes.compare(start, Bytes.EMPTY_BYTES) != 0) {
it.seekTo(KeyValue.createDelete(start, sequenceId.get()));
}

// A null stopKV means there is no upper bound for the scan.
KeyValue stopKV = null;
if (Bytes.compare(stop, Bytes.EMPTY_BYTES) != 0) {
// The smallest key value among all key values with the same key.
// NOTE(review): presumably a larger sequence id sorts first in KeyValue.compareTo; confirm.
stopKV = KeyValue.createDelete(stop, Long.MAX_VALUE);
}
return new ScanIter(stopKV, it);
}

/**
 * Iterator used by scans on top of the merged store iterator. For every key it yields at most
 * one Put: the first Put read for that key, unless a Delete for the same key was read before it,
 * in which case all Puts of that key are skipped. Iteration ends at the first key value whose
 * key is greater than or equal to the key of {@code stopKV} (when a stop key value is given).
 *
 * NOTE(review): presumably the store iterator returns the newest entry of a key first; confirm
 * against KeyValue.compareTo.
 */
static class ScanIter implements Iter<KeyValue> {

  // Upper bound of the scan, or null when the scan is unbounded.
  private final KeyValue stopKV;
  // Source of key values.
  private final Iter<KeyValue> storeIt;
  // The key value that decided the fate of the most recent key: either the Put that was returned
  // for it or the Delete that hides it. Later entries with the same key are ignored.
  private KeyValue lastKV = null;
  // The next key value to hand out, or null when none has been fetched yet.
  private KeyValue pendingKV = null;

  public ScanIter(KeyValue stopKV, SeekIter<KeyValue> it) {
    this.stopKV = stopKV;
    this.storeIt = it;
  }

  /** Tells whether another key value is available, fetching it from the store when needed. */
  @Override
  public boolean hasNext() throws IOException {
    if (pendingKV == null) {
      switchToNewKey();
    }
    return pendingKV != null;
  }

  /** True when a stop key is set and the key of {@code kv} is not smaller than it. */
  private boolean shouldStop(KeyValue kv) {
    if (stopKV == null) {
      return false;
    }
    return Bytes.compare(stopKV.getKey(), kv.getKey()) <= 0;
  }

  /**
   * Advances the store iterator until a Put for a not yet decided key is found and stores it in
   * {@code pendingKV}. Leaves {@code pendingKV} untouched when the stop key is reached or the
   * store iterator is exhausted.
   */
  private void switchToNewKey() throws IOException {
    if (lastKV != null && shouldStop(lastKV)) {
      return;
    }
    while (storeIt.hasNext()) {
      final KeyValue candidate = storeIt.next();
      if (shouldStop(candidate)) {
        return;
      }

      final boolean isPut = candidate.getOp() == Op.Put;
      if (!isPut && candidate.getOp() != Op.Delete) {
        throw new IOException("Unknown op code: " + candidate.getOp());
      }

      if (!isPut) {
        // A Delete for a new key hides every later entry of that key.
        if (lastKV == null || Bytes.compare(lastKV.getKey(), candidate.getKey()) != 0) {
          lastKV = candidate;
        }
        continue;
      }

      if (lastKV != null) {
        final int order = Bytes.compare(lastKV.getKey(), candidate.getKey());
        if (order > 0) {
          String msg = "KV mis-encoded, curKV < lastKV, curKV:" + Bytes.toHex(candidate.getKey()) +
              ", lastKV:" + Bytes.toHex(lastKV.getKey());
          throw new IOException(msg);
        }
        if (order == 0) {
          // Same key as lastKV: already decided, keep looking.
          continue;
        }
      }

      // First Put of a new key: this is the one to return.
      lastKV = candidate;
      pendingKV = candidate;
      return;
    }
  }

  /** Returns the next key value, or null when there is none left. */
  @Override
  public KeyValue next() throws IOException {
    if (pendingKV == null) {
      switchToNewKey();
    }
    final KeyValue result = pendingKV;
    lastKV = result;
    pendingKV = null;
    return result;
  }
}

@Override
Expand All @@ -83,4 +175,14 @@ public void close() throws IOException {
diskStore.close();
compactor.interrupt();
}

/**
 * An {@link Iter} that can additionally be repositioned on a given element.
 *
 * @param <T> the type of the elements returned by the iterator. The parameter is deliberately
 *            not named {@code KeyValue}, so that it does not shadow the KeyValue class.
 */
interface SeekIter<T> extends Iter<T> {

  /**
   * Seek to the smallest key value which is greater than or equal to the given key value.
   *
   * @param kv the key value to seek to.
   * @throws IOException if repositioning the iterator fails.
   */
  void seekTo(T kv) throws IOException;
}
}
Loading

0 comments on commit 6874582

Please sign in to comment.