Skip to content

Commit 04fc93a

Browse files
committed
added optional TcpServer logger
1 parent 6febb7e commit 04fc93a

File tree

6 files changed

+216
-15
lines changed

6 files changed

+216
-15
lines changed

src/main/java/com/github/jlangch/venice/impl/functions/IPCFunctions.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,7 @@ public VncVal apply(final VncList args) {
170170
final VncVal maxMaxQueuesVal = options.get(new VncKeyword("max-queues"));
171171
final VncVal compressCutoffSizeVal = options.get(new VncKeyword("compress-cutoff-size"));
172172
final VncVal encryptVal = options.get(new VncKeyword("encrypt"), VncBoolean.False);
173+
final VncVal serverLogDirVal = options.get(new VncKeyword("server-log-dir"));
173174
final VncVal walDirVal = options.get(new VncKeyword("write-ahead-log-dir"));
174175
final VncVal walCompressVal = options.get(new VncKeyword("write-ahead-log-compress"));
175176
final VncVal walCompactAtStartVal = options.get(new VncKeyword("write-ahead-log-compact"));
@@ -183,6 +184,18 @@ public VncVal apply(final VncList args) {
183184
final long compressCutoffSize = convertMaxMessageSizeToLong(compressCutoffSizeVal);
184185
final boolean encrypt = Coerce.toVncBoolean(encryptVal).getValue();
185186

187+
final File serverLogDir = serverLogDirVal == Nil
188+
? null
189+
: IOFunctions.convertToFile(
190+
walDirVal,
191+
"Function 'ipc/server' arg ':server-log-dir' must be an `io/file`");
192+
193+
if (serverLogDir != null && !serverLogDir.isDirectory() && !serverLogDir.canWrite()) {
194+
throw new VncException(
195+
"The 'server-log-dir' " + serverLogDir
196+
+ " does not exist or is not writable!");
197+
}
198+
186199
final File walDir = walDirVal == Nil
187200
? null
188201
: IOFunctions.convertToFile(
@@ -242,6 +255,10 @@ public VncVal apply(final VncList args) {
242255

243256
server.setEncryption(encrypt);
244257

258+
if (serverLogDir != null) {
259+
server.enableLogger(serverLogDir);
260+
}
261+
245262
if (walDir != null) {
246263
server.enableWriteAheadLog(walDir, walCompress, walCompactAtStart);
247264
}

src/main/java/com/github/jlangch/venice/util/ipc/TcpServer.java

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
import com.github.jlangch.venice.util.ipc.impl.TcpServerConnection;
5151
import com.github.jlangch.venice.util.ipc.impl.queue.IpcQueue;
5252
import com.github.jlangch.venice.util.ipc.impl.util.Compressor;
53+
import com.github.jlangch.venice.util.ipc.impl.util.IO;
54+
import com.github.jlangch.venice.util.ipc.impl.util.ServerLogger;
5355
import com.github.jlangch.venice.util.ipc.impl.wal.WalQueueManager;
5456

5557
// https://medium.com/coderscorner/tale-of-client-server-and-socket-a6ef54a74763
@@ -222,6 +224,23 @@ public boolean isWriteAheadLog() {
222224
return wal.isEnabled();
223225
}
224226

227+
228+
/**
229+
* Enable the server logger within the specified log directory
230+
*
231+
* @param logDir a log directory
232+
*/
233+
public void enableLogger(final File logDir) {
234+
Objects.requireNonNull(logDir);
235+
236+
if (!logDir.isDirectory()) {
237+
throw new VncException(
238+
"The server log directory '" + logDir.getAbsolutePath() + "' does not exist!");
239+
}
240+
241+
logger.enable(logDir);
242+
}
243+
225244
/**
226245
* @return the endpoint ID of this server
227246
*/
@@ -288,6 +307,8 @@ public void start(final Function<IMessage,IMessage> handler) {
288307

289308
ch.configureBlocking(true);
290309

310+
logger.info("server", "Server started on port " + port);
311+
291312
if (wal.isEnabled()) {
292313
// Preload the queues from the Write-Ahead-Log
293314
//
@@ -309,8 +330,19 @@ public void start(final Function<IMessage,IMessage> handler) {
309330
// wait for an incoming client connection
310331
final SocketChannel channel = ch.accept();
311332
channel.configureBlocking(true);
333+
334+
final long connId = connectionId.incrementAndGet();
335+
logger.info(
336+
"server",
337+
"Server accepted new connection (" + connId + ") from "
338+
+ IO.getRemoteAddress(channel));
339+
312340
final TcpServerConnection conn = new TcpServerConnection(
313-
this, channel, handler,
341+
this,
342+
channel,
343+
connId,
344+
logger,
345+
handler,
314346
maxMessageSize,
315347
maxQueues,
316348
wal,
@@ -330,18 +362,19 @@ public void start(final Function<IMessage,IMessage> handler) {
330362
});
331363
}
332364
catch(Exception ex) {
365+
final String msg = "Closed TcpServer @ 127.0.0.1 on port " + port + "!";
366+
logger.error("server", msg, ex);
333367
safeClose(ch);
334368
started.set(false);
335369
server.set(null);
336-
throw new VncException(
337-
"Closed TcpServer @ 127.0.0.1 on port " + port + "!",
338-
ex);
370+
throw new VncException(msg, ex);
339371
}
340372
}
341373
else {
342-
throw new VncException(
343-
"The TcpServer @ 127.0.0.1 on port " + port
344-
+ " has already been started!");
374+
final String msg = "The TcpServer @ 127.0.0.1 on port " + port
375+
+ " has already been started!";
376+
logger.error("server", msg);
377+
throw new VncException(msg);
345378
}
346379
}
347380

@@ -528,12 +561,16 @@ private static byte[] toBytes(final String s, final String charset) {
528561
private final AtomicLong maxMessageSize = new AtomicLong(MESSAGE_LIMIT_MAX);
529562
private final AtomicLong maxQueues = new AtomicLong(QUEUES_MAX);
530563
private final AtomicBoolean encrypt = new AtomicBoolean(false);
564+
private final AtomicLong connectionId = new AtomicLong(0);
531565
private final WalQueueManager wal = new WalQueueManager();
532566
private final int publishQueueCapacity = 50;
533567
private final ServerStatistics statistics = new ServerStatistics();
534568
private final Subscriptions subscriptions = new Subscriptions();
535569
private final Map<String, IpcQueue<Message>> p2pQueues = new ConcurrentHashMap<>();
536570

571+
// logger
572+
private final ServerLogger logger = new ServerLogger();
573+
537574
// compression
538575
private final AtomicReference<Compressor> compressor = new AtomicReference<>(Compressor.off());
539576

src/main/java/com/github/jlangch/venice/util/ipc/impl/TcpServerConnection.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import com.github.jlangch.venice.util.ipc.impl.util.IO;
5555
import com.github.jlangch.venice.util.ipc.impl.util.Json;
5656
import com.github.jlangch.venice.util.ipc.impl.util.JsonBuilder;
57+
import com.github.jlangch.venice.util.ipc.impl.util.ServerLogger;
5758
import com.github.jlangch.venice.util.ipc.impl.wal.WalQueueManager;
5859

5960

@@ -62,6 +63,8 @@ public class TcpServerConnection implements IPublisher, Runnable {
6263
public TcpServerConnection(
6364
final TcpServer server,
6465
final SocketChannel ch,
66+
final long connectionId,
67+
final ServerLogger logger,
6568
final Function<IMessage,IMessage> handler,
6669
final AtomicLong maxMessageSize,
6770
final AtomicLong maxQueues,
@@ -75,6 +78,8 @@ public TcpServerConnection(
7578
) {
7679
this.server = server;
7780
this.ch = ch;
81+
this.connectionId = connectionId;
82+
this.logger = logger;
7883
this.handler = handler;
7984
this.maxMessageSize = maxMessageSize;
8085
this.maxQueues = maxQueues;
@@ -95,6 +100,8 @@ public TcpServerConnection(
95100
@Override
96101
public void run() {
97102
try {
103+
logger.info("conn-" + connectionId, "Listening on connection from " + IO.getRemoteAddress(ch));
104+
98105
statistics.incrementConnectionCount();
99106
while(mode != State.Terminated && server.isRunning() && ch.isOpen()) {
100107
if (mode == State.Request_Response) {
@@ -120,6 +127,8 @@ else if (mode == State.Publish) {
120127
statistics.decrementConnectionCount();
121128
subscriptions.removeSubscriptions(this);
122129
IO.safeClose(ch);
130+
131+
logger.info("conn-" + connectionId, "Closed connection");
123132
}
124133
}
125134

@@ -884,6 +893,10 @@ private Message getTcpServerStatus() {
884893
.add("write-ahead-log-count", wal.isEnabled()
885894
? wal.countLogFiles()
886895
: 0 )
896+
.add("logger-enabled", logger.isEnabled())
897+
.add("logger-file", logger.getLogFile() != null
898+
? logger.getLogFile().getAbsolutePath()
899+
: "-")
887900
.add("error-queue-capacity", ERROR_QUEUE_CAPACITY)
888901
.add("publish-queue-capacity", publishQueueCapacity)
889902
// statistics
@@ -1015,6 +1028,9 @@ private static enum State { Request_Response, Publish, Terminated };
10151028

10161029
private final TcpServer server;
10171030
private final SocketChannel ch;
1031+
private final long connectionId;
1032+
private final ServerLogger logger;
1033+
10181034
private final Function<IMessage,IMessage> handler;
10191035
private final AtomicLong maxMessageSize;
10201036
private final AtomicLong maxQueues;

src/main/java/com/github/jlangch/venice/util/ipc/impl/util/IO.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
*/
2222
package com.github.jlangch.venice.util.ipc.impl.util;
2323

24+
import java.net.SocketAddress;
2425
import java.nio.channels.SocketChannel;
2526

2627

@@ -34,4 +35,13 @@ public static void safeClose(final SocketChannel ch) {
3435
catch(Exception ignore) { }
3536
}
3637
}
38+
39+
public static SocketAddress getRemoteAddress(final SocketChannel channel) {
40+
try {
41+
return channel.getRemoteAddress();
42+
}
43+
catch(Exception ex) {
44+
return null;
45+
}
46+
}
3747
}
Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/* __ __ _
2+
* \ \ / /__ _ __ (_) ___ ___
3+
* \ \/ / _ \ '_ \| |/ __/ _ \
4+
* \ / __/ | | | | (_| __/
5+
* \/ \___|_| |_|_|\___\___|
6+
*
7+
*
8+
* Copyright 2017-2025 Venice
9+
*
10+
* Licensed under the Apache License, Version 2.0 (the "License");
11+
* you may not use this file except in compliance with the License.
12+
* You may obtain a copy of the License at
13+
*
14+
* http://www.apache.org/licenses/LICENSE-2.0
15+
*
16+
* Unless required by applicable law or agreed to in writing, software
17+
* distributed under the License is distributed on an "AS IS" BASIS,
18+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
* See the License for the specific language governing permissions and
20+
* limitations under the License.
21+
*/
22+
package com.github.jlangch.venice.util.ipc.impl.util;
23+
24+
import java.io.File;
25+
import java.nio.charset.StandardCharsets;
26+
import java.nio.file.Files;
27+
import java.nio.file.StandardOpenOption;
28+
import java.time.LocalDateTime;
29+
import java.time.format.DateTimeFormatter;
30+
import java.util.Objects;
31+
32+
import com.github.jlangch.venice.impl.util.StringUtil;
33+
34+
35+
/**
36+
* A simple server logger
37+
*
38+
* <p>Use only for TcpServer logging!
39+
*/
40+
public final class ServerLogger {
41+
42+
public ServerLogger() {
43+
logFile = null;
44+
}
45+
46+
47+
48+
public void enable(final File dir) {
49+
logFile = dir == null ? null : new File(dir, "server.log");
50+
}
51+
52+
public boolean isEnabled() {
53+
return logFile != null;
54+
}
55+
56+
public File getLogFile() {
57+
return logFile;
58+
}
59+
60+
61+
public void info(final String context, final String message) {
62+
Objects.requireNonNull(message);
63+
log("INFO", context, message, null);
64+
}
65+
66+
public void warn(final String context, final String message) {
67+
Objects.requireNonNull(message);
68+
log("WARN", context, message, null);
69+
}
70+
71+
public void warn(final String context, final String message, final Exception ex) {
72+
Objects.requireNonNull(message);
73+
log("WARN", context, message, ex);
74+
}
75+
76+
public void error(final String context, final String message) {
77+
Objects.requireNonNull(message);
78+
log("ERROR", context, message, null);
79+
}
80+
81+
public void error(final String context, final String message, final Exception ex) {
82+
Objects.requireNonNull(message);
83+
log("ERROR", context, message, ex);
84+
}
85+
86+
87+
private synchronized void log(
88+
final String level,
89+
final String context,
90+
final String message,
91+
final Exception ex
92+
) {
93+
if (logFile == null) {
94+
return;
95+
}
96+
97+
final String msg = ex == null ? message : message + ". Cause: " + ex.getMessage();
98+
99+
final String logMsg = String.format(
100+
"%s|%s|%s|%s%n",
101+
LocalDateTime.now().format(dtf),
102+
level,
103+
StringUtil.trimToEmpty(context),
104+
msg);
105+
try {
106+
Files.write(
107+
logFile.toPath(),
108+
logMsg.getBytes(StandardCharsets.UTF_8),
109+
StandardOpenOption.WRITE,
110+
StandardOpenOption.APPEND,
111+
StandardOpenOption.CREATE);
112+
}
113+
catch(Exception ignore) { }
114+
}
115+
116+
117+
private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");
118+
119+
private volatile File logFile = null;
120+
}

src/main/java/com/github/jlangch/venice/util/ipc/impl/wal/WalLogger.java

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ private WalLogger(final File logFile) {
4545

4646

4747
public static WalLogger withinDir(final File walDir) {
48+
Objects.requireNonNull(walDir);
4849
return new WalLogger(new File(walDir, "wal.log"));
4950
}
5051

@@ -56,28 +57,28 @@ public static WalLogger asTemporary() throws IOException {
5657

5758

5859
public void info(final File walFile, final String message) {
59-
Objects.requireNonNull(message);
60-
log("INFO", walFile, message, null);
60+
Objects.requireNonNull(message);
61+
log("INFO", walFile, message, null);
6162
}
6263

6364
public void warn(final File walFile, final String message) {
64-
Objects.requireNonNull(message);
65+
Objects.requireNonNull(message);
6566
log("WARN", walFile, message, null);
6667
}
6768

6869
public void warn(final File walFile, final String message, final Exception ex) {
69-
Objects.requireNonNull(message);
70+
Objects.requireNonNull(message);
7071
log("WARN", walFile, message, ex);
7172
}
7273

7374
public void error(final File walFile, final String message) {
74-
Objects.requireNonNull(message);
75-
log("ERROR", walFile, message, null);
75+
Objects.requireNonNull(message);
76+
log("ERROR", walFile, message, null);
7677
}
7778

7879
public void error(final File walFile, final String message, final Exception ex) {
79-
Objects.requireNonNull(message);
80-
log("ERROR", walFile, message, ex);
80+
Objects.requireNonNull(message);
81+
log("ERROR", walFile, message, ex);
8182
}
8283

8384

0 commit comments

Comments
 (0)