Skip to content

Commit

Permalink
Fixes apache#1225 - Created a single ZooNode that tracks changes for …
Browse files Browse the repository at this point in the history
…all table configs
  • Loading branch information
jzgithub1 committed Nov 6, 2019
1 parent 58dca69 commit 01b9562
Show file tree
Hide file tree
Showing 4 changed files with 209 additions and 3 deletions.
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class Constants {
public static final String ZTABLE_COMPACT_ID = "/compact-id";
public static final String ZTABLE_COMPACT_CANCEL_ID = "/compact-cancel-id";
public static final String ZTABLE_NAMESPACE = "/namespace";
public static final String ZTABLE_CONFIG_VERSION = "/table-config-version";

public static final String ZNAMESPACES = "/namespaces";
public static final String ZNAMESPACE_NAME = "/name";
Expand Down
178 changes: 176 additions & 2 deletions core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.accumulo.core.Constants;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
Expand All @@ -47,6 +50,12 @@
public class ZooCache {
private static final Logger log = LoggerFactory.getLogger(ZooCache.class);

public final static Pattern TABLE_SETTING_CONFIG_PATTERN =
Pattern.compile("(/accumulo/[0-9a-z-]+)(" + Constants.ZTABLES + ")(/.*)("
+ Constants.ZTABLE_CONF + ")/(table.*|tserver.*)");

public final static Pattern ZNODE_PATTERN = Pattern.compile("(/accumulo/[0-9a-z-]+)/.*");

private final ZCacheWatcher watcher = new ZCacheWatcher();
private final Watcher externalWatcher;

Expand All @@ -62,6 +71,8 @@ public class ZooCache {
private final SecureRandom secureRandom = new SecureRandom();

private volatile boolean closed = false;
private static boolean tableConfigWatcherSet = false;
private static int lastTableConfigVersion = 0;

public static class ZcStat {
private long ephemeralOwner;
Expand Down Expand Up @@ -148,14 +159,20 @@ private ZooKeeper getZooKeeper() {
}

private class ZCacheWatcher implements Watcher {

@Override
public void process(WatchedEvent event) {
if (log.isTraceEnabled()) {
log.trace("{}", event);
}
setWatcherForTableConfigVersion(event);

switch (event.getType()) {
case NodeDataChanged:
if (event.getPath().endsWith(Constants.ZTABLE_CONFIG_VERSION)) {
processTableConfigurationItem(event);
break;
}
case NodeChildrenChanged:
case NodeCreated:
case NodeDeleted:
Expand Down Expand Up @@ -191,6 +208,158 @@ public void process(WatchedEvent event) {
externalWatcher.process(event);
}
}
/*
* The processTableConfigurationItem function is only called when there is a NodeDataChanged
* event on the "table_config_version" znode. It resets the watcher on the
* "table_config_version" znode and extracts the last modified table config path from the
* table_config_version znode. If the version of the table_config_version znode is what this
* process expects to be the next one we simply emulate what we used to do and remove that
* actual table configuration property's zpath from the ZooCache. If the version is not what we
* expect to be the next then we remove all the table config properties from the ZooCache as
* specified in Accumulo Issue #1225. We gain a little efficiency in ZooKeeper by trying to keep
* track of the version of the table_config_version znode which Zookeeper generates on its own.
* It would be unwise to try to calculate the version on our own since many processes on
* different machines will be setting table configuration properties.
*/

private synchronized void processTableConfigurationItem(WatchedEvent event) {

if (event.getPath() == null || event.getPath().isEmpty())
return;

try {
Stat versionStat = getZooKeeper().exists(event.getPath(), watcher);
if (versionStat == null) {
return;
}

byte[] configToRefresh = getZooKeeper().getData(event.getPath(), true, versionStat);

if (configToRefresh == null) {
return;
}

if ((versionStat != null) && (versionStat.getVersion() - lastTableConfigVersion == 1)) {
lastTableConfigVersion = versionStat.getVersion();
refreshTableConfig(configToRefresh);
if (log.isTraceEnabled()) {
log.trace(
"Successfully refreshed table single table config: " + new String(configToRefresh));
}
} else {

if (log.isTraceEnabled()) {
log.trace("We have to update all table configs.");
}

if (versionStat != null) {
lastTableConfigVersion = versionStat.getVersion();
updateAllTableConfigurations();
}
}

} catch (Exception e) {
log.error("Error getting data from TABLE_CONFIGS zoonode " + e.getMessage());
}

return;
}

/**
* Remove all table configuration items for all tables from the cache.
*/

private synchronized void updateAllTableConfigurations() {

for (String cachedItem : cache.keySet()) {
if (cachedItem == null || cachedItem.isEmpty())
continue;

Matcher tableConfigMatcher = TABLE_SETTING_CONFIG_PATTERN.matcher(cachedItem);
if (tableConfigMatcher.matches()) {
remove(cachedItem);
}
}

}

/***
* Create the first watcher for the /accumulo/{InstanceID/table-config-version znode. It only
* operates on the table-config-version znode. Watchers need to be set on a node if a watch
* event is ever to be triggered. Watch events are triggered by a data change, addition or
* deletion of a znode. Watches, once triggered need to be reset. (The reset of the watcher set
* here will be done in the ZCacheWatcher.processTableConfigurationItem function.) The change to
* the table-config-version znode occurs during setTableProperty and removeTableProperty calls
* in the TablePropUtil class. During setTableProperty and removeTableProperty calls, the data
* value of the table-config-version znode will also be changed in addition to the actual table
* configuration property (that will no longer be watched). Everytime the data in the
* table-config-version znode changes the internal "data version" tracked by Zookeeper will be
* automatically incremented. Accumulo does not have to compute what that version is. Zookeeper
* does that by itself. The ZooCache object will track this version and use it to maintain the
* cache. Many processes will be using it so when the "lastTableConfigVersion member" does not
* match the actual version of the table-config-version znode we just remove all the table
* configurations (in the particular ZooCache instance) as was specified in the Accumulo Issue
* #1225. What I found is that we can usually rely on events coming to the ZCacheWatcher in
* order and we only need to remove one table configuration from the ZooCache. The one that just
* got changed, added, or deleted. This should lead to far few calls to getData on Zookeeper
* during runtime. This is accomplished by setting the data of the table-config-version Znode to
* the ZooPath of the table configuration that was modified. This zPath with be removed from the
* ZooCache in the ZCacheWathcer.process(). * It will be restored in the ZooCache the next time
* ZooCache.get(String) is called for that table configuration path. This was how ZooCached
* worked when we watched all table configurations and we are just emulating it now using only
* one watched node for all the configuration items - the "table-config-version node".
*/
private synchronized void setWatcherForTableConfigVersion(WatchedEvent event) {

if (event.getPath() == null || event.getPath().isEmpty())
return;

if (!tableConfigWatcherSet) {
Matcher znodeMatcher = ZNODE_PATTERN.matcher(event.getPath());
if (znodeMatcher.matches()) {
String pathPrefix = znodeMatcher.group(1);
try {

Stat versionStat =
getZooKeeper().exists(pathPrefix + Constants.ZTABLE_CONFIG_VERSION, watcher);
if (versionStat != null) {
lastTableConfigVersion = versionStat.getVersion();
tableConfigWatcherSet = true;
if (log.isTraceEnabled())
log.trace("Successfully set table_config watcher");
}

} catch (KeeperException ke) {
log.error("Could not set watcher on " + pathPrefix + Constants.ZTABLE_CONFIG_VERSION
+ " will retry later " + ke.getMessage());
} catch (InterruptedException ie) {
log.error("Could not set watcher on " + pathPrefix + Constants.ZTABLE_CONFIG_VERSION
+ " will retry later " + ie.getMessage());
}
}
}
}
}

private void refreshTableConfig(byte[] configToRefresh) {
if (configToRefresh == null)
return;

String strConfigToRefresh = new String(configToRefresh);
if (strConfigToRefresh.isEmpty())
return;

Matcher tableConfigMatcher = TABLE_SETTING_CONFIG_PATTERN.matcher(strConfigToRefresh);
if (!tableConfigMatcher.matches())
return;

if (log.isTraceEnabled()) {
log.trace("NodeDataChanged called refreshTableConfig and refreshed table config: "
+ new String(configToRefresh));
}

remove(strConfigToRefresh);

}

/**
Expand Down Expand Up @@ -402,18 +571,22 @@ public byte[] run() throws KeeperException, InterruptedException {
* a special case that looks for Code.NONODE in the KeeperException, then non-existence can
* not be cached.
*/

Matcher configMatcher = TABLE_SETTING_CONFIG_PATTERN.matcher(zPath);
boolean createWatch = !configMatcher.matches();

cacheWriteLock.lock();
try {
final ZooKeeper zooKeeper = getZooKeeper();
Stat stat = zooKeeper.exists(zPath, watcher);
Stat stat = zooKeeper.exists(zPath, createWatch ? watcher : null);
byte[] data = null;
if (stat == null) {
if (log.isTraceEnabled()) {
log.trace("zookeeper did not contain {}", zPath);
}
} else {
try {
data = zooKeeper.getData(zPath, watcher, stat);
data = zooKeeper.getData(zPath, createWatch ? watcher : null, stat);
zstat = new ZcStat(stat);
} catch (KeeperException.BadVersionException | KeeperException.NoNodeException e1) {
throw new ConcurrentModificationException();
Expand Down Expand Up @@ -546,6 +719,7 @@ boolean childrenCached(String zPath) {
* path of top node
*/
public void clear(String zPath) {

Preconditions.checkState(!closed);
cacheWriteLock.lock();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ private static void initZooKeeper(Opts opts, String uuid, String instanceNamePat
zoo.putPersistentData(zkInstanceRoot, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID,
NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_CONFIG_VERSION, new byte[0],
NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZNAMESPACES, new byte[0],
NodeExistsPolicy.FAIL);
TableManager.prepareNewNamespaceState(zoo, uuid, Namespace.DEFAULT.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package org.apache.accumulo.server.util;

import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.accumulo.fate.zookeeper.ZooCache.TABLE_SETTING_CONFIG_PATTERN;

import java.util.regex.Matcher;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.Property;
Expand All @@ -26,9 +29,13 @@
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TablePropUtil {

private static final Logger log = LoggerFactory.getLogger(TablePropUtil.class);

public static boolean setTableProperty(ServerContext context, TableId tableId, String property,
String value) throws KeeperException, InterruptedException {
return setTableProperty(context.getZooReaderWriter(), context.getZooKeeperRoot(), tableId,
Expand All @@ -47,7 +54,10 @@ public static boolean setTableProperty(ZooReaderWriter zoo, String zkRoot, Table
// create the zk node for this property and set it's data to the specified value
String zPath = zkTablePath + "/" + property;
zoo.putPersistentData(zPath, value.getBytes(UTF_8), NodeExistsPolicy.OVERWRITE);

updateTableConfigTrackingZnode(zPath, zoo);
if (log.isTraceEnabled()) {
log.trace("updateTableConfigTrackingZnode called in setTableProperty for " + zPath);
}
return true;
}

Expand All @@ -61,9 +71,28 @@ public static void removeTableProperty(ServerContext context, TableId tableId, S
throws InterruptedException, KeeperException {
String zPath = getTablePath(context.getZooKeeperRoot(), tableId) + "/" + property;
context.getZooReaderWriter().recursiveDelete(zPath, NodeMissingPolicy.SKIP);
updateTableConfigTrackingZnode(zPath, context.getZooReaderWriter());
if (log.isTraceEnabled()) {
log.trace("updateTableConfigTrackingZnode called in removeTableProperty " + zPath);
}
}

private static String getTablePath(String zkRoot, TableId tableId) {
return zkRoot + Constants.ZTABLES + "/" + tableId.canonical() + Constants.ZTABLE_CONF;
}

private static void updateTableConfigTrackingZnode(String zPath, ZooReaderWriter zoo)
throws KeeperException, InterruptedException {

String pathPrefix;
Matcher configMatcher = TABLE_SETTING_CONFIG_PATTERN.matcher(zPath);
if (configMatcher.matches()) {
pathPrefix = configMatcher.group(1);
zoo.putPersistentData(pathPrefix + Constants.ZTABLE_CONFIG_VERSION, zPath.getBytes(UTF_8),
NodeExistsPolicy.OVERWRITE);

}

}

}

0 comments on commit 01b9562

Please sign in to comment.