DRILL-8259: Supports advanced HBase persistence storage options #2596

Open · wants to merge 2 commits into master
File: HBaseStorageTest.java (renamed from HbaseStorageTest.java)
@@ -20,5 +20,5 @@
/**
* This is a category used to mark unit tests that test the HBase storage plugin.
*/
public interface HbaseStorageTest {
public interface HBaseStorageTest {
}
File: DrillHBaseConstants.java
@@ -23,6 +23,7 @@
import org.apache.drill.common.types.Types;

public interface DrillHBaseConstants {

String ROW_KEY = "row_key";

SchemaPath ROW_KEY_PATH = SchemaPath.getSimplePath(ROW_KEY);
@@ -35,7 +36,15 @@ public interface DrillHBaseConstants {

MajorType COLUMN_TYPE = Types.optional(MinorType.VARBINARY);

String SYS_STORE_PROVIDER_HBASE_NAMESPACE = "drill.exec.sys.store.provider.hbase.namespace";

String SYS_STORE_PROVIDER_HBASE_TABLE = "drill.exec.sys.store.provider.hbase.table";

String SYS_STORE_PROVIDER_HBASE_FAMILY = "drill.exec.sys.store.provider.hbase.family";

String SYS_STORE_PROVIDER_HBASE_CONFIG = "drill.exec.sys.store.provider.hbase.config";

String SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG = "drill.exec.sys.store.provider.hbase.table_config";

String SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG = "drill.exec.sys.store.provider.hbase.column_config";
}
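
For context, these keys live under the provider section of drill-override.conf. A minimal sketch of how the new namespace, table_config, and column_config options might be wired up — all values here are illustrative placeholders, not recommendations; only the key paths come from the constants above. The keys inside table_config and column_config follow the HBase descriptor-builder constant names (the provider matches them case-insensitively):

drill.exec.sys.store.provider: {
  class: "org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider",
  hbase: {
    namespace: "DRILL",
    table: "drill_store",
    family: "s",
    config: {
      "hbase.zookeeper.quorum": "localhost",
      "hbase.zookeeper.property.clientPort": "2181"
    },
    table_config: {
      "DURABILITY": "ASYNC_WAL",
      "MAX_FILESIZE": 1073741824
    },
    column_config: {
      "TTL": 3600,
      "DATA_BLOCK_ENCODING": "FAST_DIFF"
    }
  }
}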
File: HBasePersistentStore.java
@@ -17,8 +17,7 @@
*/
package org.apache.drill.exec.store.hbase.config;

import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.FAMILY;
import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER;
import static org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider.QUALIFIER_NAME;

import java.io.IOException;
import java.util.Iterator;
@@ -48,17 +47,19 @@ public class HBasePersistentStore<V> extends BasePersistentStore<V> {
private final String hbaseTableName;

private final String tableName;
private final byte[] familyName;
private final byte[] tableNameStartKey;
private final byte[] tableNameStopKey;

public HBasePersistentStore(PersistentStoreConfig<V> config, Table table) {
public HBasePersistentStore(PersistentStoreConfig<V> config, Table table, byte[] family) {
this.tableName = config.getName() + '\0';
this.tableNameStartKey = Bytes.toBytes(tableName); // "tableName\x00"
this.tableNameStopKey = this.tableNameStartKey.clone();
this.tableNameStopKey[tableNameStartKey.length-1] = 1;
this.config = config;
this.hbaseTable = table;
this.hbaseTableName = table.getName().getNameAsString();
this.familyName = family;
}

@Override
@@ -70,7 +71,7 @@ public PersistentStoreMode getMode() {
public boolean contains(String key) {
try {
Get get = new Get(row(key));
get.addColumn(FAMILY, QUALIFIER);
get.addColumn(familyName, QUALIFIER_NAME);
return hbaseTable.exists(get);
} catch (IOException e) {
throw UserException
@@ -82,13 +83,13 @@ public boolean contains(String key) {

@Override
public V get(String key) {
return get(key, FAMILY);
return get(key, familyName);
}

protected synchronized V get(String key, byte[] family) {
try {
Get get = new Get(row(key));
get.addColumn(family, QUALIFIER);
get.addColumn(family, QUALIFIER_NAME);
Result r = hbaseTable.get(get);
if(r.isEmpty()){
return null;
@@ -103,13 +104,13 @@ protected synchronized V get(String key, byte[] family) {

@Override
public void put(String key, V value) {
put(key, FAMILY, value);
put(key, familyName, value);
}

protected synchronized void put(String key, byte[] family, V value) {
try {
Put put = new Put(row(key));
put.addColumn(family, QUALIFIER, bytes(value));
put.addColumn(family, QUALIFIER_NAME, bytes(value));
hbaseTable.put(put);
} catch (IOException e) {
throw UserException.dataReadError(e)
@@ -122,8 +123,8 @@ protected synchronized void put(String key, byte[] family, V value) {
public synchronized boolean putIfAbsent(String key, V value) {
try {
Put put = new Put(row(key));
put.addColumn(FAMILY, QUALIFIER, bytes(value));
return hbaseTable.checkAndPut(put.getRow(), FAMILY, QUALIFIER, null /*absent*/, put);
put.addColumn(familyName, QUALIFIER_NAME, bytes(value));
return hbaseTable.checkAndPut(put.getRow(), familyName, QUALIFIER_NAME, null /*absent*/, put);
} catch (IOException e) {
throw UserException.dataReadError(e)
.message("Caught error while putting row '%s' into table '%s'", key, hbaseTableName)
@@ -183,7 +184,7 @@ private class Iter implements Iterator<Entry<String, V>> {
Iter(int take) {
try {
Scan scan = new Scan(tableNameStartKey, tableNameStopKey);
scan.addColumn(FAMILY, QUALIFIER);
scan.addColumn(familyName, QUALIFIER_NAME);
scan.setCaching(Math.min(take, 100));
scan.setBatch(take); // set batch size
scanner = hbaseTable.getScanner(scan);
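
Worth noting for reviewers: every persistent store shares a single HBase table, and isolation between stores comes purely from the row-key prefix tableName + '\0' together with the start/stop keys built in the constructor above. A small self-contained sketch of that scheme — the row() helper here is an assumption inferred from the constructor, not the exact Drill code:

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class RowKeySketch {
  // Row key layout: <storeName> 0x00 <entryKey>, so all entries of one
  // store form a contiguous range in the shared HBase table.
  static byte[] row(String storeName, String key) {
    return (storeName + '\0' + key).getBytes(StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    // Scan range for the store "hbase": ["hbase\x00", "hbase\x01")
    byte[] startKey = ("hbase" + '\0').getBytes(StandardCharsets.UTF_8);
    byte[] stopKey = startKey.clone();
    stopKey[stopKey.length - 1] = 1; // bump the trailing 0x00 to 0x01

    byte[] inside = row("hbase", "k1");        // within the range
    byte[] other = row("hbase.test", "k1");    // '.' (0x2E) sorts after 0x01, so outside

    System.out.println(inRange(inside, startKey, stopKey)); // true
    System.out.println(inRange(other, startKey, stopKey));  // false
  }

  static boolean inRange(byte[] key, byte[] start, byte[] stop) {
    // HBase orders row keys as unsigned bytes
    return Arrays.compareUnsigned(key, start) >= 0 && Arrays.compareUnsigned(key, stop) < 0;
  }
}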
File: HBasePersistentStoreProvider.java
@@ -20,116 +20,249 @@
import java.io.IOException;
import java.util.Map;

import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.store.hbase.DrillHBaseConstants;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.exec.store.sys.PersistentStoreRegistry;
import org.apache.drill.exec.store.sys.store.provider.BasePersistentStoreProvider;
import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.util.Bytes;

import org.apache.drill.shaded.guava.com.google.common.annotations.VisibleForTesting;

public class HBasePersistentStoreProvider extends BasePersistentStoreProvider {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBasePersistentStoreProvider.class);

static final byte[] FAMILY = Bytes.toBytes("s");
public static final byte[] DEFAULT_FAMILY_NAME = Bytes.toBytes("s");

static final byte[] QUALIFIER = Bytes.toBytes("d");
public static final byte[] QUALIFIER_NAME = Bytes.toBytes("d");

private static final String HBASE_CLIENT_ID = "drill-hbase-persistent-store-client";

private final TableName hbaseTableName;

private final byte[] family;

private Table hbaseTable;

private Configuration hbaseConf;

private Connection connection;
private final Map<String, Object> tableConfig;

private Table hbaseTable;
private final Map<String, Object> columnConfig;

private Connection connection;

@SuppressWarnings("unchecked")
public HBasePersistentStoreProvider(PersistentStoreRegistry registry) {
@SuppressWarnings("unchecked")
final Map<String, Object> config = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG);
this.hbaseConf = HBaseConfiguration.create();
this.hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, "drill-hbase-persistent-store-client");
if (config != null) {
for (Map.Entry<String, Object> entry : config.entrySet()) {
this.hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue()));
final Map<String, Object> hbaseConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_CONFIG);
if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG)) {
tableConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE_CONFIG);
} else {
tableConfig = Maps.newHashMap();
}
if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG)) {
columnConfig = (Map<String, Object>) registry.getConfig().getAnyRef(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_COLUMN_CONFIG);
} else {
columnConfig = Maps.newHashMap();
}
hbaseConf = HBaseConfiguration.create();
Contributor:

As you know, HBase is a nightmare for operational services due to the complexity of its settings. The actual value in the example above is not a recommended value; no single value is appropriate for every case. It simply illustrates the type of value this parameter takes: "true/false", not "0/1".

Hi @luocooong, I'm still worried about the defaults, especially when Drill creates the table on its own...

Am I correct that you don't set any defaults except SYS_STORE_PROVIDER_HBASE_TABLE, SYS_STORE_PROVIDER_HBASE_NAMESPACE and SYS_STORE_PROVIDER_HBASE_FAMILY?

hbaseConf.set(HConstants.HBASE_CLIENT_INSTANCE_ID, HBASE_CLIENT_ID);
if (hbaseConfig != null) {
for (Map.Entry<String, Object> entry : hbaseConfig.entrySet()) {
hbaseConf.set(entry.getKey(), String.valueOf(entry.getValue()));
}
}
this.hbaseTableName = TableName.valueOf(registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE));
logger.info("Received the hbase config is {}", hbaseConfig);
if (!tableConfig.isEmpty()) {
logger.info("Received the table config is {}", tableConfig);
}
if (!columnConfig.isEmpty()) {
logger.info("Received the column config is {}", columnConfig);
}
String tableName = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_TABLE);
if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE)) {
String namespaceStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_NAMESPACE);
hbaseTableName = TableName.valueOf(namespaceStr.concat(":").concat(tableName));
} else {
hbaseTableName = TableName.valueOf(tableName);
}
if (registry.getConfig().hasPath(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY)) {
String familyStr = registry.getConfig().getString(DrillHBaseConstants.SYS_STORE_PROVIDER_HBASE_FAMILY);
family = Bytes.toBytes(familyStr);
} else { // The default name
family = DEFAULT_FAMILY_NAME;
}
}

@VisibleForTesting
public HBasePersistentStoreProvider(Configuration conf, String storeTableName) {
this.tableConfig = Maps.newHashMap();
this.columnConfig = Maps.newHashMap();
this.hbaseConf = conf;
this.hbaseTableName = TableName.valueOf(storeTableName);
this.family = DEFAULT_FAMILY_NAME;
}


@VisibleForTesting
public HBasePersistentStoreProvider(Map<String, Object> tableConfig, Map<String, Object> columnConfig, Configuration conf, String storeTableName) {
this.tableConfig = tableConfig;
this.columnConfig = columnConfig;
this.hbaseConf = conf;
this.hbaseTableName = TableName.valueOf(storeTableName);
this.family = DEFAULT_FAMILY_NAME;
}

@Override
public <V> PersistentStore<V> getOrCreateStore(PersistentStoreConfig<V> config) throws StoreException {
switch(config.getMode()){
switch (config.getMode()) {
case BLOB_PERSISTENT:
case PERSISTENT:
return new HBasePersistentStore<>(config, this.hbaseTable);

return new HBasePersistentStore<>(config, hbaseTable, family);
default:
throw new IllegalStateException();
throw new IllegalStateException("Unknown persistent mode");
}
}


@Override
public void start() throws IOException {
// Create the column family builder
ColumnFamilyDescriptorBuilder columnFamilyBuilder = ColumnFamilyDescriptorBuilder
.newBuilder(family)
.setMaxVersions(1);
// Apply the column family config
verifyAndSetColumnConfig(columnConfig, columnFamilyBuilder);
// Create the table builder
TableDescriptorBuilder tableBuilder = TableDescriptorBuilder
.newBuilder(hbaseTableName)
.setColumnFamily(columnFamilyBuilder.build());
// Apply the table config
verifyAndSetTableConfig(tableConfig, tableBuilder);
this.connection = ConnectionFactory.createConnection(hbaseConf);

try(Admin admin = connection.getAdmin()) {
if (!admin.tableExists(hbaseTableName)) {
HTableDescriptor desc = new HTableDescriptor(hbaseTableName);
desc.addFamily(new HColumnDescriptor(FAMILY).setMaxVersions(1));
admin.createTable(desc);
// Create the table using the configured descriptors
admin.createTable(tableBuilder.build());
logger.info("The HBase table of persistent store created : {}", hbaseTableName);
} else {
HTableDescriptor desc = admin.getTableDescriptor(hbaseTableName);
if (!desc.hasFamily(FAMILY)) {
TableDescriptor table = admin.getDescriptor(hbaseTableName);
if (!admin.isTableEnabled(hbaseTableName)) {
admin.enableTable(hbaseTableName); // In case the table is disabled
}
if (!table.hasColumnFamily(family)) {
throw new DrillRuntimeException("The HBase table " + hbaseTableName
+ " specified as persistent store exists but does not contain column family: "
+ (Bytes.toString(FAMILY)));
+ (Bytes.toString(family)));
}
logger.info("The HBase table of persistent store is loaded : {}", hbaseTableName);
}
}

this.hbaseTable = connection.getTable(hbaseTableName);
}

@Override
public synchronized void close() {
if (this.hbaseTable != null) {
try {
this.hbaseTable.close();
this.hbaseTable = null;
} catch (IOException e) {
logger.warn("Caught exception while closing HBase table.", e);
/**
* Verify the supported HBase table configuration entries and
* apply them to the table descriptor builder.
* @param config the table config map
* @param builder the HBase table descriptor builder
*/
private void verifyAndSetTableConfig(Map<String, Object> config, TableDescriptorBuilder builder) {
for (Map.Entry<String, Object> entry : config.entrySet()) {
switch (entry.getKey().toUpperCase()) {
case TableDescriptorBuilder.DURABILITY:
Durability durability = Durability.valueOf(((String) entry.getValue()).toUpperCase());
builder.setDurability(durability);
break;
case TableDescriptorBuilder.COMPACTION_ENABLED:
builder.setCompactionEnabled((Boolean) entry.getValue());
break;
case TableDescriptorBuilder.SPLIT_ENABLED:
builder.setSplitEnabled((Boolean) entry.getValue());
break;
case TableDescriptorBuilder.FLUSH_POLICY:
builder.setFlushPolicyClassName((String) entry.getValue());
break;
case TableDescriptorBuilder.SPLIT_POLICY:
builder.setRegionSplitPolicyClassName((String) entry.getValue());
break;
case TableDescriptorBuilder.MAX_FILESIZE:
builder.setMaxFileSize((Integer) entry.getValue());
break;
case TableDescriptorBuilder.MEMSTORE_FLUSHSIZE:
builder.setMemStoreFlushSize((Integer) entry.getValue());
break;
default:
break;
}
}
if (this.connection != null && !this.connection.isClosed()) {
try {
this.connection.close();
} catch (IOException e) {
logger.warn("Caught exception while closing HBase connection.", e);
}

/**
* Verify the supported HBase column family configuration entries and
* apply them to the column family descriptor builder.
* @param config the column family config map
* @param builder the HBase column family descriptor builder
*/
private void verifyAndSetColumnConfig(Map<String, Object> config, ColumnFamilyDescriptorBuilder builder) {
for (Map.Entry<String, Object> entry : config.entrySet()) {
switch (entry.getKey().toUpperCase()) {
case ColumnFamilyDescriptorBuilder.MAX_VERSIONS:
builder.setMaxVersions((Integer) entry.getValue());
break;
case ColumnFamilyDescriptorBuilder.TTL:
builder.setTimeToLive((Integer) entry.getValue());
break;
case ColumnFamilyDescriptorBuilder.COMPRESSION:
Algorithm algorithm = Algorithm.valueOf(((String) entry.getValue()).toUpperCase());
builder.setCompressionType(algorithm);
break;
case ColumnFamilyDescriptorBuilder.BLOCKCACHE:
builder.setBlockCacheEnabled((Boolean) entry.getValue());
break;
case ColumnFamilyDescriptorBuilder.BLOCKSIZE:
builder.setBlocksize((Integer) entry.getValue());
break;
case ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING:
DataBlockEncoding encoding = DataBlockEncoding.valueOf(((String) entry.getValue()).toUpperCase());
builder.setDataBlockEncoding(encoding);
break;
case ColumnFamilyDescriptorBuilder.IN_MEMORY:
builder.setInMemory((Boolean) entry.getValue());
break;
case ColumnFamilyDescriptorBuilder.DFS_REPLICATION:
builder.setDFSReplication(((Integer) entry.getValue()).shortValue());
break;
default:
break;
}
this.connection = null;
}
}

@Override
public synchronized void close() {
if (hbaseTable != null) {
AutoCloseables.closeSilently(hbaseTable);
}
if (connection != null && !connection.isClosed()) {
AutoCloseables.closeSilently(connection);
}
logger.info("The HBase connection of persistent store closed.");
}
}
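
For a sense of the full lifecycle this file implements: build the two config maps keyed by the HBase descriptor-builder constants, call start() to create or validate the table, then close(). A hedged usage sketch against the new @VisibleForTesting constructor — the ZooKeeper quorum is a placeholder for a reachable cluster, and the tuning values are illustrative only:

import java.util.HashMap;
import java.util.Map;

import org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class ProviderUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    conf.set("hbase.zookeeper.quorum", "localhost"); // placeholder quorum

    // Optional tuning, keyed by the descriptor-builder constant names
    Map<String, Object> tableConfig = new HashMap<>();
    tableConfig.put(TableDescriptorBuilder.DURABILITY, "ASYNC_WAL");
    Map<String, Object> columnConfig = new HashMap<>();
    columnConfig.put(ColumnFamilyDescriptorBuilder.TTL, 3600);

    HBasePersistentStoreProvider provider =
        new HBasePersistentStoreProvider(tableConfig, columnConfig, conf, "drill_store");
    provider.start(); // creates the table if absent, applying both config maps
    try {
      // provider.getOrCreateStore(...) now returns HBasePersistentStore instances
      // backed by the shared table and the configured column family
    } finally {
      provider.close();
    }
  }
}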
File: HBaseRecordReaderTest.java
@@ -17,14 +17,14 @@
*/
package org.apache.drill.hbase;

import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import static org.apache.drill.test.TestBuilder.mapOf;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class HBaseRecordReaderTest extends BaseHBaseTest {

@Test
File: HBaseTestsSuite.java
@@ -76,7 +76,7 @@ public class HBaseTestsSuite extends BaseTest {

private static Configuration conf;

private static Connection conn;
protected static Connection conn;
private static Admin admin;

private static HBaseTestingUtility UTIL;
File: TestHBaseCFAsJSONString.java
@@ -20,7 +20,7 @@
import java.io.IOException;
import java.util.List;

import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.client.DrillClient;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
@@ -30,7 +30,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestHBaseCFAsJSONString extends BaseHBaseTest {

private static DrillClient parent_client;
File: TestHBaseConnectionManager.java
@@ -17,12 +17,12 @@
*/
package org.apache.drill.hbase;

import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestHBaseConnectionManager extends BaseHBaseTest {

@Test
File: TestHBaseFilterPushDown.java
@@ -17,13 +17,13 @@
*/
package org.apache.drill.hbase;

import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.SlowTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestHBaseFilterPushDown extends BaseHBaseTest {

@Test
File: TestHBaseProjectPushDown.java
@@ -17,12 +17,12 @@
*/
package org.apache.drill.hbase;

import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestHBaseProjectPushDown extends BaseHBaseTest {

@Test
File: TestHBaseQueries.java
@@ -21,7 +21,7 @@
import java.util.List;

import org.apache.drill.PlanTestBase;
import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.rpc.user.QueryDataBatch;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -34,7 +34,7 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestHBaseQueries extends BaseHBaseTest {

@Test
File: TestHBaseRegexParser.java
@@ -24,14 +24,14 @@

import java.util.regex.Pattern;

import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.store.hbase.HBaseRegexParser;
import org.apache.drill.test.DrillTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestHBaseRegexParser extends DrillTest {

@Test
File: TestHBaseRegionScanAssignments.java
@@ -26,7 +26,7 @@
import java.util.List;
import java.util.NavigableMap;

import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.store.hbase.HBaseGroupScan;
import org.apache.drill.exec.store.hbase.HBaseScanSpec;
@@ -42,7 +42,7 @@
import org.apache.drill.shaded.guava.com.google.common.collect.Maps;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestHBaseRegionScanAssignments extends BaseHBaseTest {
static final String HOST_A = "A";
static final String HOST_B = "B";
File: TestHBaseTableProvider.java
@@ -21,65 +21,141 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.categories.HbaseStorageTest;
import java.util.HashMap;
import java.util.Map;

import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.common.config.LogicalPlanPersistence;
import org.apache.drill.exec.exception.StoreException;
import org.apache.drill.exec.planner.PhysicalPlanReaderTestFactory;
import org.apache.drill.exec.store.hbase.config.HBasePersistentStoreProvider;
import org.apache.drill.exec.store.sys.PersistentStore;
import org.apache.drill.exec.store.sys.PersistentStoreConfig;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestHBaseTableProvider extends BaseHBaseTest {

private static HBasePersistentStoreProvider provider;

private static final String STORE_TABLENAME = "drill_store";

private static final int MAX_FILESIZE = 1073741824;
private static final int MEMSTORE_FLUSHSIZE = 536870912;
private static final int MAX_VERSIONS = 1;
private static final int TTL = 3600;
private static final int BLOCKSIZE = 102400;
private static final int DFS_REPLICATION = 1;

private static final Map<String, Object> tableConfig;
private static final Map<String, Object> columnConfig;

static {
// Table level
tableConfig = new HashMap<>();
tableConfig.put(TableDescriptorBuilder.DURABILITY, Durability.ASYNC_WAL.name());
tableConfig.put(TableDescriptorBuilder.COMPACTION_ENABLED, false);
tableConfig.put(TableDescriptorBuilder.SPLIT_ENABLED, false);
tableConfig.put(TableDescriptorBuilder.MAX_FILESIZE, MAX_FILESIZE); // 1GB
tableConfig.put(TableDescriptorBuilder.MEMSTORE_FLUSHSIZE, MEMSTORE_FLUSHSIZE); // 512 MB
// Column Family level
columnConfig = new HashMap<>();
columnConfig.put(ColumnFamilyDescriptorBuilder.MAX_VERSIONS, MAX_VERSIONS);
columnConfig.put(ColumnFamilyDescriptorBuilder.TTL, TTL); // 1 HOUR
columnConfig.put(ColumnFamilyDescriptorBuilder.COMPRESSION, Compression.Algorithm.NONE.name());
columnConfig.put(ColumnFamilyDescriptorBuilder.BLOCKCACHE, false);
columnConfig.put(ColumnFamilyDescriptorBuilder.BLOCKSIZE, BLOCKSIZE);
columnConfig.put(ColumnFamilyDescriptorBuilder.DATA_BLOCK_ENCODING, DataBlockEncoding.FAST_DIFF.name());
columnConfig.put(ColumnFamilyDescriptorBuilder.IN_MEMORY, true);
columnConfig.put(ColumnFamilyDescriptorBuilder.DFS_REPLICATION, DFS_REPLICATION);
}

@BeforeClass // mask HBase cluster start function
public static void setUpBeforeTestHBaseTableProvider() throws Exception {
provider = new HBasePersistentStoreProvider(storagePluginConfig.getHBaseConf(), "drill_store");
provider = new HBasePersistentStoreProvider(tableConfig, columnConfig, storagePluginConfig.getHBaseConf(), STORE_TABLENAME);
provider.start();
}

@Test
public void testStoreTableAttributes() throws Exception {
TableName tableName = TableName.valueOf(STORE_TABLENAME);
try (Admin tableAdmin = HBaseTestsSuite.conn.getAdmin()) {
assertTrue("Store table not found: " + STORE_TABLENAME, tableAdmin.tableExists(tableName));
// Table verification
TableDescriptor tableDescriptor = tableAdmin.getDescriptor(tableName);
assertEquals("The durability must be " + Durability.ASYNC_WAL, Durability.ASYNC_WAL, tableDescriptor.getDurability());
assertFalse("Compaction must be disabled", tableDescriptor.isCompactionEnabled());
assertFalse("Split must be disabled", tableDescriptor.isSplitEnabled());
assertEquals("The max HFile size must be " + MAX_FILESIZE, MAX_FILESIZE, tableDescriptor.getMaxFileSize());
assertEquals("The memstore flush size must be " + MEMSTORE_FLUSHSIZE, MEMSTORE_FLUSHSIZE, tableDescriptor.getMemStoreFlushSize());
// Column family verification
assertTrue("Column family not found", tableDescriptor.hasColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME));
ColumnFamilyDescriptor columnDescriptor = tableDescriptor.getColumnFamily(HBasePersistentStoreProvider.DEFAULT_FAMILY_NAME);
assertEquals("The max number of versions must be " + MAX_VERSIONS, MAX_VERSIONS, columnDescriptor.getMaxVersions());
assertEquals("The time-to-live must be " + TTL, TTL, columnDescriptor.getTimeToLive());
// TODO native snappy* library not available
assertEquals("The compression algorithm must be " + Algorithm.NONE, Algorithm.NONE, columnDescriptor.getCompressionType());
assertFalse("The block cache must be disabled", columnDescriptor.isBlockCacheEnabled());
assertEquals("The block size must be " + BLOCKSIZE, BLOCKSIZE, columnDescriptor.getBlocksize());
assertEquals("The data block encoding must be " + DataBlockEncoding.FAST_DIFF, DataBlockEncoding.FAST_DIFF, columnDescriptor.getDataBlockEncoding());
assertTrue("In-memory must be enabled", columnDescriptor.isInMemory());
assertEquals("The DFS replication must be " + DFS_REPLICATION, DFS_REPLICATION, columnDescriptor.getDFSReplication());
}
}

@Test
public void testTableProvider() throws StoreException {
LogicalPlanPersistence lp = PhysicalPlanReaderTestFactory.defaultLogicalPlanPersistence(config);
PersistentStore<String> hbaseStore = provider.getOrCreateStore(
PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build());
hbaseStore.put("", "v0");
hbaseStore.put("k1", "v1");
hbaseStore.put("k2", "v2");
hbaseStore.put("k3", "v3");
hbaseStore.put("k4", "v4");
hbaseStore.put("k5", "v5");
hbaseStore.put(".test", "testValue");

assertEquals("v0", hbaseStore.get(""));
assertEquals("testValue", hbaseStore.get(".test"));

assertTrue(hbaseStore.contains(""));
assertFalse(hbaseStore.contains("unknown_key"));

assertEquals(7, Lists.newArrayList(hbaseStore.getAll()).size());

PersistentStore<String> hbaseTestStore = provider.getOrCreateStore(
PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build());
hbaseTestStore.put("", "v0");
hbaseTestStore.put("k1", "v1");
hbaseTestStore.put("k2", "v2");
hbaseTestStore.put("k3", "v3");
hbaseTestStore.put("k4", "v4");
hbaseTestStore.put(".test", "testValue");

assertEquals("v0", hbaseStore.get(""));
assertEquals("testValue", hbaseStore.get(".test"));

assertEquals(6, Lists.newArrayList(hbaseTestStore.getAll()).size());
{
PersistentStoreConfig<String> storeConfig = PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase").build();
PersistentStore<String> hbaseStore = provider.getOrCreateStore(storeConfig);
hbaseStore.put("", "v0");
hbaseStore.put("k1", "v1");
hbaseStore.put("k2", "v2");
hbaseStore.put("k3", "v3");
hbaseStore.put("k4", "v4");
hbaseStore.put("k5", "v5");
hbaseStore.put(".test", "testValue");

assertEquals("v0", hbaseStore.get(""));
assertEquals("testValue", hbaseStore.get(".test"));

assertTrue(hbaseStore.contains(""));
assertFalse(hbaseStore.contains("unknown_key"));

assertEquals(7, Lists.newArrayList(hbaseStore.getAll()).size());
}

{
PersistentStoreConfig<String> storeConfig = PersistentStoreConfig.newJacksonBuilder(lp.getMapper(), String.class).name("hbase.test").build();
PersistentStore<String> hbaseStore = provider.getOrCreateStore(storeConfig);
hbaseStore.put("", "v0");
hbaseStore.put("k1", "v1");
hbaseStore.put("k2", "v2");
hbaseStore.put("k3", "v3");
hbaseStore.put("k4", "v4");
hbaseStore.put(".test", "testValue");

assertEquals("v0", hbaseStore.get(""));
assertEquals("testValue", hbaseStore.get(".test"));

assertEquals(6, Lists.newArrayList(hbaseStore.getAll()).size());
}
}

@AfterClass
File: TestOrderedBytesConvertFunctions.java
@@ -26,7 +26,7 @@
import java.util.List;

import org.apache.drill.test.BaseTestQuery;
import org.apache.drill.categories.HbaseStorageTest;
import org.apache.drill.categories.HBaseStorageTest;
import org.apache.drill.categories.SlowTest;
import org.apache.drill.exec.proto.UserBitShared.QueryType;
import org.apache.drill.exec.record.RecordBatchLoader;
@@ -39,7 +39,7 @@
import org.apache.drill.shaded.guava.com.google.common.io.Resources;
import org.junit.experimental.categories.Category;

@Category({SlowTest.class, HbaseStorageTest.class})
@Category({SlowTest.class, HBaseStorageTest.class})
public class TestOrderedBytesConvertFunctions extends BaseTestQuery {

private static final String CONVERSION_TEST_PHYSICAL_PLAN = "functions/conv/conversionTestWithPhysicalPlan.json";