23
23
* THE SOFTWARE.
24
24
*/
25
25
package com .iluwatar .logaggregation ;
26
-
26
+ import java . util . concurrent . BlockingQueue ;
27
27
import java .util .concurrent .ConcurrentLinkedQueue ;
28
- import java .util .concurrent .ExecutorService ;
28
+ import java .util .concurrent .LinkedBlockingQueue ;
29
+ import java .util .concurrent .ScheduledExecutorService ;
29
30
import java .util .concurrent .Executors ;
30
31
import java .util .concurrent .TimeUnit ;
31
32
import java .util .concurrent .atomic .AtomicInteger ;
33
+ import java .util .concurrent .CountDownLatch ;
34
+ import java .util .ArrayList ;
35
+ import java .util .List ;
32
36
import lombok .extern .slf4j .Slf4j ;
33
37
34
38
/**
41
45
public class LogAggregator {
42
46
43
47
private static final int BUFFER_THRESHOLD = 3 ;
48
+ private static final int FLUSH_INTERVAL_SECONDS = 5 ;
49
+ private static final int SHUTDOWN_TIMEOUT_SECONDS = 10 ;
50
+
44
51
private final CentralLogStore centralLogStore ;
45
52
private final ConcurrentLinkedQueue <LogEntry > buffer = new ConcurrentLinkedQueue <>();
46
53
private final LogLevel minLogLevel ;
47
54
private final ExecutorService executorService = Executors .newSingleThreadExecutor ();
48
55
private final AtomicInteger logCount = new AtomicInteger (0 );
56
+ private final ScheduledExecutorService scheduledExecutor = Executors .newScheduledThreadPool (1 );
57
+ private final CountDownLatch shutdownLatch = new CountDownLatch (1 );
58
+ private volatile boolean running = true ;
49
59
50
60
/**
51
61
* constructor of LogAggregator.
@@ -57,6 +67,15 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
57
67
this .centralLogStore = centralLogStore ;
58
68
this .minLogLevel = minLogLevel ;
59
69
startBufferFlusher ();
70
+ // Add shutdown hook for graceful termination
71
+ Runtime .getRuntime ().addShutdownHook (new Thread (() -> {
72
+ try {
73
+ stop ();
74
+ } catch (InterruptedException e ) {
75
+ LOGGER .warn ("Shutdown interrupted" , e );
76
+ Thread .currentThread ().interrupt ();
77
+ }
78
+ }));
60
79
}
61
80
62
81
/**
@@ -65,6 +84,11 @@ public LogAggregator(CentralLogStore centralLogStore, LogLevel minLogLevel) {
65
84
* @param logEntry The log entry to collect.
66
85
*/
67
86
public void collectLog (LogEntry logEntry ) {
87
+ if (!running ) {
88
+ LOGGER .warn ("LogAggregator is shutting down. Skipping log entry." );
89
+ return ;
90
+ }
91
+
68
92
if (logEntry .getLevel () == null || minLogLevel == null ) {
69
93
LOGGER .warn ("Log level or threshold level is null. Skipping." );
70
94
return ;
@@ -75,10 +99,17 @@ public void collectLog(LogEntry logEntry) {
75
99
return ;
76
100
}
77
101
78
- buffer .offer (logEntry );
102
+ // BlockingQueue.offer() is non-blocking and thread-safe
103
+ boolean added = buffer .offer (logEntry );
104
+ if (!added ) {
105
+ LOGGER .warn ("Failed to add log entry to buffer - queue may be full" );
106
+ return ;
107
+ }
79
108
109
+ // Check if immediate flush is needed due to threshold
80
110
if (logCount .incrementAndGet () >= BUFFER_THRESHOLD ) {
81
- flushBuffer ();
111
+ // Schedule immediate flush instead of blocking current thread
112
+ scheduledExecutor .execute (this ::flushBuffer );
82
113
}
83
114
}
84
115
@@ -88,32 +119,123 @@ public void collectLog(LogEntry logEntry) {
88
119
* @throws InterruptedException If any thread has interrupted the current thread.
89
120
*/
90
121
public void stop () throws InterruptedException {
91
- executorService .shutdownNow ();
92
- if (!executorService .awaitTermination (10 , TimeUnit .SECONDS )) {
93
- LOGGER .error ("Log aggregator did not terminate." );
122
+ LOGGER .info ("Stopping LogAggregator..." );
123
+ running = false ;
124
+
125
+ // Shutdown the scheduler gracefully
126
+ scheduledExecutor .shutdown ();
127
+
128
+ try {
129
+ // Wait for scheduled tasks to complete
130
+ if (!scheduledExecutor .awaitTermination (SHUTDOWN_TIMEOUT_SECONDS , TimeUnit .SECONDS )) {
131
+ LOGGER .warn ("Scheduler did not terminate gracefully, forcing shutdown" );
132
+ scheduledExecutor .shutdownNow ();
133
+
134
+ // Wait a bit more for tasks to respond to interruption
135
+ if (!scheduledExecutor .awaitTermination (2 , TimeUnit .SECONDS )) {
136
+ LOGGER .error ("Scheduler did not terminate after forced shutdown" );
137
+ }
138
+ }
139
+ } finally {
140
+ // Final flush of any remaining logs
141
+ flushBuffer ();
142
+ shutdownLatch .countDown ();
143
+ LOGGER .info ("LogAggregator stopped successfully" );
94
144
}
95
- flushBuffer ();
96
145
}
97
146
147
+
148
+ /**
149
+ * Waits for the LogAggregator to complete shutdown.
150
+ * Useful for testing or controlled shutdown scenarios.
151
+ *
152
+ * @throws InterruptedException If any thread has interrupted the current thread.
153
+ */
154
+ public void awaitShutdown () throws InterruptedException {
155
+ shutdownLatch .await ();
156
+ }
157
+
158
+
98
159
private void flushBuffer () {
99
- LogEntry logEntry ;
100
- while ((logEntry = buffer .poll ()) != null ) {
101
- centralLogStore .storeLog (logEntry );
102
- logCount .decrementAndGet ();
160
+ if (!running && buffer .isEmpty ()) {
161
+ return ;
162
+ }
163
+
164
+ try {
165
+ List <LogEntry > batch = new ArrayList <>();
166
+ int drained = 0 ;
167
+
168
+ // Drain up to a reasonable batch size for efficiency
169
+ LogEntry logEntry ;
170
+ while ((logEntry = buffer .poll ()) != null && drained < 100 ) {
171
+ batch .add (logEntry );
172
+ drained ++;
173
+ }
174
+
175
+ if (!batch .isEmpty ()) {
176
+ LOGGER .debug ("Flushing {} log entries to central store" , batch .size ());
177
+
178
+ // Process the batch
179
+ for (LogEntry entry : batch ) {
180
+ centralLogStore .storeLog (entry );
181
+ logCount .decrementAndGet ();
182
+ }
183
+
184
+ LOGGER .debug ("Successfully flushed {} log entries" , batch .size ());
185
+ }
186
+ } catch (Exception e ) {
187
+ LOGGER .error ("Error occurred while flushing buffer" , e );
103
188
}
104
189
}
105
190
106
- private void startBufferFlusher () {
107
- executorService .execute (
191
+ /**
192
+ * Starts the periodic buffer flusher using ScheduledExecutorService.
193
+ * This eliminates the busy-waiting loop with Thread.sleep().
194
+ */
195
+ private void startPeriodicFlusher () {
196
+ scheduledExecutor .scheduleAtFixedRate (
108
197
() -> {
109
- while (! Thread . currentThread (). isInterrupted () ) {
198
+ if ( running ) {
110
199
try {
111
- Thread .sleep (5000 ); // Flush every 5 seconds.
112
200
flushBuffer ();
113
- } catch (InterruptedException e ) {
114
- Thread . currentThread (). interrupt ( );
201
+ } catch (Exception e ) {
202
+ LOGGER . error ( "Error in periodic flush" , e );
115
203
}
116
204
}
117
- });
205
+ },
206
+ FLUSH_INTERVAL_SECONDS , // Initial delay
207
+ FLUSH_INTERVAL_SECONDS , // Period
208
+ TimeUnit .SECONDS
209
+ );
210
+
211
+ LOGGER .info ("Periodic log flusher started with interval of {} seconds" , FLUSH_INTERVAL_SECONDS );
212
+ }
213
+ /**
214
+ * Gets the current number of buffered log entries.
215
+ * Useful for monitoring and testing.
216
+ *
217
+ * @return Current buffer size
218
+ */
219
+ public int getBufferSize () {
220
+ return buffer .size ();
221
+ }
222
+
223
+ /**
224
+ * Gets the current log count.
225
+ * Useful for monitoring and testing.
226
+ *
227
+ * @return Current log count
228
+ */
229
+ public int getLogCount () {
230
+ return logCount .get ();
231
+ }
232
+
233
+ /**
234
+ * Checks if the LogAggregator is currently running.
235
+ *
236
+ * @return true if running, false if stopped or stopping
237
+ */
238
+ public boolean isRunning () {
239
+ return running ;
118
240
}
119
241
}
0 commit comments