diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java index c742f829e0f..3bddcb2f2ba 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperClient.java @@ -241,9 +241,9 @@ public ZooKeeperClient build() throws IOException, KeeperException, InterruptedE // Create a watcher manager StatsLogger watcherStatsLogger = statsLogger.scope("watcher"); - ZooKeeperWatcherBase watcherManager = - null == watchers ? new ZooKeeperWatcherBase(sessionTimeoutMs, watcherStatsLogger) : - new ZooKeeperWatcherBase(sessionTimeoutMs, watchers, watcherStatsLogger); + ZooKeeperWatcherBase watcherManager = (null == watchers) + ? new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watcherStatsLogger) + : new ZooKeeperWatcherBase(sessionTimeoutMs, allowReadOnlyMode, watchers, watcherStatsLogger); ZooKeeperClient client = new ZooKeeperClient( connectString, sessionTimeoutMs, diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java index 758f079d0da..e44a5f364cd 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/zookeeper/ZooKeeperWatcherBase.java @@ -44,6 +44,7 @@ public class ZooKeeperWatcherBase implements Watcher { .getLogger(ZooKeeperWatcherBase.class); private final int zkSessionTimeOut; + private final boolean allowReadOnlyMode; private volatile CountDownLatch clientConnectLatch = new CountDownLatch(1); private final CopyOnWriteArraySet childWatchers = new CopyOnWriteArraySet(); @@ -53,18 +54,20 @@ public class ZooKeeperWatcherBase implements Watcher { private final ConcurrentHashMap eventCounters = new ConcurrentHashMap(); - public ZooKeeperWatcherBase(int zkSessionTimeOut) { - this(zkSessionTimeOut, NullStatsLogger.INSTANCE); + public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode) { + this(zkSessionTimeOut, allowReadOnlyMode, NullStatsLogger.INSTANCE); } - public ZooKeeperWatcherBase(int zkSessionTimeOut, StatsLogger statsLogger) { - this(zkSessionTimeOut, new HashSet(), statsLogger); + public ZooKeeperWatcherBase(int zkSessionTimeOut, boolean allowReadOnlyMode, StatsLogger statsLogger) { + this(zkSessionTimeOut, allowReadOnlyMode, new HashSet(), statsLogger); } public ZooKeeperWatcherBase(int zkSessionTimeOut, + boolean allowReadOnlyMode, Set childWatchers, StatsLogger statsLogger) { this.zkSessionTimeOut = zkSessionTimeOut; + this.allowReadOnlyMode = allowReadOnlyMode; this.childWatchers.addAll(childWatchers); this.statsLogger = statsLogger; } @@ -130,6 +133,14 @@ public void process(WatchedEvent event) { LOG.info("ZooKeeper client is connected now."); clientConnectLatch.countDown(); break; + case ConnectedReadOnly: + if (allowReadOnlyMode) { + LOG.info("ZooKeeper client is connected in read-only mode now."); + clientConnectLatch.countDown(); + } else { + LOG.warn("ZooKeeper client is connected in read-only mode, which is not allowed."); + } + break; case Disconnected: LOG.info("ZooKeeper client is disconnected from zookeeper now," + " but it is OK unless we received EXPIRED event."); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperClientZKSessionExpiry.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperClientZKSessionExpiry.java index b1a8bb66dd7..c72834397e0 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperClientZKSessionExpiry.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperClientZKSessionExpiry.java @@ -51,7 +51,7 @@ public void run() { byte[] sessionPasswd = bkc.getZkHandle().getSessionPasswd(); try { - ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000); + ZooKeeperWatcherBase watcher = new ZooKeeperWatcherBase(10000, false); ZooKeeper zk = new ZooKeeper(zkUtil.getZooKeeperConnectString(), 10000, watcher, sessionId, sessionPasswd); zk.close(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java index 45647079d84..14b71a163d2 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookKeeperTest.java @@ -1072,7 +1072,7 @@ protected ZooKeeper createZooKeeper() throws IOException { public void testZKConnectionLossForLedgerCreation() throws Exception { int zkSessionTimeOut = 10000; AtomicLong ledgerIdToInjectFailure = new AtomicLong(INVALID_LEDGERID); - ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, + ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false, NullStatsLogger.INSTANCE); MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(), zkSessionTimeOut, zooKeeperWatcherBase, ledgerIdToInjectFailure); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java index f4a9245c76d..507f143d5ca 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/replication/TestReplicationWorker.java @@ -1103,7 +1103,7 @@ public void testRWShutDownInTheCaseOfZKOperationFailures() throws Exception { * create MockZooKeeperClient instance and wait for it to be connected. */ int zkSessionTimeOut = 10000; - ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, + ZooKeeperWatcherBase zooKeeperWatcherBase = new ZooKeeperWatcherBase(zkSessionTimeOut, false, NullStatsLogger.INSTANCE); MockZooKeeperClient zkFaultInjectionWrapper = new MockZooKeeperClient(zkUtil.getZooKeeperConnectString(), zkSessionTimeOut, zooKeeperWatcherBase); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java index 08ecbd7cc12..b0e828bd5ca 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperCluster.java @@ -64,7 +64,7 @@ void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) default void expireSession(ZooKeeper zk) throws Exception { long id = zk.getSessionId(); byte[] password = zk.getSessionPasswd(); - ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000); + ZooKeeperWatcherBase w = new ZooKeeperWatcherBase(10000, false); ZooKeeper zk2 = new ZooKeeper(getZooKeeperConnectString(), zk.getSessionTimeout(), w, id, password); w.waitForConnection(); zk2.close(); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperClusterUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperClusterUtil.java index 3eace4a62c5..6dbf182110f 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperClusterUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/test/ZooKeeperClusterUtil.java @@ -139,4 +139,12 @@ public void killCluster() throws Exception { public void sleepCluster(int time, TimeUnit timeUnit, CountDownLatch l) throws InterruptedException, IOException { throw new UnsupportedOperationException("sleepServer operation is not supported for ZooKeeperClusterUtil"); } + + public void stopPeer(int id) throws Exception { + quorumUtil.shutdown(id); + } + + public void enableLocalSession(boolean localSessionEnabled) { + quorumUtil.enableLocalSession(localSessionEnabled); + } } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java index e62e5c08b21..d06892b27d7 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/zookeeper/TestZooKeeperClient.java @@ -171,7 +171,7 @@ public void process(WatchedEvent event) { }; final int timeout = 2000; ZooKeeperWatcherBase watcherManager = - new ZooKeeperWatcherBase(timeout).addChildWatcher(testWatcher); + new ZooKeeperWatcherBase(timeout, false).addChildWatcher(testWatcher); List watchers = new ArrayList(1); watchers.add(testWatcher); ZooKeeperClient client = new ShutdownZkServerClient( @@ -895,4 +895,32 @@ public void processResult(int rc, String path, Object ctx) { logger.info("Delete children from znode " + path); } + @Test + public void testAllowReadOnlyMode() throws Exception { + if (zkUtil instanceof ZooKeeperClusterUtil) { + System.setProperty("readonlymode.enabled", "true"); + ((ZooKeeperClusterUtil) zkUtil).enableLocalSession(true); + zkUtil.restartCluster(); + Thread.sleep(2000); + ((ZooKeeperClusterUtil) zkUtil).stopPeer(2); + ((ZooKeeperClusterUtil) zkUtil).stopPeer(3); + } + + try (ZooKeeperClient client = ZooKeeperClient.newBuilder() + .connectString(zkUtil.getZooKeeperConnectString()) + .sessionTimeoutMs(30000) + .watchers(new HashSet()) + .operationRetryPolicy(retryPolicy) + .allowReadOnlyMode(true) + .build()) { + Assert.assertTrue("Client failed to connect a ZooKeeper in read-only mode.", + client.getState().isConnected()); + } finally { + if (zkUtil instanceof ZooKeeperClusterUtil) { + System.setProperty("readonlymode.enabled", "false"); + ((ZooKeeperClusterUtil) zkUtil).enableLocalSession(false); + } + } + } + }