Skip to content

Commit 442d6aa

Browse files
committed
improved IPC server logging
1 parent a537753 commit 442d6aa

File tree

3 files changed

+42
-10
lines changed

3 files changed

+42
-10
lines changed

src/main/java/com/github/jlangch/venice/impl/threadpool/ManagedExecutor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,18 +57,21 @@ public final boolean isShutdown() {
5757
}
5858
}
5959

60-
public final void awaitTermination(final long timeoutMillis) {
60+
public final boolean awaitTermination(final long timeoutMillis) {
6161
synchronized(this) {
6262
if (executor != null) {
6363
try {
64-
executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
64+
return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
6565
}
6666
catch(Exception ex) {
6767
throw new VncException(
6868
"Failed awaiting for executor termination",
6969
ex);
7070
}
7171
}
72+
else {
73+
return true;
74+
}
7275
}
7376
}
7477

src/main/java/com/github/jlangch/venice/util/ipc/TcpServer.java

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -405,15 +405,27 @@ public void start(final Function<IMessage,IMessage> handler) {
405405
@Override
406406
public void close() throws IOException {
407407
if (started.compareAndSet(true, false)) {
408-
final String msg = "Closed server on port " + port + "!";
409-
logger.info("server", msg);
408+
logger.info("server", "Server closing...");
410409

411410
// do not shutdown the thread-pools too early
412411
IO.sleep(300);
413412

414413
safeClose(server.get());
415414
server.set(null);
415+
416+
// This method does not wait for actively executing tasks to terminate.
416417
mngdExecutor.shutdownNow();
418+
419+
// wait max 1'000ms
420+
final boolean terminated = mngdExecutor.awaitTermination(1_000);
421+
422+
IO.sleep(100);
423+
424+
logger.info(
425+
"server",
426+
terminated
427+
? "Server closed."
428+
: "Server closed. Some connection are delaying shutdown confirmation.");
417429
}
418430
}
419431

@@ -539,14 +551,21 @@ public boolean existsQueue(final String queueName) {
539551
private void safeClose(final ServerSocketChannel ch) {
540552
if (ch != null) {
541553
try {
542-
// close the durable queues
543-
if (wal.isEnabled()) {
544-
wal.close(p2pQueues.values());
554+
try {
555+
// close the durable queues
556+
if (wal.isEnabled()) {
557+
wal.close(p2pQueues.values());
558+
}
559+
}
560+
catch(Exception ex) {
561+
logger.warn("server", "Error while closing queue WALs.", ex);
545562
}
546563

547564
ch.close();
548565
}
549-
catch(Exception ignore) {}
566+
catch(Exception ex) {
567+
logger.warn("server", "Error while closing server.", ex);
568+
}
550569
}
551570
}
552571

src/main/java/com/github/jlangch/venice/util/ipc/impl/TcpServerConnection.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,10 @@ private Message handleSubscribe(final Message request) {
419419
// register subscription
420420
subscriptions.addSubscriptions(request.getTopicsSet(), this);
421421

422+
logger.info(
423+
"conn-" + connectionId,
424+
String.format("Subscribed to topics: %s.", Topics.encode(request.getTopics())));
425+
422426
// acknowledge the subscription
423427
return createOkTextMessageResponse(request, "Subscribed to the topics.");
424428
}
@@ -427,6 +431,10 @@ private Message handleUnsubscribe(final Message request) {
427431
// unregister subscription
428432
subscriptions.removeSubscriptions(request.getTopicsSet(), this);
429433

434+
logger.info(
435+
"conn-" + connectionId,
436+
String.format("Unsubscribed from topics: %s.", Topics.encode(request.getTopics())));
437+
430438
// acknowledge the unsubscription
431439
return createOkTextMessageResponse(request, "Unsubscribed from the topics.");
432440
}
@@ -1024,16 +1032,18 @@ private void auditPublishError(
10241032
errorBuffer.offer(new Error(errorMsg, msg, ex));
10251033
statistics.incrementDiscardedPublishCount();
10261034
}
1027-
catch(InterruptedException ignore) {}
1035+
catch(InterruptedException ignore) { }
10281036
}
10291037

10301038
private void removeAllChannelTemporaryQueues() {
10311039
try {
10321040
final Set<String> names = tmpQueues.keySet();
10331041
names.forEach(n -> p2pQueues.remove(n));
10341042
tmpQueues.clear();
1043+
1044+
logger.info("conn-" + connectionId, "Removed all temporary queues of the connnection");
10351045
}
1036-
catch(Exception ignore) {}
1046+
catch(Exception ignore) { }
10371047
}
10381048

10391049
private long countStandardQueues() {

0 commit comments

Comments
 (0)