Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: throw reject when SingleThreadExecutor drainTo in progress and queue is empty #4488

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.bookkeeper.common.util;

import com.google.common.annotations.VisibleForTesting;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -29,6 +30,7 @@
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -54,6 +56,11 @@ public class SingleThreadExecutor extends AbstractExecutorService implements Exe
private final LongAdder tasksRejected = new LongAdder();
private final LongAdder tasksFailed = new LongAdder();

private final int maxQueueCapacity;
private static final AtomicIntegerFieldUpdater<SingleThreadExecutor> waiterCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(SingleThreadExecutor.class, "waiterCount");
private volatile int waiterCount = 0;

enum State {
Running,
Shutdown,
Expand All @@ -80,6 +87,8 @@ public SingleThreadExecutor(ThreadFactory tf, int maxQueueCapacity, boolean reje
} else {
this.queue = new GrowableMpScArrayConsumerBlockingQueue<>();
}
this.maxQueueCapacity = maxQueueCapacity;

this.runner = tf.newThread(this);
this.state = State.Running;
this.rejectExecution = rejectExecution;
Expand Down Expand Up @@ -134,6 +143,9 @@ public void run() {

private boolean safeRunTask(Runnable r) {
try {
if (maxQueueCapacity > 0) {
waiterCountUpdater.decrementAndGet(this);
}
r.run();
tasksCompleted.increment();
} catch (Throwable t) {
Expand Down Expand Up @@ -162,7 +174,10 @@ public List<Runnable> shutdownNow() {
this.state = State.Shutdown;
this.runner.interrupt();
List<Runnable> remainingTasks = new ArrayList<>();
queue.drainTo(remainingTasks);
int n = queue.drainTo(remainingTasks);
if (maxQueueCapacity > 0) {
waiterCountUpdater.addAndGet(this, -n);
}
return remainingTasks;
}

Expand Down Expand Up @@ -204,6 +219,11 @@ public long getFailedTasksCount() {

@Override
public void execute(Runnable r) {
execute(r, null);
}

@VisibleForTesting
void execute(Runnable r, List<Runnable> runnableList) {
if (state != State.Running) {
throw new RejectedExecutionException("Executor is shutting down");
}
Expand All @@ -213,18 +233,30 @@ public void execute(Runnable r) {
queue.put(r);
tasksCount.increment();
} else {
if (queue.offer(r)) {
tasksCount.increment();
int delta = r != null ? 1 : runnableList.size();
validateQueueCapacity(delta);
if (r != null ? queue.offer(r) : queue.addAll(runnableList)) {
tasksCount.add(delta);
} else {
tasksRejected.increment();
throw new ExecutorRejectedException("Executor queue is full");
reject();
}
}
} catch (InterruptedException e) {
throw new RejectedExecutionException("Executor thread was interrupted", e);
}
}

private void validateQueueCapacity(int delta) {
if (maxQueueCapacity > 0 && waiterCountUpdater.addAndGet(this, delta) > maxQueueCapacity) {
reject();
}
}

private void reject() {
tasksRejected.increment();
throw new ExecutorRejectedException("Executor queue is full");
}

public void registerMetrics(StatsLogger statsLogger) {
// Register gauges
statsLogger.scopeLabel("thread", runner.getName())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.junit.Assert.fail;

import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -116,6 +117,61 @@ public void testRejectWhenQueueIsFull() throws Exception {
assertEquals(0, ste.getFailedTasksCount());
}

@Test
public void testRejectWhenDrainToInProgressAndQueueIsEmpty() throws Exception {
@Cleanup("shutdownNow")
SingleThreadExecutor ste = new SingleThreadExecutor(THREAD_FACTORY, 10, true);

CyclicBarrier barrier = new CyclicBarrier(10);
CountDownLatch startedLatch = new CountDownLatch(1);
List<Runnable> tasks = new ArrayList<>();

for (int i = 0; i < 10; i++) {
int n = i;
tasks.add(() -> {
if (n == 0) {
startedLatch.countDown();
} else {
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
// ignore
}
}
});
}
ste.execute(null, tasks);

// Wait until the first task is done.
try {
startedLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}

// Next task should go through, because the runner thread has already pulled out the first and second items
// from the queue.
List<Runnable> nextTasks = new ArrayList<>();
nextTasks.add(() -> {
});
nextTasks.add(() -> {
});
ste.execute(null, nextTasks);

// Now the queue is really full and should reject tasks
try {
ste.execute(() -> {
});
fail("should have rejected the task");
} catch (RejectedExecutionException e) {
// Expected
}

assertEquals(12, ste.getSubmittedTasksCount());
assertEquals(1, ste.getRejectedTasksCount());
assertEquals(0, ste.getFailedTasksCount());
}

@Test
public void testBlockWhenQueueIsFull() throws Exception {
@Cleanup("shutdown")
Expand Down