Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client][fix] Bookie WatchTask may be stuck #4481

Merged
merged 3 commits into from
Aug 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,9 @@ public class BookKeeper implements org.apache.bookkeeper.client.api.BookKeeper {
private final BookKeeperClientStats clientStats;
private final double bookieQuarantineRatio;

// Inner high priority thread for WatchTask. Disable external use.
private final OrderedScheduler highPriorityTaskExecutor;

// whether the event loop group is one we created, or is owned by whoever
// instantiated us
boolean ownEventLoopGroup = false;
Expand Down Expand Up @@ -424,6 +427,8 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo

// initialize resources
this.scheduler = OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperClientScheduler").build();
this.highPriorityTaskExecutor =
OrderedScheduler.newSchedulerBuilder().numThreads(1).name("BookKeeperWatchTaskScheduler").build();
wenbingshen marked this conversation as resolved.
Show resolved Hide resolved
this.mainWorkerPool = OrderedExecutor.newBuilder()
.name("BookKeeperClientWorker")
.numThreads(conf.getNumWorkerThreads())
Expand All @@ -449,7 +454,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
}
this.metadataDriver.initialize(
conf,
scheduler,
highPriorityTaskExecutor,
rootStatsLogger,
Optional.ofNullable(zkc));
} catch (ConfigurationException ce) {
Expand Down Expand Up @@ -551,6 +556,7 @@ public BookKeeper(ClientConfiguration conf, ZooKeeper zk, EventLoopGroup eventLo
statsLogger = NullStatsLogger.INSTANCE;
clientStats = BookKeeperClientStats.newInstance(statsLogger);
scheduler = null;
highPriorityTaskExecutor = null;
requestTimer = null;
metadataDriver = null;
placementPolicy = null;
Expand Down Expand Up @@ -1462,6 +1468,13 @@ public void close() throws BKException, InterruptedException {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The scheduler did not shutdown cleanly");
}

// Close the watchTask scheduler
highPriorityTaskExecutor.shutdown();
if (!highPriorityTaskExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The highPriorityTaskExecutor for WatchTask did not shutdown cleanly");
wenbingshen marked this conversation as resolved.
Show resolved Hide resolved
}

mainWorkerPool.shutdown();
if (!mainWorkerPool.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("The mainWorkerPool did not shutdown cleanly");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -50,12 +52,14 @@
import org.apache.bookkeeper.client.api.WriteFlag;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.discover.BookieServiceInfo;
import org.apache.bookkeeper.net.BookieId;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.bookkeeper.test.BookKeeperClusterTestCase;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.apache.bookkeeper.util.StaticDNSResolver;
import org.apache.bookkeeper.versioning.Versioned;
import org.apache.bookkeeper.zookeeper.BoundExponentialBackoffRetryPolicy;
import org.apache.bookkeeper.zookeeper.ZooKeeperClient;
import org.apache.bookkeeper.zookeeper.ZooKeeperWatcherBase;
Expand Down Expand Up @@ -1298,4 +1302,48 @@ public void testBookieAddressResolverPassedToDNSToSwitchMapping() throws Excepti
}
}

@Test
public void testBookieWatcher() throws Exception {
ClientConfiguration conf = new ClientConfiguration();
conf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());

StaticDNSResolver tested = new StaticDNSResolver();
try (BookKeeper bkc = BookKeeper
.forConfig(conf)
.dnsResolver(tested)
.build()) {
final Map<BookieId, BookieInfoReader.BookieInfo> bookieInfo = bkc.getBookieInfo();

// 1. check all bookies in client cache successfully.
bookieInfo.forEach((bookieId, info) -> {
final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo = bkc.getMetadataClientDriver()
.getRegistrationClient().getBookieServiceInfo(bookieId);
assertTrue(bookieServiceInfo.isDone());
assertFalse(bookieServiceInfo.isCompletedExceptionally());
});

// 2. add a task to scheduler, blocking zk watch for bookies cache
bkc.getClientCtx().getScheduler().schedule(() -> {
try {
Thread.sleep(Long.MAX_VALUE);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, 0, TimeUnit.MILLISECONDS);

// 3. restart one bookie, so the client should update cache by WatchTask
restartBookie(bookieInfo.keySet().iterator().next());

// 4. after restart bookie, check again for the client cache
final CompletableFuture<Versioned<BookieServiceInfo>> bookieServiceInfo =
bkc.getMetadataClientDriver().getRegistrationClient()
.getBookieServiceInfo(bookieInfo.keySet().iterator().next());
assertTrue(bookieServiceInfo.isDone());
// 5. Previously, we used scheduler, and here getting bookie from client cache would fail.
// 6. After this PR, we introduced independent internal thread pool watchTaskScheduler,
// and here it will succeed.
assertFalse(bookieServiceInfo.isCompletedExceptionally());
}
}

}