Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Accumulo 1225 attempt2 #1

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions core/src/main/java/org/apache/accumulo/core/Constants.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class Constants {
public static final String ZTABLE_COMPACT_ID = "/compact-id";
public static final String ZTABLE_COMPACT_CANCEL_ID = "/compact-cancel-id";
public static final String ZTABLE_NAMESPACE = "/namespace";
public static final String ZTABLE_CONFIG_VERSION = "/table-config-version";

public static final String ZNAMESPACES = "/namespaces";
public static final String ZNAMESPACE_NAME = "/name";
Expand Down
212 changes: 210 additions & 2 deletions core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.accumulo.core.Constants;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.WatchedEvent;
Expand All @@ -47,6 +50,12 @@
public class ZooCache {
private static final Logger log = LoggerFactory.getLogger(ZooCache.class);

public final static Pattern TABLE_SETTING_CONFIG_PATTERN =
Pattern.compile("(/accumulo/[0-9a-z-]+)(" + Constants.ZTABLES + ")(/.*)("
+ Constants.ZTABLE_CONF + ")/(table.*|tserver.*)");

public final static Pattern ZNODE_PATTERN = Pattern.compile("(/accumulo/[0-9a-z-]+)/.*");

private final ZCacheWatcher watcher = new ZCacheWatcher();
private final Watcher externalWatcher;

Expand All @@ -62,6 +71,8 @@ public class ZooCache {
private final SecureRandom secureRandom = new SecureRandom();

private volatile boolean closed = false;
private static boolean tableConfigWatcherSet = false;
private static int lastTableConfigVersion = 0;

public static class ZcStat {
private long ephemeralOwner;
Expand Down Expand Up @@ -148,15 +159,43 @@ private ZooKeeper getZooKeeper() {
}

private class ZCacheWatcher implements Watcher {

@Override
public void process(WatchedEvent event) {
if (log.isTraceEnabled()) {
log.trace("{}", event);
}
setWatcherForTableConfigVersion(event.getPath());

switch (event.getType()) {
case NodeDataChanged:
if (event.getPath().endsWith(Constants.ZTABLE_CONFIG_VERSION)) {
try {
processTableConfigurationItem(event);
} catch (KeeperException | InterruptedException e) {
log.error(
"ZCacheWatcher::processTableConfigurationItem failed to process a table configuration change");
updateAllTableConfigurations();
tableConfigWatcherSet = false;
setWatcherForTableConfigVersion(event.getPath());
}
break;
}
case NodeChildrenChanged:
if (event.getPath().endsWith("conf")
&& event.getType().equals(Event.EventType.NodeChildrenChanged)) {
try {
clear(event.getPath());
getZooKeeper().exists(event.getPath(), watcher);
if (log.isTraceEnabled()){
log.trace("NodeChildrenChanged: resetting watcher for " + event.getPath());
if (externalWatcher != null)
log.trace("external watcher with process this zpath also " + event.getPath());
}
} catch (KeeperException | InterruptedException e) {
log.error("could not reset watcher on parent node: " + event.getPath());
}
}
case NodeCreated:
case NodeDeleted:
remove(event.getPath());
Expand Down Expand Up @@ -191,6 +230,170 @@ public void process(WatchedEvent event) {
externalWatcher.process(event);
}
}

/**
* The processTableConfigurationItem function is only called when there is a NodeDataChanged
* event on the "table_config_version" znode.
*
* It resets the watcher on the "table_config_version" znode and extracts the last modified
* table config path from the table_config_version znode. If the version of the
* table_config_version znode is what this process expects to be the next one we simply emulate
* what we used to do and remove that actual table configuration property's zpath from the
* ZooCache. If the version is not what we expect to be the next then we remove all the table
* config properties from the ZooCache as specified in Accumulo Issue #1225. We gain a little
* efficiency in ZooKeeper by trying to keep track of the version of the table_config_version
* znode which Zookeeper generates on its own. It would be unwise to try to calculate the
* version on our own since many processes on different machines will be setting table
* configuration properties.
*
* @param event
* Contains the Zpath of the table_config_version znode.
*/
private synchronized void processTableConfigurationItem(WatchedEvent event)
throws KeeperException, InterruptedException {

if (event.getPath() == null || event.getPath().isEmpty())
return;

Stat versionStat = new Stat();

byte[] configToRefresh = getZooKeeper().getData(event.getPath(), watcher, versionStat);

if (configToRefresh == null) {
throw new IllegalStateException("The table-config-version znode should have data in it.");
}

if ((versionStat != null) && (versionStat.getVersion() - lastTableConfigVersion == 1)) {
lastTableConfigVersion = versionStat.getVersion();
refreshTableConfig(configToRefresh);
if (log.isTraceEnabled()) {
log.trace(
"Successfully refreshed table single table config: " + new String(configToRefresh));
}
} else {

if (log.isTraceEnabled()) {
log.trace("We have to update all table configs.");
}

if (versionStat != null) {
lastTableConfigVersion = versionStat.getVersion();
updateAllTableConfigurations();
}
}

return;
}

/**
* Remove all table configuration items for all tables from the cache.
*/
private synchronized void updateAllTableConfigurations() {

for (String cachedItem : cache.keySet()) {
if (cachedItem == null || cachedItem.isEmpty())
continue;

Matcher tableConfigMatcher = TABLE_SETTING_CONFIG_PATTERN.matcher(cachedItem);
if (tableConfigMatcher.matches()) {
remove(cachedItem);
}
}

}

/**
* Create the first watcher for the /accumulo/{InstanceID}/table-config-version znode.
*
* It only operates on the table-config-version znode. Watchers need to be set on a node if a
* watch event is ever to be triggered. Watch events are triggered by a data change, addition or
* deletion of a znode. Watches, once triggered need to be reset. (The reset of the watcher set
* here will be done in the ZCacheWatcher.processTableConfigurationItem function.) The change to
* the table-config-version znode occurs during setTableProperty and removeTableProperty calls
* in the TablePropUtil class. During setTableProperty and removeTableProperty calls, the data
* value of the table-config-version znode will also be changed in addition to the actual table
* configuration property (that will no longer be watched). Everytime the data in the
* table-config-version znode changes the internal "data version" tracked by Zookeeper will be
* automatically incremented. Accumulo does not have to compute what that version is. Zookeeper
* does that by itself. The ZooCache object will track this version and use it to maintain the
* cache. Many processes will be using it so when the "lastTableConfigVersion member" does not
* match the actual version of the table-config-version znode we just remove all the table
* configurations (in the particular ZooCache instance) as was specified in the Accumulo Issue
* #1225. What I found is that we can usually rely on events coming to the ZCacheWatcher in
* order and we only need to remove one table configuration from the ZooCache. The one that just
* got changed, added, or deleted. This should lead to far few calls to getData on Zookeeper
* during runtime. This is accomplished by setting the data of the table-config-version Znode to
* the ZooPath of the table configuration that was modified. This zPath with be removed from the
* ZooCache in the ZCacheWathcer.process(). * It will be restored in the ZooCache the next time
* ZooCache.get(String) is called for that table configuration path. This was how ZooCached
* worked when we watched all table configurations and we are just emulating it now using only
* one watched node for all the configuration items - the "table-config-version node".
*
* @param zPath
* Contains a Zpath which the will be used to extract the accumulo instance id which
* will be used to create the Zpath to the table_config_version znode so a initial
* watcher can be put on it.
*/
private synchronized void setWatcherForTableConfigVersion(String zPath) {

if (zPath == null || zPath.isEmpty())
return;

if (!tableConfigWatcherSet) {
Matcher znodeMatcher = ZNODE_PATTERN.matcher(zPath);
if (znodeMatcher.matches()) {
String pathPrefix = znodeMatcher.group(1);
try {

Stat versionStat =
getZooKeeper().exists(pathPrefix + Constants.ZTABLE_CONFIG_VERSION, watcher);
if (versionStat != null) {
lastTableConfigVersion = versionStat.getVersion();
tableConfigWatcherSet = true;
if (log.isTraceEnabled())
log.trace("Successfully set table_config watcher");
}

} catch (KeeperException ke) {
log.error("Could not set watcher on " + pathPrefix + Constants.ZTABLE_CONFIG_VERSION
+ " will retry later " + ke.getMessage());
} catch (InterruptedException ie) {
log.error("Could not set watcher on " + pathPrefix + Constants.ZTABLE_CONFIG_VERSION
+ " will retry later " + ie.getMessage());
}
}
}
}
}

/**
* Removes a ZPath that matches the TABLE_SETTING_CONFIG_PATTERN regex from the ZCache so it can
* be refreshed by a getData call to ZooKeeper when get(zpath) is called the next time the
* effected Zpath.
*
* @param configToRefresh
* the byte array obtained from the getData call to the table_config_version node which
* holds the ZPath of the last table configuration znode that was modified.
*/
private void refreshTableConfig(byte[] configToRefresh) {
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add method comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Just pushed the new code.

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

um.... I do not see a method comment here on refreshTableConfig.....

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added the method comment and made the others I wrote conform to the previously written comments in the class.

if (configToRefresh == null)
return;

String strConfigToRefresh = new String(configToRefresh);
if (strConfigToRefresh.isEmpty())
return;

Matcher tableConfigMatcher = TABLE_SETTING_CONFIG_PATTERN.matcher(strConfigToRefresh);
if (!tableConfigMatcher.matches())
return;

if (log.isTraceEnabled()) {
log.trace("NodeDataChanged called refreshTableConfig and refreshed table config: "
+ new String(configToRefresh));
}

remove(strConfigToRefresh);

}

/**
Expand Down Expand Up @@ -402,18 +605,22 @@ public byte[] run() throws KeeperException, InterruptedException {
* a special case that looks for Code.NONODE in the KeeperException, then non-existence can
* not be cached.
*/

Matcher configMatcher = TABLE_SETTING_CONFIG_PATTERN.matcher(zPath);
boolean createWatch = !configMatcher.matches();

cacheWriteLock.lock();
try {
final ZooKeeper zooKeeper = getZooKeeper();
Stat stat = zooKeeper.exists(zPath, watcher);
Stat stat = zooKeeper.exists(zPath, createWatch ? watcher : null);
byte[] data = null;
if (stat == null) {
if (log.isTraceEnabled()) {
log.trace("zookeeper did not contain {}", zPath);
}
} else {
try {
data = zooKeeper.getData(zPath, watcher, stat);
data = zooKeeper.getData(zPath, createWatch ? watcher : null, stat);
zstat = new ZcStat(stat);
} catch (KeeperException.BadVersionException | KeeperException.NoNodeException e1) {
throw new ConcurrentModificationException();
Expand Down Expand Up @@ -546,6 +753,7 @@ boolean childrenCached(String zPath) {
* path of top node
*/
public void clear(String zPath) {

Preconditions.checkState(!closed);
cacheWriteLock.lock();
try {
Expand Down
30 changes: 30 additions & 0 deletions core/src/main/java/org/apache/accumulo/fate/zookeeper/ZooUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
Expand Down Expand Up @@ -61,6 +62,10 @@ public class ZooUtil {

private static final Logger log = LoggerFactory.getLogger(ZooUtil.class);

private static String zkRoot = "";

private static StringBuilder clonedTableConfigs = new StringBuilder();

public enum NodeExistsPolicy {
SKIP, OVERWRITE, FAIL
}
Expand Down Expand Up @@ -400,6 +405,9 @@ public static boolean exists(ZooKeeperConnectionInfo info, String zPath)
public static void recursiveCopyPersistent(ZooKeeperConnectionInfo info, String source,
String destination, NodeExistsPolicy policy) throws KeeperException, InterruptedException {
Stat stat = null;
Matcher tableConfigMatcher;
boolean done = false;

if (!exists(info, source))
throw KeeperException.create(Code.NONODE, source);
if (exists(info, destination)) {
Expand All @@ -420,7 +428,18 @@ public static void recursiveCopyPersistent(ZooKeeperConnectionInfo info, String
if (stat.getEphemeralOwner() == 0) {
if (data == null)
throw KeeperException.create(Code.NONODE, source);

putPersistentData(info, destination, data, policy);

// The clone table operation doesn't use TablePropUtil.setTableProperty but we still need to
// update the table-config-version znode.
tableConfigMatcher = ZooCache.TABLE_SETTING_CONFIG_PATTERN.matcher(destination);
if (tableConfigMatcher.matches()) {
zkRoot = tableConfigMatcher.group(1);
clonedTableConfigs.append(destination);
clonedTableConfigs.append("|");
}

if (stat.getNumChildren() > 0) {
List<String> children;
final Retry retry = RETRY_FACTORY.createRetry();
Expand All @@ -442,8 +461,19 @@ public static void recursiveCopyPersistent(ZooKeeperConnectionInfo info, String
for (String child : children) {
recursiveCopyPersistent(info, source + "/" + child, destination + "/" + child, policy);
}

done = true;
}

}

if (done && !clonedTableConfigs.toString().isEmpty()) {
// update all the tableconfigs at once
putPersistentData(info, zkRoot + Constants.ZTABLE_CONFIG_VERSION,
clonedTableConfigs.toString().getBytes(), policy);
clonedTableConfigs.delete(0, clonedTableConfigs.length());
}

}

public static boolean putPrivatePersistentData(ZooKeeperConnectionInfo info, String zPath,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,8 @@ private static void initZooKeeper(Opts opts, String uuid, String instanceNamePat
zoo.putPersistentData(zkInstanceRoot, EMPTY_BYTE_ARRAY, NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLES, Constants.ZTABLES_INITIAL_ID,
NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZTABLE_CONFIG_VERSION, new byte[0],
NodeExistsPolicy.FAIL);
zoo.putPersistentData(zkInstanceRoot + Constants.ZNAMESPACES, new byte[0],
NodeExistsPolicy.FAIL);
TableManager.prepareNewNamespaceState(zoo, uuid, Namespace.DEFAULT.id(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@ public void removeTable(TableId tableId) throws KeeperException, InterruptedExce
tableStateCache.remove(tableId);
zoo.recursiveDelete(zkRoot + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_STATE,
NodeMissingPolicy.SKIP);
String tableConfigPath = zkRoot + Constants.ZTABLES + "/" + tableId + Constants.ZTABLE_CONF;
// The line below removes all the watches on the table configuration items from the Zookeeper
// server
zoo.getZooKeeper().getChildren(tableConfigPath, false);
zoo.recursiveDelete(zkRoot + Constants.ZTABLES + "/" + tableId, NodeMissingPolicy.SKIP);
}
}
Expand Down
Loading