Commit: Fix the bug

openinx committed Oct 20, 2018
1 parent 09f4e6e commit a8f4de4

Showing 10 changed files with 524 additions and 19 deletions.
5 changes: 5 additions & 0 deletions pom.xml
@@ -28,6 +28,11 @@
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
</dependencies>
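
The log4j 1.2 dependency above only produces output if logging is configured at runtime. A minimal console setup sketch (this class and its main method are illustrative, not part of the commit):

import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

public class LogSetup {
    public static void main(String[] args) {
        // Wire a default console appender and layout for log4j 1.x.
        BasicConfigurator.configure();
        // Keep output readable by raising the root level to INFO.
        Logger.getRootLogger().setLevel(Level.INFO);
        Logger.getLogger(LogSetup.class).info("log4j is configured");
    }
}

The changed classes below obtain their loggers the same way, via Logger.getLogger(DiskStore.class) and Logger.getLogger(MemStore.class).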


9 changes: 1 addition & 8 deletions src/main/java/org/apache/minibase/BloomFilter.java
@@ -11,17 +11,10 @@ public BloomFilter(int k, int bitsPerKey) {
this.bitsPerKey = bitsPerKey;
}

- public BloomFilter(int k, int bitsPerKey, byte[] result) {
- this.k = k;
- this.bitsPerKey = bitsPerKey;
- this.bitLen = result.length;
- this.result = result;
- }

public byte[] generate(byte[][] keys) {
assert keys != null;
bitLen = keys.length * bitsPerKey;
- bitLen = ((bitLen + 7) / 8) << 3;
bitLen = ((bitLen + 7) / 8) << 3; // align the bitLen.
bitLen = bitLen < 64 ? 64 : bitLen;
result = new byte[bitLen >> 3];
for (int i = 0; i < keys.length; i++) {
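
The alignment line that gained a comment rounds the bit length up to a whole number of bytes, and the following line enforces a 64-bit floor. A small standalone check of that arithmetic (the class name and sample values are illustrative):

public class BitLenAlignmentDemo {
    public static void main(String[] args) {
        int bitsPerKey = 10;
        for (int keyCount : new int[] {1, 3, 7, 100}) {
            int bitLen = keyCount * bitsPerKey;
            bitLen = ((bitLen + 7) / 8) << 3;   // round up to a multiple of 8 bits
            bitLen = bitLen < 64 ? 64 : bitLen; // never allocate fewer than 64 bits
            System.out.println(keyCount + " keys -> " + bitLen + " bits = " + (bitLen >> 3) + " bytes");
        }
    }
}

For example, 7 keys at 10 bits per key gives 70 bits, which rounds up to 72 bits (9 bytes); a single key gives 10 bits, which the floor raises to 64 bits (8 bytes).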
4 changes: 4 additions & 0 deletions src/main/java/org/apache/minibase/DiskFile.java
@@ -373,6 +373,10 @@ public void open(String filename) throws IOException {
+ ", blockCount: " + blockCount;
}

public String getFileName() {
return fname;
}

private BlockReader load(BlockMeta meta) throws IOException {
in.seek(meta.getOffset());

134 changes: 125 additions & 9 deletions src/main/java/org/apache/minibase/DiskStore.java
@@ -1,51 +1,69 @@
package org.apache.minibase;

import org.apache.log4j.Logger;
import org.apache.minibase.DiskFile.DiskFileWriter;
import org.apache.minibase.MiniBase.Compactor;
import org.apache.minibase.MiniBase.Flusher;
import org.apache.minibase.MiniBase.Iter;

import java.io.Closeable;
import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DiskStore implements Closeable {

private static final Logger LOG = Logger.getLogger(DiskStore.class);
private static final String FILE_NAME_TMP_SUFFIX = ".tmp";
private static final String FILE_NAME_ARCHIVE_SUFFIX = ".archive";
private static final Pattern DATA_FILE_RE = Pattern.compile("data\\.([0-9]+)"); // data.1

private String dataDir;
private List<DiskFile> diskFiles;

- public DiskStore(String dataDir) {
private int maxDiskFiles;
private volatile AtomicLong maxFileId;

public DiskStore(String dataDir, int maxDiskFiles) {
this.dataDir = dataDir;
this.diskFiles = new ArrayList<>();
this.maxDiskFiles = maxDiskFiles;
}

private File[] listDiskFiles() {
File f = new File(this.dataDir);
return f.listFiles(fname -> DATA_FILE_RE.matcher(fname.getName()).matches());
}

- public synchronized long nextDiskFileId() {
public synchronized long getMaxDiskId() {
// TODO: can we persist the maxFileId so we don't have to traverse the disk files next time?
File[] files = listDiskFiles();
- long maxFileId = 0;
long maxFileId = -1L;
for (File f : files) {
Matcher matcher = DATA_FILE_RE.matcher(f.getName());
if (matcher.matches()) {
maxFileId = Math.max(Long.parseLong(matcher.group(1)), maxFileId);
}
}
- return maxFileId + 1;
return maxFileId;
}

public synchronized long nextDiskFileId() {
return maxFileId.incrementAndGet();
}

- public synchronized void addDiskFile(DiskFile df) {
- diskFiles.add(df);
public void addDiskFile(DiskFile df) {
synchronized (diskFiles) {
diskFiles.add(df);
}
}

public synchronized void addDiskFile(String filename) throws IOException {
@@ -65,6 +83,17 @@ public void open() throws IOException {
df.open(f.getAbsolutePath());
diskFiles.add(df);
}
maxFileId = new AtomicLong(getMaxDiskId());
}

public List<DiskFile> getDiskFiles() {
synchronized (diskFiles) {
return new ArrayList<>(diskFiles);
}
}

public long getMaxDiskFiles() {
return this.maxDiskFiles;
}

@Override
@@ -84,7 +113,9 @@ public void close() throws IOException {

public Iter<KeyValue> iterator() throws IOException {
List<Iter<KeyValue>> iters = new ArrayList<>();
- diskFiles.stream().forEach(df -> iters.add(df.iterator()));
for (DiskFile df : getDiskFiles()) {
iters.add(df.iterator());
}
return new MultiIter(iters);
}

@@ -98,7 +129,7 @@ public DefaultFlusher(DiskStore diskStore) {
@Override
public void flush(Set<KeyValue> kvSet) throws IOException {
String fileName = diskStore.getNextDiskFileName();
- String fileTempName = diskStore.getNextDiskFileName() + ".tmp";
String fileTempName = fileName + FILE_NAME_TMP_SUFFIX;
try {
try (DiskFileWriter writer = new DiskFileWriter(fileTempName)) {
for (Iterator<KeyValue> it = kvSet.iterator(); it.hasNext();) {
@@ -122,6 +153,91 @@ public void flush(Set<KeyValue> kvSet) throws IOException {
}
}

public static class DefaultCompactor extends Compactor {
private DiskStore diskStore;
private volatile boolean running = true;

public DefaultCompactor(DiskStore diskStore) {
this.diskStore = diskStore;
this.setDaemon(true);
}

public void minorCompact() throws IOException {
// TODO implement the minor compaction.
}

private void majorCompact() throws IOException {
String fileName = diskStore.getNextDiskFileName();
String fileTempName = fileName + FILE_NAME_TMP_SUFFIX;
try {
try (DiskFileWriter writer = new DiskFileWriter(fileTempName)) {
for (Iter<KeyValue> it = diskStore.iterator(); it.hasNext();) {
writer.append(it.next());
}
writer.appendIndex();
writer.appendTrailer();
}
File f = new File(fileTempName);
if (!f.renameTo(new File(fileName))) {
throw new IOException("Rename " + fileTempName + " to " + fileName + " failed");
}

// Rename the data files to archive files.
// TODO: when renaming the files, will we affect an in-progress scan?
diskStore.close();
diskStore.getDiskFiles().stream().forEach(df -> {
File file = new File(df.getFileName());
File archiveFile = new File(df.getFileName() + FILE_NAME_ARCHIVE_SUFFIX);
if (!file.renameTo(archiveFile)) {
LOG.error("Rename " + df.getFileName() + " to " + archiveFile.getName() + " failed.");
}
});

// TODO: any concurrency issues?
diskStore.addDiskFile(fileName);
} finally {
File f = new File(fileTempName);
if (f.exists()) {
f.delete();
}
}
}

@Override
public void compact(boolean isMajor) throws IOException {
if (isMajor) {
majorCompact();
} else {
minorCompact();
}
}

public void run() {
while (running) {
try {
boolean isCompacted = false;
if (diskStore.getDiskFiles().size() > diskStore.getMaxDiskFiles()) {
majorCompact();
isCompacted = true;
}
if (!isCompacted) {
Thread.sleep(1000);
}
} catch (IOException e) {
e.printStackTrace();
LOG.error("Major compaction failed: ", e);
} catch (InterruptedException ie) {
LOG.error("InterruptedException happened, stop running: ", ie);
break;
}
}
}

public void stopRunning() {
this.running = false;
}
}

public static class MultiIter implements Iter<KeyValue> {

private class IterNode {
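
The constants and regular expression added at the top of DiskStore define the on-disk naming scheme: live files are named data.<id> (see the // data.1 comment), a flush or compaction first writes to a .tmp file and renames it into place, and superseded files are renamed with the .archive suffix. Because listDiskFiles() only accepts names matching DATA_FILE_RE, temporary and archived files are ignored when the store is opened. A small sketch of how the pattern classifies typical names (the sample file names are made up):

import java.util.regex.Pattern;

public class DataFileNameDemo {
    // Same pattern as DiskStore.DATA_FILE_RE.
    private static final Pattern DATA_FILE_RE = Pattern.compile("data\\.([0-9]+)");

    public static void main(String[] args) {
        String[] names = {"data.1", "data.42", "data.7.tmp", "data.3.archive", "index.1"};
        for (String name : names) {
            // Only data.1 and data.42 are treated as live data files.
            System.out.println(name + " -> " + DATA_FILE_RE.matcher(name).matches());
        }
    }
}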
5 changes: 4 additions & 1 deletion src/main/java/org/apache/minibase/MemStore.java
@@ -1,5 +1,6 @@
package org.apache.minibase;

import org.apache.log4j.Logger;
import org.apache.minibase.DiskStore.MultiIter;
import org.apache.minibase.MiniBase.Flusher;
import org.apache.minibase.MiniBase.Iter;
@@ -16,6 +17,8 @@

public class MemStore extends Thread implements Closeable {

private static final Logger LOG = Logger.getLogger(MemStore.class);

public static final long MAX_MEMSTORE_SIZE = 256 * 1024 * 1024L;

private final AtomicLong memsize;
@@ -81,7 +84,7 @@ public void run() {
flusher.flush(snapshot);
}
} catch (IOException e) {
- e.printStackTrace(); // TODO use log
LOG.error("MemStore flush failed: ", e);
} finally {
synchronized (snapshotExists) {
if (snapshotExists.compareAndSet(true, false)) {
4 changes: 4 additions & 0 deletions src/main/java/org/apache/minibase/MiniBase.java
@@ -20,6 +20,10 @@ public static interface Flusher {
public void flush(Set<KeyValue> kvSet) throws IOException;
}

public static abstract class Compactor extends Thread {
public abstract void compact(boolean isMajor) throws IOException;
}

public interface Iter<KeyValue> {
public boolean hasNext() throws IOException;

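
The new Compactor base class is deliberately thin: a Thread with a single abstract compact(boolean isMajor) method, so compaction policies other than DefaultCompactor can be plugged in. A minimal sketch of an alternative implementation (NoopCompactor is hypothetical, not part of MiniBase):

import java.io.IOException;
import org.apache.minibase.MiniBase.Compactor;

// A do-nothing compactor, e.g. for tests where background compaction is unwanted.
public class NoopCompactor extends Compactor {
    @Override
    public void compact(boolean isMajor) throws IOException {
        // Intentionally empty: every flushed file stays on disk.
    }

    @Override
    public void run() {
        // No background loop; the thread exits right after start().
    }
}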
17 changes: 16 additions & 1 deletion src/main/java/org/apache/minibase/MiniBaseImpl.java
@@ -1,5 +1,6 @@
package org.apache.minibase;

import org.apache.minibase.DiskStore.DefaultCompactor;
import org.apache.minibase.DiskStore.DefaultFlusher;
import org.apache.minibase.DiskStore.MultiIter;

@@ -10,11 +11,16 @@
public class MiniBaseImpl implements MiniBase {

private static final String DEFAULT_DATA_DIR = "MiniBase";

private String dataDir = DEFAULT_DATA_DIR;
private long maxMemStoreSize = 256 * 1024 * 1024L;

private int MAX_DISK_FILES = 10;
private int maxDiskFiles = MAX_DISK_FILES;

private MemStore memStore;
private DiskStore diskStore;
private Compactor compactor;

public MiniBaseImpl setDataDir(String datDir) {
this.dataDir = datDir;
@@ -26,12 +32,20 @@ public MiniBaseImpl setMaxMemStoreSize(long maxMemStoreSize) {
return this;
}

public MiniBaseImpl setMaxDiskFiles(int maxDiskFiles) {
this.maxDiskFiles = maxDiskFiles;
return this;
}

public MiniBase open() throws IOException {
- diskStore = new DiskStore(this.dataDir);
diskStore = new DiskStore(this.dataDir, maxDiskFiles);
diskStore.open();

memStore = new MemStore(maxMemStoreSize, new DefaultFlusher(diskStore));
memStore.start();

compactor = new DefaultCompactor(diskStore);
compactor.start();
return this;
}

@@ -69,5 +83,6 @@ public void close() throws IOException {
memStore.flush();
memStore.close();
diskStore.close();
compactor.interrupt();
}
}
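
Taken together with the DiskStore changes, the new setter makes the compaction threshold part of MiniBaseImpl's fluent configuration. A usage sketch (the no-arg constructor, data directory, and sizes are assumptions; the diff does not show how instances are normally created):

import java.io.IOException;
import org.apache.minibase.MiniBaseImpl;

public class MiniBaseUsageDemo {
    public static void main(String[] args) throws IOException {
        // new MiniBaseImpl() is assumed here; the project may expose a factory method instead.
        MiniBaseImpl db = new MiniBaseImpl();
        db.setDataDir("/tmp/minibase-demo")        // hypothetical data directory
          .setMaxMemStoreSize(64 * 1024 * 1024L)   // 64 MB memstore instead of the 256 MB default
          .setMaxDiskFiles(5)                      // major-compact once more than 5 data files exist
          .open();                                 // starts the MemStore and DefaultCompactor threads

        // put/get/scan calls would go here; they are outside this diff.

        db.close();                                // flushes, closes the stores, interrupts the compactor
    }
}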