diff --git a/pom.xml b/pom.xml
index 9a34abe..964cf6e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -28,6 +28,11 @@
       <version>4.12</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>log4j</groupId>
+      <artifactId>log4j</artifactId>
+      <version>1.2.17</version>
+    </dependency>
   </dependencies>
 </project>
diff --git a/src/main/java/org/apache/minibase/BloomFilter.java b/src/main/java/org/apache/minibase/BloomFilter.java
index e8d0341..837881a 100644
--- a/src/main/java/org/apache/minibase/BloomFilter.java
+++ b/src/main/java/org/apache/minibase/BloomFilter.java
@@ -11,17 +11,10 @@ public BloomFilter(int k, int bitsPerKey) {
     this.bitsPerKey = bitsPerKey;
   }
 
-  public BloomFilter(int k, int bitsPerKey, byte[] result) {
-    this.k = k;
-    this.bitsPerKey = bitsPerKey;
-    this.bitLen = result.length;
-    this.result = result;
-  }
-
   public byte[] generate(byte[][] keys) {
     assert keys != null;
     bitLen = keys.length * bitsPerKey;
-    bitLen = ((bitLen + 7) / 8) << 3;
+    bitLen = ((bitLen + 7) / 8) << 3; // align bitLen to a multiple of 8
     bitLen = bitLen < 64 ? 64 : bitLen;
     result = new byte[bitLen >> 3];
     for (int i = 0; i < keys.length; i++) {
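A quick sanity check on the alignment arithmetic in `generate` above: `((bitLen + 7) / 8) << 3` rounds the bit length up to the next multiple of 8 so the filter fills whole bytes, and the next line clamps tiny filters to at least 64 bits. A minimal sketch of just that arithmetic (the helper name is ours, for illustration only):

```java
public class BitLenAlignment {
  // Round rawBitLen up to a multiple of 8, then clamp to at least 64 bits,
  // mirroring the two lines in BloomFilter.generate.
  static int alignBitLen(int rawBitLen) {
    int bitLen = ((rawBitLen + 7) / 8) << 3; // round up to whole bytes
    return bitLen < 64 ? 64 : bitLen;
  }

  public static void main(String[] args) {
    System.out.println(alignBitLen(3));  // 64: clamped to the 64-bit floor
    System.out.println(alignBitLen(70)); // 72: next multiple of 8
    System.out.println(alignBitLen(80)); // 80: already aligned
  }
}
```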
diff --git a/src/main/java/org/apache/minibase/DiskFile.java b/src/main/java/org/apache/minibase/DiskFile.java
index 6429d37..5036611 100644
--- a/src/main/java/org/apache/minibase/DiskFile.java
+++ b/src/main/java/org/apache/minibase/DiskFile.java
@@ -373,6 +373,10 @@ public void open(String filename) throws IOException {
         + ", blockCount: " + blockCount;
   }
 
+  public String getFileName() {
+    return fname;
+  }
+
   private BlockReader load(BlockMeta meta) throws IOException {
     in.seek(meta.getOffset());
diff --git a/src/main/java/org/apache/minibase/DiskStore.java b/src/main/java/org/apache/minibase/DiskStore.java
index 5d13de9..605ba16 100644
--- a/src/main/java/org/apache/minibase/DiskStore.java
+++ b/src/main/java/org/apache/minibase/DiskStore.java
@@ -1,30 +1,41 @@
 package org.apache.minibase;
 
+import org.apache.log4j.Logger;
 import org.apache.minibase.DiskFile.DiskFileWriter;
+import org.apache.minibase.MiniBase.Compactor;
 import org.apache.minibase.MiniBase.Flusher;
 import org.apache.minibase.MiniBase.Iter;
 
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileFilter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class DiskStore implements Closeable {
 
+  private static final Logger LOG = Logger.getLogger(DiskStore.class);
+  private static final String FILE_NAME_TMP_SUFFIX = ".tmp";
+  private static final String FILE_NAME_ARCHIVE_SUFFIX = ".archive";
   private static final Pattern DATA_FILE_RE = Pattern.compile("data\\.([0-9]+)"); // data.1
+
   private String dataDir;
   private List<DiskFile> diskFiles;
 
-  public DiskStore(String dataDir) {
+  private int maxDiskFiles;
+  private volatile AtomicLong maxFileId;
+
+  public DiskStore(String dataDir, int maxDiskFiles) {
     this.dataDir = dataDir;
     this.diskFiles = new ArrayList<>();
+    this.maxDiskFiles = maxDiskFiles;
   }
 
   private File[] listDiskFiles() {
@@ -32,20 +43,27 @@ private File[] listDiskFiles() {
     return f.listFiles(fname -> DATA_FILE_RE.matcher(fname.getName()).matches());
   }
 
-  public synchronized long nextDiskFileId() {
+  public synchronized long getMaxDiskId() {
+    // TODO: cache the max file id so that we need not scan the data directory every time.
     File[] files = listDiskFiles();
-    long maxFileId = 0;
+    long maxFileId = -1L;
     for (File f : files) {
       Matcher matcher = DATA_FILE_RE.matcher(f.getName());
       if (matcher.matches()) {
         maxFileId = Math.max(Long.parseLong(matcher.group(1)), maxFileId);
       }
     }
-    return maxFileId + 1;
+    return maxFileId;
+  }
+
+  public synchronized long nextDiskFileId() {
+    return maxFileId.incrementAndGet();
   }
 
-  public synchronized void addDiskFile(DiskFile df) {
-    diskFiles.add(df);
+  public void addDiskFile(DiskFile df) {
+    synchronized (diskFiles) {
+      diskFiles.add(df);
+    }
   }
 
   public synchronized void addDiskFile(String filename) throws IOException {
@@ -65,6 +83,17 @@ public void open() throws IOException {
       df.open(f.getAbsolutePath());
       diskFiles.add(df);
     }
+    maxFileId = new AtomicLong(getMaxDiskId());
+  }
+
+  public List<DiskFile> getDiskFiles() {
+    synchronized (diskFiles) {
+      return new ArrayList<>(diskFiles);
+    }
+  }
+
+  public long getMaxDiskFiles() {
+    return this.maxDiskFiles;
   }
 
   @Override
@@ -84,7 +113,9 @@ public void close() throws IOException {
 
   public Iter<KeyValue> iterator() throws IOException {
     List<Iter<KeyValue>> iters = new ArrayList<>();
-    diskFiles.stream().forEach(df -> iters.add(df.iterator()));
+    for (DiskFile df : getDiskFiles()) {
+      iters.add(df.iterator());
+    }
     return new MultiIter(iters);
   }
 
@@ -98,7 +129,7 @@ public DefaultFlusher(DiskStore diskStore) {
     @Override
     public void flush(Set<KeyValue> kvSet) throws IOException {
       String fileName = diskStore.getNextDiskFileName();
-      String fileTempName = diskStore.getNextDiskFileName() + ".tmp";
+      String fileTempName = fileName + FILE_NAME_TMP_SUFFIX;
       try {
         try (DiskFileWriter writer = new DiskFileWriter(fileTempName)) {
           for (Iterator<KeyValue> it = kvSet.iterator(); it.hasNext();) {
@@ -122,6 +153,91 @@ public void flush(Set<KeyValue> kvSet) throws IOException {
       }
     }
   }
 
+  public static class DefaultCompactor extends Compactor {
+    private DiskStore diskStore;
+    private volatile boolean running = true;
+
+    public DefaultCompactor(DiskStore diskStore) {
+      this.diskStore = diskStore;
+      this.setDaemon(true);
+    }
+
+    public void minorCompact() throws IOException {
+      // TODO: implement minor compaction.
+    }
+
+    private void majorCompact() throws IOException {
+      String fileName = diskStore.getNextDiskFileName();
+      String fileTempName = fileName + FILE_NAME_TMP_SUFFIX;
+      try {
+        try (DiskFileWriter writer = new DiskFileWriter(fileTempName)) {
+          for (Iter<KeyValue> it = diskStore.iterator(); it.hasNext();) {
+            writer.append(it.next());
+          }
+          writer.appendIndex();
+          writer.appendTrailer();
+        }
+        File f = new File(fileTempName);
+        if (!f.renameTo(new File(fileName))) {
+          throw new IOException("Rename " + fileTempName + " to " + fileName + " failed");
+        }
+
+        // Rename the old data files to archive files.
+        // TODO: will renaming the files affect a scan that is still in progress?
+        diskStore.close();
+        diskStore.getDiskFiles().stream().forEach(df -> {
+          File file = new File(df.getFileName());
+          File archiveFile = new File(df.getFileName() + FILE_NAME_ARCHIVE_SUFFIX);
+          if (!file.renameTo(archiveFile)) {
+            LOG.error("Rename " + df.getFileName() + " to " + archiveFile.getName() + " failed.");
+          }
+        });
+
+        // TODO: is there a race between archiving the old files and adding the new one?
+        diskStore.addDiskFile(fileName);
+      } finally {
+        File f = new File(fileTempName);
+        if (f.exists()) {
+          f.delete();
+        }
+      }
+    }
+
+    @Override
+    public void compact(boolean isMajor) throws IOException {
+      if (isMajor) {
+        majorCompact();
+      } else {
+        minorCompact();
+      }
+    }
+
+    public void run() {
+      while (running) {
+        try {
+          boolean isCompacted = false;
+          if (diskStore.getDiskFiles().size() > diskStore.getMaxDiskFiles()) {
+            majorCompact();
+            isCompacted = true;
+          }
+          if (!isCompacted) {
+            Thread.sleep(1000);
+          }
+        } catch (IOException e) {
+          LOG.error("Major compaction failed: ", e);
+        } catch (InterruptedException ie) {
+          LOG.error("Interrupted, stop running: ", ie);
+          break;
+        }
+      }
+    }
+
+    public void stopRunning() {
+      this.running = false;
+    }
+  }
+
   public static class MultiIter implements Iter<KeyValue> {
 
     private class IterNode {
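Both `DefaultFlusher.flush` and `DefaultCompactor.majorCompact` above use the same write-then-rename idiom: build the output under a `.tmp` suffix, rename it into place on success, and delete any leftover `.tmp` in `finally`, so a crashed writer never leaves a half-written data file visible. A stripped-down sketch of the idiom, with `Files.write` standing in for `DiskFileWriter`; note that `File.renameTo` is not guaranteed to be atomic on every platform, so `Files.move` with `ATOMIC_MOVE` would be the stricter choice:

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;

public class TmpFileCommit {
  public static void commit(String fileName, byte[] contents) throws IOException {
    String tmpName = fileName + ".tmp";
    File tmp = new File(tmpName);
    try {
      // Write the full contents to the temporary file first.
      Files.write(tmp.toPath(), contents);
      // Publish it under the final name only once it is complete.
      if (!tmp.renameTo(new File(fileName))) {
        throw new IOException("Rename " + tmpName + " to " + fileName + " failed");
      }
    } finally {
      // Still present only if the write or rename failed.
      if (tmp.exists()) {
        tmp.delete();
      }
    }
  }
}
```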
diff --git a/src/main/java/org/apache/minibase/MemStore.java b/src/main/java/org/apache/minibase/MemStore.java
index 80b321b..f9f9a43 100644
--- a/src/main/java/org/apache/minibase/MemStore.java
+++ b/src/main/java/org/apache/minibase/MemStore.java
@@ -1,5 +1,6 @@
 package org.apache.minibase;
 
+import org.apache.log4j.Logger;
 import org.apache.minibase.DiskStore.MultiIter;
 import org.apache.minibase.MiniBase.Flusher;
 import org.apache.minibase.MiniBase.Iter;
@@ -16,6 +17,8 @@
 
 public class MemStore extends Thread implements Closeable {
 
+  private static final Logger LOG = Logger.getLogger(MemStore.class);
+
   public static final long MAX_MEMSTORE_SIZE = 256 * 1024 * 1024L;
 
   private final AtomicLong memsize;
@@ -81,7 +84,7 @@ public void run() {
           flusher.flush(snapshot);
         }
       } catch (IOException e) {
-        e.printStackTrace(); // TODO use log
+        LOG.error("MemStore flush failed: ", e);
       } finally {
         synchronized (snapshotExists) {
           if (snapshotExists.compareAndSet(true, false)) {
diff --git a/src/main/java/org/apache/minibase/MiniBase.java b/src/main/java/org/apache/minibase/MiniBase.java
index 5792f9c..b5b2245 100644
--- a/src/main/java/org/apache/minibase/MiniBase.java
+++ b/src/main/java/org/apache/minibase/MiniBase.java
@@ -20,6 +20,10 @@ public static interface Flusher {
     public void flush(Set<KeyValue> kvSet) throws IOException;
   }
 
+  public static abstract class Compactor extends Thread {
+    public abstract void compact(boolean isMajor) throws IOException;
+  }
+
   public interface Iter<KV> {
 
     public boolean hasNext() throws IOException;
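One thing this patch does not add is a log4j configuration. Without one on the classpath, log4j 1.2 prints its "No appenders could be found" warning and the new `LOG.error` calls are dropped. A minimal, hypothetical `src/main/resources/log4j.properties` that sends everything to the console:

```properties
# Route all loggers to a console appender at INFO level.
log4j.rootLogger=INFO, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{ISO8601} %-5p [%t] %c{1}: %m%n
```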
diff --git a/src/main/java/org/apache/minibase/MiniBaseImpl.java b/src/main/java/org/apache/minibase/MiniBaseImpl.java
index e40dedb..310d2a4 100644
--- a/src/main/java/org/apache/minibase/MiniBaseImpl.java
+++ b/src/main/java/org/apache/minibase/MiniBaseImpl.java
@@ -1,5 +1,6 @@
 package org.apache.minibase;
 
+import org.apache.minibase.DiskStore.DefaultCompactor;
 import org.apache.minibase.DiskStore.DefaultFlusher;
 import org.apache.minibase.DiskStore.MultiIter;
 
@@ -10,11 +11,16 @@ public class MiniBaseImpl implements MiniBase {
 
   private static final String DEFAULT_DATA_DIR = "MiniBase";
+
   private String dataDir = DEFAULT_DATA_DIR;
   private long maxMemStoreSize = 256 * 1024 * 1024L;
+  private static final int MAX_DISK_FILES = 10;
+  private int maxDiskFiles = MAX_DISK_FILES;
+
   private MemStore memStore;
   private DiskStore diskStore;
+  private Compactor compactor;
 
   public MiniBaseImpl setDataDir(String datDir) {
     this.dataDir = datDir;
@@ -26,12 +32,20 @@ public MiniBaseImpl setMaxMemStoreSize(long maxMemStoreSize) {
     return this;
   }
 
+  public MiniBaseImpl setMaxDiskFiles(int maxDiskFiles) {
+    this.maxDiskFiles = maxDiskFiles;
+    return this;
+  }
+
   public MiniBase open() throws IOException {
-    diskStore = new DiskStore(this.dataDir);
+    diskStore = new DiskStore(this.dataDir, maxDiskFiles);
     diskStore.open();
 
     memStore = new MemStore(maxMemStoreSize, new DefaultFlusher(diskStore));
     memStore.start();
+
+    compactor = new DefaultCompactor(diskStore);
+    compactor.start();
     return this;
   }
 
@@ -69,5 +83,6 @@ public void close() throws IOException {
     memStore.flush();
     memStore.close();
     diskStore.close();
+    compactor.interrupt();
   }
 }
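Taken together, the new setters give `MiniBaseImpl` a small builder-style API, and `open()` now starts the compactor thread alongside the memstore. A usage sketch mirroring the tests below (the directory and sizes are illustrative):

```java
import org.apache.minibase.Bytes;
import org.apache.minibase.MiniBase;
import org.apache.minibase.MiniBaseImpl;

public class Example {
  public static void main(String[] args) throws Exception {
    MiniBase db = MiniBaseImpl.create()
        .setDataDir("/tmp/minibase-demo")      // illustrative directory
        .setMaxMemStoreSize(16 * 1024 * 1024L) // flush after ~16 MB
        .setMaxDiskFiles(5)                    // major-compact beyond 5 data files
        .open();                               // starts MemStore and Compactor threads
    try {
      db.put(Bytes.toBytes(1L), Bytes.toBytes(1L));
    } finally {
      db.close(); // flushes the memstore and interrupts the compactor
    }
  }
}
```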
diff --git a/src/test/java/org/apache/minibase/TestMemStore.java b/src/test/java/org/apache/minibase/TestMemStore.java
new file mode 100644
index 0000000..66ba657
--- /dev/null
+++ b/src/test/java/org/apache/minibase/TestMemStore.java
@@ -0,0 +1,67 @@
+package org.apache.minibase;
+
+import org.apache.minibase.MiniBase.Flusher;
+import org.apache.minibase.MiniBase.Iter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class TestMemStore {
+
+  static class MockFlusher implements Flusher {
+
+    private final AtomicInteger flushCounter = new AtomicInteger(0);
+
+    @Override
+    public void flush(Set<KeyValue> kvSet) throws IOException {
+      flushCounter.incrementAndGet();
+      while (true) {
+        // block forever
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testInMemorySnapshot() throws Exception {
+    long flushSizeLimit = 2 * 1024 * 1024L;
+    List<KeyValue> list = new ArrayList<>();
+    for (long i = 0, currentSize = 0;; i++) {
+      KeyValue kv = KeyValue.create(Bytes.toBytes(i), Bytes.toBytes(i));
+      list.add(kv);
+      currentSize += kv.size();
+      if (currentSize > flushSizeLimit + flushSizeLimit / 2) {
+        break;
+      }
+    }
+
+    MockFlusher flusher = new MockFlusher();
+    MemStore store = new MemStore(flushSizeLimit, flusher);
+    store.start();
+
+    Assert.assertEquals(flusher.flushCounter.get(), 0);
+
+    for (KeyValue kv : list) {
+      store.add(kv);
+    }
+
+    Assert.assertEquals(flusher.flushCounter.get(), 1);
+
+    Iter<KeyValue> iter = store.iterator();
+    for (KeyValue kv : list) {
+      Assert.assertTrue(iter.hasNext());
+      Assert.assertEquals(iter.next(), kv);
+    }
+    Assert.assertFalse(iter.hasNext());
+    store.close();
+  }
+}
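`testInMemorySnapshot`, like the `TestMiniBase` scans below, relies on `Bytes.toBytes(long)` producing a fixed-width big-endian encoding, so that the unsigned byte-wise ordering the store sorts by agrees with the numeric ordering of the keys. A standalone illustration of that property, using `java.nio` instead of the project's `Bytes` helper (run with `-ea` for the asserts):

```java
import java.nio.ByteBuffer;

public class OrderPreservingEncoding {
  // Fixed-width big-endian encoding of a long (ByteBuffer's default order).
  static byte[] encode(long v) {
    return ByteBuffer.allocate(8).putLong(v).array();
  }

  // Unsigned lexicographic comparison, byte by byte.
  static int compareUnsigned(byte[] a, byte[] b) {
    for (int i = 0; i < Math.min(a.length, b.length); i++) {
      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    long[] keys = { 0L, 1L, 255L, 256L, 100000000L };
    for (int i = 0; i + 1 < keys.length; i++) {
      // Numeric order and encoded byte order agree for non-negative longs.
      assert compareUnsigned(encode(keys[i]), encode(keys[i + 1])) < 0;
    }
    System.out.println("byte order matches numeric order");
  }
}
```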
diff --git a/src/test/java/org/apache/minibase/TestMiniBase.java b/src/test/java/org/apache/minibase/TestMiniBase.java
new file mode 100644
index 0000000..19d0eec
--- /dev/null
+++ b/src/test/java/org/apache/minibase/TestMiniBase.java
@@ -0,0 +1,113 @@
+package org.apache.minibase;
+
+import org.apache.minibase.MiniBase.Iter;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public class TestMiniBase {
+
+  private String dataDir;
+
+  @Before
+  public void setUp() {
+    dataDir = "target/minibase-" + System.currentTimeMillis();
+    File f = new File(dataDir);
+    Assert.assertTrue(f.mkdirs());
+  }
+
+  @After
+  public void tearDown() {
+  }
+
+  private static class WriterThread extends Thread {
+
+    private long start, end;
+    private MiniBase db;
+
+    public WriterThread(MiniBase db, long start, long end) {
+      this.db = db;
+      this.start = start;
+      this.end = end;
+    }
+
+    public void run() {
+      try {
+        for (long i = start; i < end; i++) {
+          db.put(Bytes.toBytes(i), Bytes.toBytes(i));
+        }
+      } catch (IOException e) {
+        e.printStackTrace();
+      }
+    }
+  }
+
+  @Test
+  public void testPut() throws IOException, InterruptedException {
+    final MiniBase db =
+        MiniBaseImpl.create().setDataDir(dataDir).setMaxMemStoreSize((1 << 20) * 256L).open();
+
+    final long totalKVSize = 100000000L; // 10^8
+    final int threadSize = 100;
+
+    WriterThread[] writers = new WriterThread[threadSize];
+    for (int i = 0; i < threadSize; i++) {
+      long kvPerThread = totalKVSize / threadSize;
+      writers[i] = new WriterThread(db, i * kvPerThread, (i + 1) * kvPerThread);
+      writers[i].start();
+    }
+
+    for (int i = 0; i < threadSize; i++) {
+      writers[i].join();
+    }
+
+    Iter<KeyValue> kv = db.scan();
+    long current = 0;
+    while (kv.hasNext()) {
+      KeyValue expected = kv.next();
+      KeyValue currentKV = KeyValue.create(Bytes.toBytes(current), Bytes.toBytes(current));
+      Assert.assertEquals(expected, currentKV);
+      current++;
+    }
+    Assert.assertEquals(current, totalKVSize);
+
+    db.close();
+  }
+
+  @Test
+  public void testMajorCompact() throws IOException, InterruptedException {
+    final MiniBase db =
+        MiniBaseImpl.create().setDataDir(dataDir).setMaxMemStoreSize(1 << 12).setMaxDiskFiles(2)
+            .open();
+
+    final long totalKVSize = 100000000L; // 10^8
+    final int threadSize = 10;
+
+    WriterThread[] writers = new WriterThread[threadSize];
+    for (int i = 0; i < threadSize; i++) {
+      long kvPerThread = totalKVSize / threadSize;
+      writers[i] = new WriterThread(db, i * kvPerThread, (i + 1) * kvPerThread);
+      writers[i].start();
+    }
+
+    for (int i = 0; i < threadSize; i++) {
+      writers[i].join();
+    }
+
+    Iter<KeyValue> kv = db.scan();
+    long current = 0;
+    while (kv.hasNext()) {
+      KeyValue expected = kv.next();
+      KeyValue currentKV = KeyValue.create(Bytes.toBytes(current), Bytes.toBytes(current));
+      Assert.assertEquals(expected, currentKV);
+      current++;
+    }
+    Assert.assertEquals(current, totalKVSize);
+
+    db.close();
+  }
+}
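`testMajorCompact` forces frequent compaction by pairing a tiny memstore (4 KB) with `setMaxDiskFiles(2)`. To watch the compactor work while the test runs, a throwaway observation aid like the following can count live versus archived files; it reuses the `data.N` and `.archive` naming from `DiskStore` but is otherwise ours, not part of the patch:

```java
import java.io.File;
import java.util.regex.Pattern;

public class CountDataFiles {
  // Same pattern DiskStore uses for live data files.
  private static final Pattern DATA_FILE_RE = Pattern.compile("data\\.([0-9]+)");

  public static void main(String[] args) {
    File dir = new File(args.length > 0 ? args[0] : "MiniBase");
    File[] files = dir.listFiles();
    if (files == null) {
      System.out.println("No such directory: " + dir);
      return;
    }
    int live = 0, archived = 0;
    for (File f : files) {
      if (DATA_FILE_RE.matcher(f.getName()).matches()) {
        live++;
      } else if (f.getName().endsWith(".archive")) {
        archived++;
      }
    }
    System.out.println("live=" + live + ", archived=" + archived);
  }
}
```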
diff --git a/src/test/java/org/apache/minibase/TestMultiIter.java b/src/test/java/org/apache/minibase/TestMultiIter.java
new file mode 100644
index 0000000..af69ade
--- /dev/null
+++ b/src/test/java/org/apache/minibase/TestMultiIter.java
@@ -0,0 +1,185 @@
+package org.apache.minibase;
+
+import org.apache.minibase.DiskFile.DiskFileWriter;
+import org.apache.minibase.DiskStore.MultiIter;
+import org.apache.minibase.MiniBase.Iter;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+public class TestMultiIter {
+
+  public static class MockIter implements Iter<KeyValue> {
+
+    private int cur;
+    private KeyValue[] kvs;
+
+    public MockIter(int[] array) throws IOException {
+      assert array != null;
+      kvs = new KeyValue[array.length];
+      for (int i = 0; i < array.length; i++) {
+        String s = String.format("%05d", array[i]);
+        kvs[i] = new KeyValue(Bytes.toBytes(s), Bytes.toBytes(s));
+      }
+      cur = 0;
+    }
+
+    @Override
+    public boolean hasNext() throws IOException {
+      return cur < kvs.length;
+    }
+
+    @Override
+    public KeyValue next() throws IOException {
+      return kvs[cur++];
+    }
+  }
+
+  @Test
+  public void testMergeSort() throws IOException {
+    int[] a = new int[] { 2, 5, 8, 10, 20 };
+    int[] b = new int[] { 11, 12, 12 };
+    MockIter iter1 = new MockIter(a);
+    MockIter iter2 = new MockIter(b);
+    Iter<KeyValue>[] iters = new Iter[] { iter1, iter2 };
+    MultiIter multiIter = new MultiIter(iters);
+
+    String[] results =
+        new String[] { "00002", "00005", "00008", "00010", "00011", "00012", "00012", "00020" };
+    int index = 0;
+
+    while (multiIter.hasNext()) {
+      KeyValue kv = multiIter.next();
+      Assert.assertTrue(index < results.length);
+      Assert.assertArrayEquals(kv.getKey(), Bytes.toBytes(results[index]));
+      Assert.assertArrayEquals(kv.getValue(), Bytes.toBytes(results[index]));
+      index++;
+    }
+
+    Assert.assertEquals(index, results.length);
+  }
+
+  @Test
+  public void testMergeSort2() throws IOException {
+    int[] a = new int[] {};
+    int[] b = new int[] {};
+    MockIter iter1 = new MockIter(a);
+    MockIter iter2 = new MockIter(b);
+    Iter<KeyValue>[] iters = new Iter[] { iter1, iter2 };
+    MultiIter multiIter = new MultiIter(iters);
+
+    Assert.assertFalse(multiIter.hasNext());
+  }
+
+  @Test
+  public void testMergeSort3() throws IOException {
+    int[] a = new int[] {};
+    int[] b = new int[] { 1 };
+    MockIter iter1 = new MockIter(a);
+    MockIter iter2 = new MockIter(b);
+    Iter<KeyValue>[] iters = new Iter[] { iter1, iter2 };
+    MultiIter multiIter = new MultiIter(iters);
+
+    Assert.assertTrue(multiIter.hasNext());
+    Assert.assertEquals(multiIter.next(),
+      KeyValue.create(Bytes.toBytes("00001"), Bytes.toBytes("00001")));
+    Assert.assertFalse(multiIter.hasNext());
+  }
+
+  @Test
+  public void testMergeSort4() throws IOException {
+    int[] a = new int[] {};
+    int[] b = new int[] { 1, 1 };
+    int[] c = new int[] { 1, 1 };
+    MockIter iter1 = new MockIter(a);
+    MockIter iter2 = new MockIter(b);
+    MockIter iter3 = new MockIter(c);
+    Iter<KeyValue>[] iters = new Iter[] { iter1, iter2, iter3 };
+    MultiIter multiIter = new MultiIter(iters);
+
+    int count = 0;
+    while (multiIter.hasNext()) {
+      Assert.assertEquals(multiIter.next(),
+        KeyValue.create(Bytes.toBytes("00001"), Bytes.toBytes("00001")));
+      count++;
+    }
+    Assert.assertEquals(count, 4);
+  }
+
+  private void testDiskFileMergeSort(String[] inputs, String output, int rowCount)
+      throws IOException {
+    try {
+      DiskFileWriter[] writers = new DiskFileWriter[inputs.length];
+      DiskFile[] readers = new DiskFile[inputs.length];
+      Iter<KeyValue>[] iterArray = new Iter[inputs.length];
+
+      for (int i = 0; i < inputs.length; i++) {
+        writers[i] = new DiskFileWriter(inputs[i]);
+      }
+      for (int i = 0; i < rowCount; i++) {
+        int k = i % inputs.length;
+        writers[k].append(KeyValue.create(Bytes.toBytes(i), Bytes.toBytes(i)));
+      }
+      for (int i = 0; i < inputs.length; i++) {
+        writers[i].appendIndex();
+        writers[i].appendTrailer();
+        writers[i].close();
+
+        // open the file
+        readers[i] = new DiskFile();
+        readers[i].open(inputs[i]);
+        iterArray[i] = readers[i].iterator();
+      }
+
+      DiskFileWriter writer = new DiskFileWriter(output);
+      MultiIter iter = new MultiIter(iterArray);
+      while (iter.hasNext()) {
+        writer.append(iter.next());
+      }
+
+      writer.appendIndex();
+      writer.appendTrailer();
+      writer.close();
+
+      // close the readers
+      for (int i = 0; i < readers.length; i++) {
+        readers[i].close();
+      }
+
+      DiskFile reader = new DiskFile();
+      reader.open(output);
+      Iter<KeyValue> resultIter = reader.iterator();
+      int count = 0;
+      while (resultIter.hasNext()) {
+        Assert.assertEquals(resultIter.next(),
+          KeyValue.create(Bytes.toBytes(count), Bytes.toBytes(count)));
+        count++;
+      }
+      Assert.assertEquals(count, rowCount);
+      reader.close();
+    } finally {
+      // Remove the db files.
+      List<String> deleteFiles = new ArrayList<>(Arrays.asList(inputs));
+      deleteFiles.add(output);
+      for (String fileName : deleteFiles) {
+        File f = new File(fileName);
+        if (f.exists()) {
+          f.delete();
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testDiskFileMergeSort() throws IOException {
+    testDiskFileMergeSort(new String[] { "a.db", "b.db" }, "c.db", 10);
+    testDiskFileMergeSort(new String[] { "a.db" }, "b.db", 1);
+    testDiskFileMergeSort(new String[] { "a.db", "b.db", "c.db" }, "d.db", 10000000);
+    testDiskFileMergeSort(new String[] { "a.db", "b.db", "c.db" }, "d.db", 100);
+  }
+}
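The behavior these tests pin down for `MultiIter` (always emit the smallest remaining head, preserve duplicates) is the classic heap-based k-way merge. A self-contained sketch of the same idea over plain integer iterators, not the project's implementation:

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMerge {
  static void merge(List<Iterator<Integer>> sources) {
    // Min-heap of [head value, source index] pairs, one per non-empty source.
    PriorityQueue<int[]> heap = new PriorityQueue<>((a, b) -> Integer.compare(a[0], b[0]));
    for (int i = 0; i < sources.size(); i++) {
      if (sources.get(i).hasNext()) {
        heap.add(new int[] { sources.get(i).next(), i });
      }
    }
    while (!heap.isEmpty()) {
      int[] top = heap.poll();
      System.out.print(top[0] + " ");
      // Refill from the source that just yielded the smallest head.
      if (sources.get(top[1]).hasNext()) {
        heap.add(new int[] { sources.get(top[1]).next(), top[1] });
      }
    }
    System.out.println();
  }

  public static void main(String[] args) {
    merge(Arrays.asList(
        Arrays.asList(2, 5, 8, 10, 20).iterator(),
        Arrays.asList(11, 12, 12).iterator()));
    // Prints: 2 5 8 10 11 12 12 20
  }
}
```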