|
1 | 1 | package dev.rollczi.litecommands.scheduler;
|
2 | 2 |
|
| 3 | +import dev.rollczi.litecommands.shared.ThrowingRunnable; |
3 | 4 | import dev.rollczi.litecommands.shared.ThrowingSupplier;
|
| 5 | +import org.jetbrains.annotations.ApiStatus; |
4 | 6 |
|
| 7 | +import java.io.PrintWriter; |
| 8 | +import java.io.StringWriter; |
5 | 9 | import java.time.Duration;
|
6 | 10 | import java.util.concurrent.CompletableFuture;
|
7 |
| -import java.util.concurrent.ExecutorService; |
| 11 | +import java.util.concurrent.CompletionException; |
8 | 12 | import java.util.concurrent.Executors;
|
9 | 13 | import java.util.concurrent.ScheduledExecutorService;
|
10 | 14 | import java.util.concurrent.ThreadFactory;
|
11 | 15 | import java.util.concurrent.TimeUnit;
|
12 | 16 | import java.util.concurrent.atomic.AtomicInteger;
|
| 17 | +import java.util.logging.Logger; |
13 | 18 |
|
14 | 19 | public class SchedulerExecutorPoolImpl implements Scheduler {
|
15 | 20 |
|
16 |
| - private static final String MAIN_THREAD_NAME_FORMAT = "scheduler-%s-main"; |
17 |
| - private static final String ASYNC_THREAD_NAME_FORMAT = "scheduler-%s-async-%d"; |
| 21 | + private final Logger logger; |
| 22 | + private final ScheduledExecutorService mainExecutor; |
| 23 | + private final boolean closeMainExecutorOnShutdown; |
18 | 24 |
|
19 |
| - private final ThreadLocal<Boolean> isMainThread = ThreadLocal.withInitial(() -> false); |
20 |
| - |
21 |
| - private final ExecutorService mainExecutor; |
22 |
| - private final ExecutorService asyncExecutor; |
23 |
| - private final ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(Runtime.getRuntime().availableProcessors()); |
| 25 | + private final ScheduledExecutorService asyncExecutor; |
| 26 | + private final boolean closeAsyncExecutorOnShutdown; |
24 | 27 |
|
| 28 | + @Deprecated |
25 | 29 | public SchedulerExecutorPoolImpl(String name) {
|
26 | 30 | this(name, -1);
|
27 | 31 | }
|
28 | 32 |
|
| 33 | + @Deprecated |
29 | 34 | public SchedulerExecutorPoolImpl(String name, int pool) {
|
| 35 | + this.logger = Logger.getLogger("LiteCommands"); |
| 36 | + |
30 | 37 | this.mainExecutor = Executors.newSingleThreadScheduledExecutor(runnable -> {
|
31 | 38 | Thread thread = new Thread(runnable);
|
32 |
| - thread.setName(String.format(MAIN_THREAD_NAME_FORMAT, name)); |
33 |
| - |
| 39 | + thread.setName(String.format("scheduler-%s-main", name)); |
34 | 40 | return thread;
|
35 | 41 | });
|
36 |
| - |
37 |
| - this.mainExecutor.submit(() -> isMainThread.set(true)); |
| 42 | + this.closeMainExecutorOnShutdown = true; |
38 | 43 |
|
39 | 44 | AtomicInteger asyncCount = new AtomicInteger();
|
40 | 45 | ThreadFactory factory = runnable -> {
|
41 | 46 | Thread thread = new Thread(runnable);
|
42 |
| - thread.setName(String.format(ASYNC_THREAD_NAME_FORMAT, name, asyncCount.getAndIncrement())); |
| 47 | + thread.setName(String.format("scheduler-%s-async-%d", name, asyncCount.getAndIncrement())); |
43 | 48 |
|
44 | 49 | return thread;
|
45 | 50 | };
|
46 | 51 |
|
47 |
| - this.asyncExecutor = pool < 0 ? Executors.newCachedThreadPool(factory) : Executors.newFixedThreadPool(pool, factory); |
| 52 | + this.asyncExecutor = Executors.newScheduledThreadPool( |
| 53 | + Math.max(pool, Runtime.getRuntime().availableProcessors()) / 2, |
| 54 | + factory |
| 55 | + ); |
| 56 | + this.closeAsyncExecutorOnShutdown = true; |
48 | 57 | }
|
49 | 58 |
|
50 |
| - @Override |
51 |
| - public <T> CompletableFuture<T> supplyLater(SchedulerPoll type, Duration delay, ThrowingSupplier<T, Throwable> supplier) { |
52 |
| - SchedulerPoll resolve = type.resolve(SchedulerPoll.MAIN, SchedulerPoll.ASYNCHRONOUS); |
53 |
| - CompletableFuture<T> future = new CompletableFuture<>(); |
| 59 | + /** |
| 60 | + * Internal usage only. Use {@link SchedulerExecutorPoolBuilder}. |
| 61 | + */ |
| 62 | + @ApiStatus.Internal |
| 63 | + public SchedulerExecutorPoolImpl(Logger logger, ScheduledExecutorService mainExecutor, boolean closeMainExecutorOnShutdown, ScheduledExecutorService asyncExecutor, boolean closeAsyncExecutorOnShutdown) { |
| 64 | + this.logger = logger; |
| 65 | + this.mainExecutor = mainExecutor; |
| 66 | + this.closeMainExecutorOnShutdown = closeMainExecutorOnShutdown; |
| 67 | + this.asyncExecutor = asyncExecutor; |
| 68 | + this.closeAsyncExecutorOnShutdown = closeAsyncExecutorOnShutdown; |
| 69 | + } |
54 | 70 |
|
55 |
| - if (resolve.equals(SchedulerPoll.MAIN) && delay.isZero() && isMainThread.get()) { |
56 |
| - return tryRun(supplier, future); |
57 |
| - } |
| 71 | + @Override |
| 72 | + public CompletableFuture<Void> run(SchedulerPoll type, ThrowingRunnable<Throwable> runnable) { |
| 73 | + return CompletableFuture.runAsync(() -> { |
| 74 | + try { |
| 75 | + runnable.run(); |
| 76 | + } catch (Throwable e) { |
| 77 | + throw new CompletionException(e); |
| 78 | + } |
| 79 | + }, getSchedulerByPollType(type)); |
| 80 | + } |
58 | 81 |
|
59 |
| - ExecutorService executor = resolve.equals(SchedulerPoll.MAIN) ? mainExecutor : asyncExecutor; |
| 82 | + @Override |
| 83 | + public CompletableFuture<Void> runLater(SchedulerPoll type, Duration delay, ThrowingRunnable<Throwable> runnable) { |
| 84 | + CompletableFuture<Void> future = new CompletableFuture<>(); |
| 85 | + getSchedulerByPollType(type).schedule(() -> { |
| 86 | + try { |
| 87 | + runnable.run(); |
| 88 | + future.complete(null); |
| 89 | + } catch (Throwable e) { |
| 90 | + future.completeExceptionally(e); |
| 91 | + } |
| 92 | + }, delay.toMillis(), TimeUnit.MILLISECONDS); |
| 93 | + return future; |
| 94 | + } |
60 | 95 |
|
61 |
| - if (delay.isZero()) { |
62 |
| - executor.submit(() -> tryRun(supplier, future)); |
63 |
| - } |
64 |
| - else { |
65 |
| - scheduledExecutor.schedule(() -> { |
66 |
| - executor.submit(() -> tryRun(supplier, future)); |
67 |
| - }, delay.toMillis(), TimeUnit.MILLISECONDS); |
68 |
| - } |
| 96 | + @Override |
| 97 | + public <T> CompletableFuture<T> supply(SchedulerPoll type, ThrowingSupplier<T, Throwable> supplier) { |
| 98 | + return CompletableFuture.supplyAsync(() -> { |
| 99 | + try { |
| 100 | + return supplier.get(); |
| 101 | + } catch (Throwable e) { |
| 102 | + throw new CompletionException(e); |
| 103 | + } |
| 104 | + }, getSchedulerByPollType(type)); |
| 105 | + } |
69 | 106 |
|
| 107 | + @Override |
| 108 | + public <T> CompletableFuture<T> supplyLater(SchedulerPoll type, Duration delay, ThrowingSupplier<T, Throwable> supplier) { |
| 109 | + CompletableFuture<T> future = new CompletableFuture<>(); |
| 110 | + getSchedulerByPollType(type).schedule(() -> { |
| 111 | + try { |
| 112 | + future.complete(supplier.get()); |
| 113 | + } catch (Throwable e) { |
| 114 | + future.completeExceptionally(e); |
| 115 | + } |
| 116 | + }, delay.toMillis(), TimeUnit.MILLISECONDS); |
70 | 117 | return future;
|
71 | 118 | }
|
72 | 119 |
|
73 |
| - private static <T> CompletableFuture<T> tryRun(ThrowingSupplier<T, Throwable> supplier, CompletableFuture<T> future) { |
74 |
| - try { |
75 |
| - future.complete(supplier.get()); |
| 120 | + @Override |
| 121 | + public void shutdown() { |
| 122 | + if (closeMainExecutorOnShutdown) { |
| 123 | + try { |
| 124 | + mainExecutor.close(); |
| 125 | + } catch (Throwable e) { |
| 126 | + logger.severe("Error closing main executor: \n" + getStacktraceAsString(e)); |
| 127 | + } |
76 | 128 | }
|
77 |
| - catch (Throwable throwable) { |
78 |
| - future.completeExceptionally(throwable); |
| 129 | + |
| 130 | + if (closeAsyncExecutorOnShutdown) { |
| 131 | + try { |
| 132 | + asyncExecutor.close(); |
| 133 | + } catch (Throwable e) { |
| 134 | + logger.severe("Error closing async executor: \n" + getStacktraceAsString(e)); |
| 135 | + } |
79 | 136 | }
|
| 137 | + } |
80 | 138 |
|
81 |
| - return future; |
| 139 | + private ScheduledExecutorService getSchedulerByPollType(SchedulerPoll type) { |
| 140 | + return type.resolve(SchedulerPoll.MAIN, SchedulerPoll.ASYNCHRONOUS).equals(SchedulerPoll.MAIN) |
| 141 | + ? mainExecutor |
| 142 | + : asyncExecutor; |
82 | 143 | }
|
83 | 144 |
|
84 |
| - @Override |
85 |
| - public void shutdown() { |
86 |
| - mainExecutor.shutdown(); |
87 |
| - asyncExecutor.shutdown(); |
88 |
| - isMainThread.remove(); |
| 145 | + private String getStacktraceAsString(Throwable throwable) { |
| 146 | + StringWriter stringWriter = new StringWriter(); |
| 147 | + throwable.printStackTrace(new PrintWriter(stringWriter)); |
| 148 | + return stringWriter.toString(); |
89 | 149 | }
|
90 | 150 |
|
91 | 151 | }
|
0 commit comments