Skip to content

Commit

Permalink
[client][fix] Bookie WatchTask may be stuck (#4481)
Browse files Browse the repository at this point in the history
* Add inner thread for WatchTask

* fix checkstyle

* rename watchTaskScheduler to highPriorityTaskExecutor
  • Loading branch information
wenbingshen committed Aug 17, 2024
1 parent 8c7dea6 commit a569a49
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
private final BookKeeperClientStats clientStats;
private final double bookieQuarantineRatio;

// Inner high priority thread for WatchTask. Disable external use.
private final OrderedScheduler highPriorityTaskExecutor;

// whether the event loop group is one we created, or is owned by whoever
// instantiated us
boolean ownEventLoopGroup = false;
Expand Down Expand Up @@ -424,6 +427,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo

// initialize resources
this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
this.highPriorityTaskExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperWatchTaskScheduler").build();
this.mainWorkerPool = OrderedExecutor.newBuilder()
.name("BookKeeperClientWorker")
.numThreads(conf.getNumWorkerThreads())
Expand All @@ -449,7 +454,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
}
this.metadataDriver.initialize(
conf,
scheduler,
highPriorityTaskExecutor,
rootStatsLogger,
Optional.ofNullable(zkc));
} catch (ConfigurationException ce) {
Expand Down Expand Up @@ -551,6 +556,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
statsLogger = NullStatsLogger.INSTANCE;
clientStats = BookKeeperClientStats.newInstance(statsLogger);
scheduler = null;
highPriorityTaskExecutor = null;
requestTimer = null;
metadataDriver = null;
placementPolicy = null;
Expand Down Expand Up @@ -1462,6 +1468,13 @@ public void close() throws BKException, InterruptedException {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The scheduler did not shutdown cleanly");
}

// Close the watchTask scheduler
highPriorityTaskExecutor.shutdown();
if (!highPriorityTaskExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The highPriorityTaskExecutor for WatchTask did not shutdown cleanly");
}

mainWorkerPool.shutdown();
if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The mainWorkerPool did not shutdown cleanly");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -50,12 +52,14 @@
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
Expand Down Expand Up @@ -1298,4 +1302,48 @@ public void testBookieAddressResolverPassedToDNSToSwitchMapping() throws Excepti
}
}

@Test
public void testBookieWatcher() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

StaticDNSResolver tested = new StaticDNSResolver();
try (BookKeeper bkc = BookKeeper
.forConfig(conf)
.dnsResolver(tested)
.build()) {
final Map<BookieId, BookieInfoReader.BookieInfo> bookieInfo = bkc.getBookieInfo();

// 1. check all bookies in client cache successfully.
bookieInfo.forEach((bookieId, info) -> {
final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo = bkc.getMetadataClientDriver()
.getRegistrationClient().getBookieServiceInfo(bookieId);
assertTrue(bookieServiceInfo.isDone());
assertFalse(bookieServiceInfo.isCompletedExceptionally());
});

// 2. add a task to scheduler, blocking zk watch for bookies cache
bkc.getClientCtx().getScheduler().schedule(() -> {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, TimeUnit.MILLISECONDS);

// 3. restart one bookie, so the client should update cache by WatchTask
restartBookie(bookieInfo.keySet().iterator().next());

// 4. after restart bookie, check again for the client cache
final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo =
bkc.getMetadataClientDriver().getRegistrationClient()
.getBookieServiceInfo(bookieInfo.keySet().iterator().next());
assertTrue(bookieServiceInfo.isDone());
// 5. Previously, we used scheduler, and here getting bookie from client cache would fail.
// 6. After this PR, we introduced independent internal thread pool watchTaskScheduler,
// and here it will succeed.
assertFalse(bookieServiceInfo.isCompletedExceptionally());
}
}

}

0 comments on commit a569a49

Please sign in to comment.