From 3de037fdbfc901ef1ba9e66e8c50b9a8498ad797 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Mon, 26 Aug 2024 00:44:33 +0800 Subject: [PATCH] fix: throw reject when SingleThreadExecutor drainTo in progress and queue is empty --- .../common/util/SingleThreadExecutor.java | 42 ++++++++++++-- .../common/util/TestSingleThreadExecutor.java | 56 +++++++++++++++++++ 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java index 3c514ebbdaf..a1a72217a2c 100644 --- a/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java +++ b/bookkeeper-common/src/main/java/org/apache/bookkeeper/common/util/SingleThreadExecutor.java @@ -18,6 +18,7 @@ package org.apache.bookkeeper.common.util; +import com.google.common.annotations.VisibleForTesting; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.util.ArrayList; import java.util.List; @@ -29,6 +30,7 @@ import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.LongAdder; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -54,6 +56,11 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe private final LongAdder tasksRejected = new LongAdder(); private final LongAdder tasksFailed = new LongAdder(); + private final int maxQueueCapacity; + private static final AtomicIntegerFieldUpdater waiterCountUpdater = + AtomicIntegerFieldUpdater.newUpdater(SingleThreadExecutor.class, "waiterCount"); + private volatile int waiterCount = 0; + enum State { Running, Shutdown, @@ -80,6 +87,8 @@ public SingleThreadExecutor(ThreadFactory tf, int maxQueueCapacity, boolean reje } else { this.queue = new GrowableMpScArrayConsumerBlockingQueue<>(); } + this.maxQueueCapacity = maxQueueCapacity; + this.runner = tf.newThread(this); this.state = State.Running; this.rejectExecution = rejectExecution; @@ -134,6 +143,9 @@ public void run() { private boolean safeRunTask(Runnable r) { try { + if (maxQueueCapacity > 0) { + waiterCountUpdater.decrementAndGet(this); + } r.run(); tasksCompleted.increment(); } catch (Throwable t) { @@ -162,7 +174,10 @@ public List shutdownNow() { this.state = State.Shutdown; this.runner.interrupt(); List remainingTasks = new ArrayList<>(); - queue.drainTo(remainingTasks); + int n = queue.drainTo(remainingTasks); + if (maxQueueCapacity > 0) { + waiterCountUpdater.addAndGet(this, -n); + } return remainingTasks; } @@ -204,6 +219,11 @@ public long getFailedTasksCount() { @Override public void execute(Runnable r) { + execute(r, null); + } + + @VisibleForTesting + void execute(Runnable r, List runnableList) { if (state != State.Running) { throw new RejectedExecutionException("Executor is shutting down"); } @@ -213,11 +233,12 @@ public void execute(Runnable r) { queue.put(r); tasksCount.increment(); } else { - if (queue.offer(r)) { - tasksCount.increment(); + int delta = r != null ? 1 : runnableList.size(); + validateQueueCapacity(delta); + if (r != null ? queue.offer(r) : queue.addAll(runnableList)) { + tasksCount.add(delta); } else { - tasksRejected.increment(); - throw new ExecutorRejectedException("Executor queue is full"); + reject(); } } } catch (InterruptedException e) { @@ -225,6 +246,17 @@ public void execute(Runnable r) { } } + private void validateQueueCapacity(int delta) { + if (maxQueueCapacity > 0 && waiterCountUpdater.addAndGet(this, delta) > maxQueueCapacity) { + reject(); + } + } + + private void reject() { + tasksRejected.increment(); + throw new ExecutorRejectedException("Executor queue is full"); + } + public void registerMetrics(StatsLogger statsLogger) { // Register gauges statsLogger.scopeLabel("thread", runner.getName()) diff --git a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java index 671318de6e2..984fa60d98a 100644 --- a/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java +++ b/bookkeeper-common/src/test/java/org/apache/bookkeeper/common/util/TestSingleThreadExecutor.java @@ -24,6 +24,7 @@ import static org.junit.Assert.fail; import io.netty.util.concurrent.DefaultThreadFactory; +import java.util.ArrayList; import java.util.List; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CountDownLatch; @@ -116,6 +117,61 @@ public void testRejectWhenQueueIsFull() throws Exception { assertEquals(0, ste.getFailedTasksCount()); } + @Test + public void testRejectWhenDrainToInProgressAndQueueIsEmpty() throws Exception { + @Cleanup("shutdownNow") + SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 10, true); + + CyclicBarrier barrier = new CyclicBarrier(10); + CountDownLatch startedLatch = new CountDownLatch(1); + List tasks = new ArrayList<>(); + + for (int i = 0; i < 10; i++) { + int n = i; + tasks.add(() -> { + if (n == 0) { + startedLatch.countDown(); + } else { + try { + barrier.await(); + } catch (InterruptedException | BrokenBarrierException e) { + // ignore + } + } + }); + } + ste.execute(null, tasks); + + // Wait until the first task is done. + try { + startedLatch.await(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + + // Next task should go through, because the runner thread has already pulled out the first and second items + // from the queue. + List nextTasks = new ArrayList<>(); + nextTasks.add(() -> { + }); + nextTasks.add(() -> { + }); + ste.execute(null, nextTasks); + + // Now the queue is really full and should reject tasks + try { + ste.execute(() -> { + }); + fail("should have rejected the task"); + } catch (RejectedExecutionException e) { + // Expected + } + + assertEquals(12, ste.getSubmittedTasksCount()); + assertEquals(1, ste.getRejectedTasksCount()); + assertEquals(0, ste.getFailedTasksCount()); + } + @Test public void testBlockWhenQueueIsFull() throws Exception { @Cleanup("shutdown")