Skip to content

Commit

Permalink
ExplicitTags filtering with FuzzyFilters
Browse files Browse the repository at this point in the history
  • Loading branch information
Ronan Harmegnies authored and manolama committed May 12, 2020
1 parent 793cfed commit 81d2112
Show file tree
Hide file tree
Showing 3 changed files with 258 additions and 51 deletions.
300 changes: 253 additions & 47 deletions src/query/QueryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
Expand All @@ -26,13 +27,12 @@
import net.opentsdb.uid.UniqueId;

import org.hbase.async.Bytes;
import org.hbase.async.FilterList;
import org.hbase.async.FuzzyRowFilter;
import org.hbase.async.FuzzyRowFilter.FuzzyFilterPair;
import org.hbase.async.KeyRegexpFilter;
import org.hbase.async.Bytes.ByteMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.hbase.async.ScanFilter;
import org.hbase.async.Scanner;

/**
Expand Down Expand Up @@ -175,9 +175,218 @@ public static String getRowKeyUIDRegex(
return buf.toString();
}

/**
* Crafts a regular expression for scanning over data table rows and filtering
* time series that the user doesn't want.
* @param row_key_literals An optional list of key value pairs to filter on.
* May be null.
* @param explicit_tags Whether or not explicit tags are enabled so that the
* regex only picks out series with the specified tags
* @return A regular expression string to pass to the storage layer.
*/
private static String getRowKeyUIDRegex(
final ByteMap<byte[][]> row_key_literals,
final boolean explicit_tags) {
final int prefix_width = Const.SALT_WIDTH() + TSDB.metrics_width() +
Const.TIMESTAMP_BYTES;
final short name_width = TSDB.tagk_width();
final short value_width = TSDB.tagv_width();
final short tagsize = (short) (name_width + value_width);
// Generate a regexp for our tags. Say we have 2 tags: { 0 0 1 0 0 2 }
// and { 4 5 6 9 8 7 }, the regexp will be:
// "^.{7}(?:.{6})*\\Q\000\000\001\000\000\002\\E(?:.{6})*\\Q\004\005\006\011\010\007\\E(?:.{6})*$"
final StringBuilder buf = new StringBuilder(
15 // "^.{N}" + "(?:.{M})*" + "$"
+ ((13 + tagsize) // "(?:.{M})*\\Q" + tagsize bytes + "\\E"
* ((row_key_literals == null ? 0 : row_key_literals.size()))));

// Alright, let's build this regexp. From the beginning...
buf.append("(?s)" // Ensure we use the DOTALL flag.
+ "^.{")
// ... start by skipping the salt, metric ID and timestamp.
.append(prefix_width)
.append("}");

final Iterator<Entry<byte[], byte[][]>> it = row_key_literals == null ?
new ByteMap<byte[][]>().iterator() : row_key_literals.iterator();

while(it.hasNext()) {
Entry<byte[], byte[][]> entry = it.hasNext() ? it.next() : null;
// TODO - This look ahead may be expensive. We need to get some data around
// whether it's faster for HBase to scan with a look ahead or simply pass
// the rows back to the TSD for filtering.
final boolean not_key =
entry.getValue() != null && entry.getValue().length == 0;

// Skip any number of tags.
if (!explicit_tags) {
buf.append("(?:.{").append(tagsize).append("})*");
}

if (not_key) {
// start the lookahead as we have a key we explicitly do not want in the
// results
buf.append("(?!");
}
buf.append("\\Q");

addId(buf, entry.getKey(), true);
if (entry.getValue() != null && entry.getValue().length > 0) { // Add a group_by.
// We want specific IDs. List them: /(AAA|BBB|CCC|..)/
buf.append("(?:");
for (final byte[] value_id : entry.getValue()) {
if (value_id == null) {
continue;
}
buf.append("\\Q");
addId(buf, value_id, true);
buf.append('|');
}
// Replace the pipe of the last iteration.
buf.setCharAt(buf.length() - 1, ')');
} else {
buf.append(".{").append(value_width).append('}'); // Any value ID.
}

if (not_key) {
// be sure to close off the look ahead
buf.append(")");
}
}

// Skip any number of tags before the end.
if (!explicit_tags) {
buf.append("(?:.{").append(tagsize).append("})*");
}
buf.append("$");
return buf.toString();
}

/**
* Crafts a list of FuzzyFilters for scanning over data table rows and
* filtering time series that the user doesn't want.
* Note: The caller has to restrict the scan to proper start and stop
* for the filter to work correctly.
* @param row_key_literals A list of key value pairs to filter on.
* @return A sorted, non-empty list of FuzzyFilterPair
*/
private static List<FuzzyFilterPair> buildFuzzyFilters(
final ByteMap<byte[][]> row_key_literals) {
final int prefix_width = Const.SALT_WIDTH() + TSDB.metrics_width() +
Const.TIMESTAMP_BYTES;
final short name_width = TSDB.tagk_width();
final short value_width = TSDB.tagv_width();
final short tag_width = (short) (name_width + value_width);
int row_key_size = prefix_width;
if (row_key_literals != null) {
for(byte[][] v: row_key_literals.values()) {
final boolean not_key = v!=null && v.length==0;
if (!not_key) {
row_key_size += tag_width;
}
}
}
final List<FuzzyFilterPair> fuzzy_filter_pairs =
new ArrayList<FuzzyFilterPair>();

// Initialize first_fuzzy_key and first_fuzzy_mask
// these will serve as model for the fuzzy filter list
// generated for tags with multiple values (|)
byte[] first_fuzzy_key = new byte[row_key_size];
byte[] first_fuzzy_mask = new byte[row_key_size];
int fuzzy_offset = 0;
// skip salt & timestamp (filtering should be done by start/stop
// of the scanner)
while(fuzzy_offset < prefix_width) {
first_fuzzy_key[fuzzy_offset] = 0;
first_fuzzy_mask[fuzzy_offset++] =
(row_key_literals != null) ? (byte)1 : (byte)0;
}
if (row_key_literals != null) {
final Iterator<Entry<byte[], byte[][]>> it = row_key_literals.iterator();
while(it.hasNext()) {
Entry<byte[], byte[][]> entry = it.next();
final boolean not_key =
entry.getValue() != null && entry.getValue().length == 0;

if (!not_key) {
final byte[] tag_key = entry.getKey();
System.arraycopy(tag_key, 0,
first_fuzzy_key, fuzzy_offset, name_width);
for (int i=0; i<name_width; i++) {
first_fuzzy_mask[fuzzy_offset++] = 0;
}

final byte[] tag_value;
if (entry.getValue()!=null && entry.getValue().length > 0) {
tag_value = entry.getValue()[0];
} else {
tag_value = null;
}
if (tag_value!=null) {
System.arraycopy(tag_value, 0,
first_fuzzy_key, fuzzy_offset, value_width);
for (int i=0; i<value_width; i++) {
first_fuzzy_mask[fuzzy_offset++] = 0;
}
} else {
// not filtered with fuzzy filter -> skip
for (int i=0; i<value_width; i++) {
first_fuzzy_key[fuzzy_offset] = 0;
first_fuzzy_mask[fuzzy_offset++] = 1;
}
}
}
}
}
fuzzy_filter_pairs.add(new FuzzyFilterPair(first_fuzzy_key, first_fuzzy_mask));

if (row_key_literals != null) {
// generate filters for all combinations of tag values
fuzzy_offset = prefix_width;
final Iterator<Entry<byte[], byte[][]>> it = row_key_literals.iterator();
while(it.hasNext()) {
final Entry<byte[], byte[][]> entry = it.next();
fuzzy_offset += name_width;

// if multiple values value, generate a new combination of filters
// for each value
if (entry.getValue()!=null && entry.getValue().length > 1) {
final List<FuzzyFilterPair> duplicate_fuzzy_filters =
new ArrayList<FuzzyFilterPair>(fuzzy_filter_pairs);
for (int i=1; i<entry.getValue().length; i++) {
final byte[] tag_value = entry.getValue()[i];

for (FuzzyFilterPair pair: duplicate_fuzzy_filters) {
byte[] fuzzy_key =
Arrays.copyOf(pair.getRowKey(), row_key_size);
System.arraycopy(tag_value, 0,
fuzzy_key, fuzzy_offset, value_width);

fuzzy_filter_pairs.add(
new FuzzyFilterPair(fuzzy_key, first_fuzzy_mask));
}
}
}
fuzzy_offset += value_width;
}
}

// Sort filters list over rowkey
Collections.sort(fuzzy_filter_pairs, new Comparator<FuzzyFilterPair>() {
@Override
public int compare(FuzzyFilterPair pair1, FuzzyFilterPair pair2) {
return Bytes.memcmp(pair1.getRowKey(), pair2.getRowKey());
}
});

return fuzzy_filter_pairs;
}

/**
* Sets a filter or filter list on the scanner based on whether or not the
* query had tags it needed to match.
* NOTE: This method will sort the group bys.
* @param scanner The scanner to modify.
* @param group_bys An optional list of tag keys that we want to group on. May
* be null.
Expand Down Expand Up @@ -205,56 +414,53 @@ public static void setDataTableScanFilter(
return;
}

final int prefix_width = Const.SALT_WIDTH() + TSDB.metrics_width() +
Const.TIMESTAMP_BYTES;
final short name_width = TSDB.tagk_width();
final short value_width = TSDB.tagv_width();
final byte[] fuzzy_key;
final byte[] fuzzy_mask;
if (explicit_tags && enable_fuzzy_filter) {
fuzzy_key = new byte[prefix_width + (row_key_literals.size() *
(name_width + value_width))];
fuzzy_mask = new byte[prefix_width + (row_key_literals.size() *
(name_width + value_width))];
System.arraycopy(scanner.getCurrentKey(), 0, fuzzy_key, 0,
scanner.getCurrentKey().length);
} else {
fuzzy_key = fuzzy_mask = null;
}

final String regex = getRowKeyUIDRegex(group_bys, row_key_literals,
explicit_tags, fuzzy_key, fuzzy_mask);
final KeyRegexpFilter regex_filter = new KeyRegexpFilter(
regex.toString(), Const.ASCII_CHARSET);
if (LOG.isDebugEnabled()) {
LOG.debug("Regex for scanner: " + scanner + ": " +
byteRegexToString(regex));
if (group_bys != null) {
Collections.sort(group_bys, Bytes.MEMCMP);
}

if (!(explicit_tags && enable_fuzzy_filter)) {
scanner.setFilter(regex_filter);
return;
}
final int prefix_width = Const.SALT_WIDTH() + TSDB.metrics_width() +
Const.TIMESTAMP_BYTES;

scanner.setStartKey(fuzzy_key);
final byte[] stop_key = Arrays.copyOf(fuzzy_key, fuzzy_key.length);
Internal.setBaseTime(stop_key, end_time);
int idx = Const.SALT_WIDTH() + TSDB.metrics_width() +
Const.TIMESTAMP_BYTES + TSDB.tagk_width();
// max out the tag values
while (idx < stop_key.length) {
for (int i = 0; i < TSDB.tagv_width(); i++) {
stop_key[idx++] = (byte) 0xFF;
if (explicit_tags && enable_fuzzy_filter) {
final List<FuzzyFilterPair> fuzzy_filter_pairs =
buildFuzzyFilters(row_key_literals);

// The Fuzzy Filter list is sorted: the first and last filters row key
// can be used to build a start and stop keys for the scanner
final byte[] start_key = Arrays.copyOf(
fuzzy_filter_pairs.get(0).getRowKey(),
fuzzy_filter_pairs.get(0).getRowKey().length);
System.arraycopy(scanner.getCurrentKey(), 0, start_key, 0, prefix_width);

final byte[] stop_key = Arrays.copyOf(
fuzzy_filter_pairs.get(fuzzy_filter_pairs.size()-1).getRowKey(),
start_key.length);
System.arraycopy(scanner.getCurrentKey(), 0,
stop_key, 0, prefix_width);
Internal.setBaseTime(stop_key, end_time);
int idx = prefix_width + TSDB.tagk_width();
// max out the tag values
while (idx < stop_key.length) {
for (int i = 0; i < TSDB.tagv_width(); i++) {
stop_key[idx++] = (byte) 0xFF;
}
idx += TSDB.tagk_width();
}
idx += TSDB.tagk_width();

scanner.setStartKey(start_key);
scanner.setStopKey(stop_key);
scanner.setFilter(new FuzzyRowFilter(fuzzy_filter_pairs));
} else {
final String regex = getRowKeyUIDRegex(row_key_literals, explicit_tags);
final KeyRegexpFilter regex_filter = new KeyRegexpFilter(
regex.toString(), Const.ASCII_CHARSET);
if (LOG.isDebugEnabled()) {
LOG.debug("Regex for scanner: " + scanner + ": " +
byteRegexToString(regex));
}

scanner.setFilter(regex_filter);
}
scanner.setStopKey(stop_key);
final List<ScanFilter> filters = new ArrayList<ScanFilter>(2);
filters.add(
new FuzzyRowFilter(
new FuzzyRowFilter.FuzzyFilterPair(fuzzy_key, fuzzy_mask)));
filters.add(regex_filter);
scanner.setFilter(new FilterList(filters));
}

/**
Expand Down
7 changes: 4 additions & 3 deletions test/core/TestTsdbQueryQueries.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import net.opentsdb.rollup.RollupInterval;
import org.hbase.async.Bytes;
import org.hbase.async.FilterList;
import org.hbase.async.FuzzyRowFilter;
import org.hbase.async.Scanner;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -1632,7 +1633,7 @@ public void filterExplicitTagsOK() throws Exception {
assertEquals(300, dps[0].aggregatedSize());
// assert fuzzy
for (final MockScanner scanner : storage.getScanners()) {
assertTrue(scanner.getFilter() instanceof FilterList);
assertTrue(scanner.getFilter() instanceof FuzzyRowFilter);
}
}

Expand Down Expand Up @@ -1663,7 +1664,7 @@ public void filterExplicitTagsGroupByOK() throws Exception {
assertEquals(300, dps[0].aggregatedSize());
// assert fuzzy
for (final MockScanner scanner : storage.getScanners()) {
assertTrue(scanner.getFilter() instanceof FilterList);
assertTrue(scanner.getFilter() instanceof FuzzyRowFilter);
}
}

Expand All @@ -1689,7 +1690,7 @@ public void filterExplicitTagsMissing() throws Exception {
assertEquals(0, dps.length);
// assert fuzzy
for (final MockScanner scanner : storage.getScanners()) {
assertTrue(scanner.getFilter() instanceof FilterList);
assertTrue(scanner.getFilter() instanceof FuzzyRowFilter);
}
}

Expand Down
2 changes: 1 addition & 1 deletion test/query/TestQueryUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ public void setDataTableScanFilterEnableExplicit() throws Exception {

@Test
public void setDataTableScanFilterEnableBoth() throws Exception {
when(scanner.getCurrentKey()).thenReturn(new byte[] { 0, 0, 0, 1 });
when(scanner.getCurrentKey()).thenReturn(new byte[] { 0, 0, 0, 0, 0, 0, 1 });
final ByteMap<byte[][]> tags = new ByteMap<byte[][]>();
tags.put(new byte[] { 0, 0, 1 }, new byte[][] { new byte[] {0, 0, 1} });
QueryUtil.setDataTableScanFilter(
Expand Down

0 comments on commit 81d2112

Please sign in to comment.