diff --git a/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java b/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java new file mode 100755 index 000000000000..c5c94155b127 --- /dev/null +++ b/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java @@ -0,0 +1,145 @@ +package alluxio.master.metastore.tikv; + +import alluxio.resource.CloseableIterator; +import com.google.common.primitives.Longs; +import org.tikv.kvproto.Kvrpcpb; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Convenience methods for working with TiKV. + */ +public final class TiKVUtils { + private static final Logger LOG = LoggerFactory.getLogger(TiKVUtils.class); + + private TiKVUtils() {} // Utils class. + + + /** + * @param str a String value + * @param long1 a long value + * @param long2 a long value + * @return a byte array formed by writing the bytes of n followed by the bytes of str + */ + public static byte[] toByteArray(String str, long long1, long long2) { + byte[] strBytes = str.getBytes(); + + byte[] key = new byte[strBytes.length + 2 * Longs.BYTES]; + System.arraycopy(strBytes, 0, key, 0, strBytes.length); + for (int i = strBytes.length + Longs.BYTES - 1; i >= strBytes.length; i--) { + key[i] = (byte) (long1 & 0xffL); + long1 >>= Byte.SIZE; + } + for (int i = strBytes.length + 2 * Longs.BYTES - 1; i >= strBytes.length + Longs.BYTES; i--) { + key[i] = (byte) (long2 & 0xffL); + long2 >>= Byte.SIZE; + } + return key; + } + + /** + * @param n a long value + * @param str a string value + * @return a byte array formed by writing the bytes of n followed by the bytes of str + */ + public static byte[] toByteArray(String str, long n) { + byte[] strBytes = str.getBytes(); + + byte[] key = new byte[Longs.BYTES + strBytes.length]; + System.arraycopy(strBytes, 0, key, 0, strBytes.length); + for (int i = key.length - 1; i >= strBytes.length; i--) { + key[i] = (byte) (n & 0xffL); + n >>= Byte.SIZE; + } + return key; + } + + /** + * @param n a long value + * @param str1 a string value + * @param str2 a string value + * @return a byte array formed by writing the bytes of n followed by the bytes of str + */ + public static byte[] toByteArray(String str1, long n, String str2) { + byte[] strBytes1 = str1.getBytes(); + byte[] strBytes2 = str2.getBytes(); + + byte[] key = new byte[Longs.BYTES + strBytes1.length + strBytes2.length]; + System.arraycopy(strBytes1, 0, key, 0, strBytes1.length); + for (int i = strBytes1.length + Longs.BYTES - 1; i >= strBytes1.length; i--) { + key[i] = (byte) (n & 0xffL); + n >>= Byte.SIZE; + } + System.arraycopy(strBytes2, 0, key, strBytes1.length + Longs.BYTES, strBytes2.length); + return key; + } + + /** + * @param bytes an array of bytes + * @param start the place in the array to read the long from + * @return the long + */ + public static long readLong(byte[] bytes, int start) { + return Longs.fromBytes(bytes[start], bytes[start + 1], bytes[start + 2], bytes[start + 3], + bytes[start + 4], bytes[start + 5], bytes[start + 6], bytes[start + 7]); + } + + + /** + * Used to parse current {@link ListIterator} element. + * + * @param return type of parser's next method + */ + public interface TiKVIteratorParser { + /** + * Parses and return next element. + * + * @param iter {@link ListIterator} instance + * @return parsed value + * @throws Exception if parsing fails + */ + T next(ListIterator iter) throws Exception; + } + + /** + * Used to wrap an {@link CloseableIterator} over {@link ListIterator}. + * It seeks given iterator to first entry before returning the iterator. + * + * @param tikvIterator the tikv iterator + * @param parser parser to produce iterated values from tikv key-value + * @param iterator value type + * @return wrapped iterator + */ + public static CloseableIterator createCloseableIterator( + ListIterator tikvIterator, TiKVIteratorParser parser) { + AtomicBoolean valid = new AtomicBoolean(true); + Iterator iter = new Iterator() { + @Override + public boolean hasNext() { + return valid.get() && tikvIterator.hasNext(); + } + + @Override + public T next() { + try { + return parser.next(tikvIterator); + } catch (Exception exc) { + LOG.warn("Iteration aborted because of error", exc); + valid.set(false); + throw new RuntimeException(exc); + } finally { + if (!tikvIterator.hasNext()) { + valid.set(false); + } + } + } + }; + + return CloseableIterator.noopCloseable(iter); + } + +} diff --git a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java new file mode 100644 index 000000000000..1810a5040cd1 --- /dev/null +++ b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java @@ -0,0 +1,157 @@ +package alluxio.master.metastore.tikv; + +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; +import alluxio.master.metastore.BlockMetaStore; +import alluxio.proto.meta.Block.BlockLocation; +import alluxio.proto.meta.Block.BlockMeta; +import alluxio.resource.CloseableIterator; + +import com.google.common.primitives.Longs; +import org.rocksdb.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.TiKVException; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.raw.RawKVClient; +import org.tikv.shade.com.google.protobuf.ByteString; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Block store backed by Tikv. + */ +@ThreadSafe +public class TiKVBlockMetaStore implements BlockMetaStore { + private static final Logger LOG = LoggerFactory.getLogger(TiKVBlockMetaStore.class); + private static final String BLOCKS_DB_NAME = "blocks-tikv"; + private static final String BLOCK_META_COLUMN = "blockmeta"; + private static final String BLOCK_LOCATIONS_COLUMN = "blocklocations"; + private static final String ROCKS_STORE_NAME = "BlockStore"; + + private final List mToClose = new ArrayList<>(); + + private final LongAdder mSize = new LongAdder(); + + private TiConfiguration mBlockConf; + private TiSession mBlockSession; + private RawKVClient mBlockClient; + + /** + * Creates and initializes a tikv block store. + * + * @param baseDir the base directory in which to store block store metadata + */ + public TiKVBlockMetaStore(String baseDir) { + String hostConf = Configuration.getString(PropertyKey.MASTER_METASTORE_INODE_TIKV_CONNECTION); + try { + mBlockConf = TiConfiguration.createDefault(hostConf); + mBlockConf.setRawKVReadTimeoutInMS(20000); + mBlockConf.setRawKVWriteTimeoutInMS(20000); + mBlockConf.setKvMode(String.valueOf(TiConfiguration.KVMode.RAW)); + mBlockSession = TiSession.create(mBlockConf); + mBlockClient = mBlockSession.createRawClient(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public Optional getBlock(long id) { + byte[] meta; + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + try { + Optional bytes = mBlockClient.get(key); + if (!bytes.isPresent()) { + return Optional.empty(); + } + meta = bytes.get().toByteArray(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + if (meta == null) { + return Optional.empty(); + } + try { + return Optional.of(BlockMeta.parseFrom(meta)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void putBlock(long id, BlockMeta meta) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + ByteString value = ByteString.copyFrom(meta.toByteArray()); + try { + Optional buf = mBlockClient.get(key); + mBlockClient.put(key, value); + if (!buf.isPresent()) { + mSize.increment(); + } + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeBlock(long id) { + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + Optional buf = mBlockClient.get(key); + mBlockClient.delete(key); + if (!buf.isPresent()) { + mSize.decrement(); + } + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + // TODO + @Override + public void clear() { + mSize.reset(); + LOG.info("clear TiKVBlockStore"); + } + + @Override + public long size() { + return mSize.longValue(); + } + + @Override + public void close() { + mSize.reset(); + LOG.info("Closing TiKVBlockStore and recycling all TiKV JNI objects"); + mBlockClient.close(); + try { + mBlockSession.close(); + } catch (Exception e) { + e.printStackTrace(); + } + LOG.info("TiKVBlockStore closed"); + } + + + @Override + public CloseableIterator getCloseableIterator() { + ListIterator iterator = mBlockClient + .scanPrefix(ByteString.copyFromUtf8(BLOCK_META_COLUMN)).listIterator(); + + return TiKVUtils.createCloseableIterator(iterator, + (iter) -> { + Kvrpcpb.KvPair kv = iter.next(); + byte[] key = kv.getKey().toByteArray(); + return new Block(TiKVUtils.readLong(key, BLOCK_META_COLUMN.length()), + BlockMeta.parseFrom(kv.getValue().toByteArray())); + } + ); + } + +}