1
1
package com .luna .common .thread ;
2
2
3
- import java .util .ArrayList ;
4
3
import java .util .List ;
5
4
import java .util .concurrent .*;
6
5
7
6
import org .apache .commons .collections4 .CollectionUtils ;
8
- import org .checkerframework .checker .nullness .qual .Nullable ;
9
7
import org .slf4j .Logger ;
10
8
import org .slf4j .LoggerFactory ;
11
9
16
14
*/
17
15
public class AsyncEngineUtils {
18
16
19
- private static final Logger log = LoggerFactory .getLogger (AsyncEngineUtils .class );
17
+ private static final Logger log = LoggerFactory .getLogger (AsyncEngineUtils .class );
20
18
21
- private static final int CORE_POOL_SIZE = 100 ;
19
+ private static final int CORE_POOL_SIZE = 200 ;
22
20
23
- private static final int MAX_POOL_SIZE = 200 ;
21
+ private static final int MAX_POOL_SIZE = 200 ;
24
22
25
- private static final ExecutorService executor ;
23
+ private static final int KEEP_ALIVE_TIME = 60 * 5 ;
24
+
25
+ private static final int QUEUE_CAPACITY = 1000 ;
26
+
27
+ private static final long TIME_OUT = 300 ;
28
+
29
+ private static final int MONITOR_PERIOD = 5 ; // 监控时间间隔,单位:s
30
+
31
+ private static final ExecutorService EXECUTOR ;
32
+
33
+ private static final Runnable MONITOR_TASK = new Runnable () {
34
+ @ Override
35
+ public void run () {
36
+ try {
37
+ ThreadPoolExecutor threadPool = (ThreadPoolExecutor )EXECUTOR ;
38
+ int activeCount = threadPool .getActiveCount (); // 正在执行的任务数
39
+ long completedTaskCount = threadPool .getCompletedTaskCount (); // 已完成任务数
40
+ long totalTaskCount = threadPool .getTaskCount (); // 总任务数
41
+ int queueSize = threadPool .getQueue ().size ();
42
+ int coreSize = threadPool .getCorePoolSize ();
43
+
44
+ log .info (
45
+ "total_task:{}, active_thread:{}, queue_size:{}, completed_thread:{}, coreSize:{}" ,
46
+ totalTaskCount , activeCount , queueSize , completedTaskCount , coreSize );
47
+
48
+ } catch (Exception e ) {
49
+ log .error ("[SYSTEM-SafeGuard]Monitor thread run fail" , e );
50
+ }
51
+ }
52
+ };
26
53
27
54
static {
28
- executor = new ThreadPoolExecutor (
55
+ EXECUTOR = new ThreadPoolExecutor (
29
56
CORE_POOL_SIZE ,
30
57
MAX_POOL_SIZE ,
31
- 60 * 5L ,
58
+ KEEP_ALIVE_TIME ,
32
59
TimeUnit .SECONDS ,
33
- new SynchronousQueue <>(),
60
+ new LinkedBlockingDeque <>(QUEUE_CAPACITY ),
34
61
Executors .defaultThreadFactory (),
35
62
new ThreadPoolExecutor .CallerRunsPolicy ());
63
+
64
+ ScheduledExecutorService monitor = Executors .newSingleThreadScheduledExecutor (new NamedThreadFactory ("AsyncEngine-Monitor" , true ));
65
+ monitor .scheduleAtFixedRate (MONITOR_TASK , MONITOR_PERIOD , MONITOR_PERIOD , TimeUnit .SECONDS );
66
+
36
67
}
37
68
38
69
/**
@@ -41,11 +72,12 @@ public class AsyncEngineUtils {
41
72
* @param tasks 任务
42
73
* @return T 任务返回值
43
74
*/
75
+ @ SafeVarargs
44
76
public static <T > List <T > concurrentExecute (Callable <T >... tasks ) {
45
77
if (tasks == null || tasks .length == 0 ) {
46
78
return Lists .newArrayList ();
47
79
}
48
- return concurrentExecute (-1 , null , tasks );
80
+ return concurrentExecute (-1 , null , Lists . newArrayList ( tasks ) );
49
81
}
50
82
51
83
/**
@@ -55,12 +87,11 @@ public static <T> List<T> concurrentExecute(Callable<T>... tasks) {
55
87
* @return T 任务返回值
56
88
*/
57
89
public static <T > List <T > concurrentExecute (List <Callable <T >> tasks ) {
58
-
59
90
if (CollectionUtils .isEmpty (tasks )) {
60
91
return Lists .newArrayList ();
61
92
}
62
93
63
- return concurrentExecute (tasks . toArray ( new Callable [ tasks . size ()]) );
94
+ return concurrentExecute (- 1 , null , tasks );
64
95
}
65
96
66
97
/**
@@ -71,23 +102,25 @@ public static <T> List<T> concurrentExecute(List<Callable<T>> tasks) {
71
102
* @param tasks 任务
72
103
* @return T 任务返回值
73
104
*/
74
- public static <T > List <T > concurrentExecute (long timeout , TimeUnit unit , Callable <T >... tasks ) {
75
- if (tasks == null || tasks . length == 0 ) {
105
+ public static <T > List <T > concurrentExecute (long timeout , TimeUnit unit , List < Callable <T >> tasks ) {
106
+ if (CollectionUtils . isEmpty ( tasks ) ) {
76
107
return Lists .newArrayList ();
77
108
}
78
109
79
110
List <T > result = Lists .newArrayList ();
80
111
try {
81
- List <Future <T >> futures = timeout > 0 ? executor .invokeAll (Lists . newArrayList ( tasks ) , timeout , unit )
82
- : executor .invokeAll (Lists . newArrayList ( tasks ) );
112
+ List <Future <T >> futures = timeout > 0 ? EXECUTOR .invokeAll (tasks , timeout , unit )
113
+ : EXECUTOR .invokeAll (tasks );
83
114
for (Future <T > future : futures ) {
84
115
T t = null ;
85
116
try {
86
- t = future .get ();
117
+ t = future .get (TIME_OUT , TimeUnit . MILLISECONDS );
87
118
} catch (CancellationException e ) {
88
119
if (timeout > 0 ) {
89
120
log .error ("concurrentExecute some task timeout!" );
90
121
}
122
+ } catch (TimeoutException tt ) {
123
+ log .error ("future.get() TimeoutException " , tt );
91
124
} catch (Throwable tt ) {
92
125
log .error ("future.get() Exception " , tt );
93
126
}
@@ -108,7 +141,7 @@ public static void execute(Runnable task) {
108
141
if (task == null ) {
109
142
return ;
110
143
}
111
- executor .submit (task );
144
+ EXECUTOR .submit (task );
112
145
}
113
146
114
147
public static void main (String [] args ) {
@@ -122,4 +155,10 @@ public static void main(String[] args) {
122
155
List <Void > voids = concurrentExecute (list );
123
156
System .out .println (voids );
124
157
}
158
+
159
+ public static void destroy () {
160
+ log .warn ("start to stop thread pool" );
161
+ EXECUTOR .shutdown ();
162
+ log .warn ("finish to stop thread pool" );
163
+ }
125
164
}
0 commit comments