Skip to content

Commit 6febb7e

Browse files
committed
refactoring
1 parent e982846 commit 6febb7e

File tree

10 files changed

+194
-157
lines changed

10 files changed

+194
-157
lines changed

src/main/java/com/github/jlangch/venice/impl/functions/IPCFunctions.java

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,11 @@ public class IPCFunctions {
102102
" The cutoff size can be specified as a number like `1000`" +
103103
" or a number with a unit like `:1KB` or `:2MB`.¶" +
104104
" Defaults to -1 (no compression)|\n" +
105+
"| :encrypt b | If `true` encrypt the payload data of all messages exchanged" +
106+
" with this server.¶" +
107+
" The data is AES-256-GCM encrypted using a secret that is" +
108+
" created and exchanged using the Diffie-Hellman key exchange " +
109+
" algorithm.|\n\n" +
105110
"| :write-ahead-log-dir f | Provide a write-ahead-log directory to support durable queues.¶" +
106111
" Defaults to `nil`.|\n" +
107112
"| :write-ahead-log-compress b | If `true` compresses the write-ahead-log records.¶" +
@@ -164,6 +169,7 @@ public VncVal apply(final VncList args) {
164169
final VncVal maxMsgSizeVal = options.get(new VncKeyword("max-message-size"));
165170
final VncVal maxMaxQueuesVal = options.get(new VncKeyword("max-queues"));
166171
final VncVal compressCutoffSizeVal = options.get(new VncKeyword("compress-cutoff-size"));
172+
final VncVal encryptVal = options.get(new VncKeyword("encrypt"), VncBoolean.False);
167173
final VncVal walDirVal = options.get(new VncKeyword("write-ahead-log-dir"));
168174
final VncVal walCompressVal = options.get(new VncKeyword("write-ahead-log-compress"));
169175
final VncVal walCompactAtStartVal = options.get(new VncKeyword("write-ahead-log-compact"));
@@ -175,6 +181,7 @@ public VncVal apply(final VncList args) {
175181
final long maxMsgSize = convertMaxMessageSizeToLong(maxMsgSizeVal);
176182
final long maxQueues = convertMaxMessageSizeToLong(maxMaxQueuesVal);
177183
final long compressCutoffSize = convertMaxMessageSizeToLong(compressCutoffSizeVal);
184+
final boolean encrypt = Coerce.toVncBoolean(encryptVal).getValue();
178185

179186
final File walDir = walDirVal == Nil
180187
? null
@@ -233,6 +240,8 @@ public VncVal apply(final VncList args) {
233240
server.setCompressCutoffSize(compressCutoffSize);
234241
}
235242

243+
server.setEncryption(encrypt);
244+
236245
if (walDir != null) {
237246
server.enableWriteAheadLog(walDir, walCompress, walCompactAtStart);
238247
}
@@ -268,10 +277,6 @@ public VncVal apply(final VncList args) {
268277
"| port p | The server's TCP/IP port |\n" +
269278
"| host h | The server's TCP/IP host |\n\n" +
270279
"*Options:* \n\n" +
271-
"| :max-message-size n | The max size of the message payload." +
272-
" Defaults to `200MB`.¶" +
273-
" The max size can be specified as a number like `20000`" +
274-
" or a number with a unit like `:20KB` or `:20MB`|\n" +
275280
"| :compress-cutoff-size n | The compression cutoff size for payload messages.¶" +
276281
" With a negative cutoff size payload messages will not be" +
277282
" compressed. If the payload message size is greater than the" +
@@ -353,19 +358,15 @@ else if ( args.size() == 2) {
353358
final int port = Coerce.toVncLong(args.second()).getIntValue();
354359

355360
final VncHashMap options = VncHashMap.ofAll(args.slice(2));
356-
final VncVal maxMsgSizeVal = options.get(new VncKeyword("max-message-size"));
357361
final VncVal compressCutoffSizeVal = options.get(new VncKeyword("compress-cutoff-size"));
358362
final VncVal encryptVal = options.get(new VncKeyword("encrypt"), VncBoolean.False);
359363

360-
final long maxMsgSize = convertMaxMessageSizeToLong(maxMsgSizeVal);
361364
final long compressCutoffSize = convertMaxMessageSizeToLong(compressCutoffSizeVal);
362365
final boolean encrypt = Coerce.toVncBoolean(encryptVal).getValue();
363366

364-
final TcpClient client = new TcpClient(host, port, encrypt);
367+
final TcpClient client = new TcpClient(host, port);
365368

366-
if (maxMsgSize > 0) {
367-
client.setMaximumMessageSize(maxMsgSize);
368-
}
369+
client.setEncryption(encrypt);
369370

370371
if (compressCutoffSize >= 0) {
371372
client.setCompressCutoffSize(compressCutoffSize);

src/main/java/com/github/jlangch/venice/util/ipc/MessageType.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,9 @@ public enum MessageType {
4040
REMOVE_QUEUE(32),
4141
STATUS_QUEUE(33),
4242

43-
DIFFIE_HELLMAN_KEY_REQUEST(40),
43+
CLIENT_CONFIG(40),
44+
45+
DIFFIE_HELLMAN_KEY_REQUEST(50),
4446

4547

4648
// responses

src/main/java/com/github/jlangch/venice/util/ipc/TcpClient.java

Lines changed: 87 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,11 @@
4646
import com.github.jlangch.venice.impl.threadpool.ManagedCachedThreadPoolExecutor;
4747
import com.github.jlangch.venice.impl.types.VncBoolean;
4848
import com.github.jlangch.venice.impl.types.VncKeyword;
49+
import com.github.jlangch.venice.impl.types.VncLong;
4950
import com.github.jlangch.venice.impl.types.VncVal;
5051
import com.github.jlangch.venice.impl.types.collections.VncMap;
5152
import com.github.jlangch.venice.impl.types.collections.VncOrderedMap;
53+
import com.github.jlangch.venice.impl.types.util.Coerce;
5254
import com.github.jlangch.venice.impl.util.CollectionUtil;
5355
import com.github.jlangch.venice.impl.util.StringUtil;
5456
import com.github.jlangch.venice.util.dh.DiffieHellmanKeys;
@@ -76,7 +78,7 @@ public class TcpClient implements Cloneable, Closeable {
7678
* @param port a port
7779
*/
7880
public TcpClient(final int port) {
79-
this(null, port, false);
81+
this(null, port);
8082
}
8183

8284
/**
@@ -91,61 +93,42 @@ public TcpClient(final int port) {
9193
* @param port a port
9294
*/
9395
public TcpClient(final String host, final int port) {
94-
this(host, port, false);
95-
}
96-
97-
/**
98-
* Create a new TcpClient connecting to a TcpServer on the local host
99-
* and port
100-
*
101-
* <p>The client is NOT thread safe!
102-
*
103-
* <p>The client must be closed after use!
104-
*
105-
* @param port a port
106-
* @param encrypt if <code>true</code> encrypt the payload data at transport
107-
* level communication between this client and the server.
108-
*/
109-
public TcpClient(final int port, final boolean encrypt) {
110-
this(null, port, encrypt);
111-
}
112-
113-
/**
114-
* Create a new TcpClient connecting to a TcpServer on the specified host
115-
* and port
116-
*
117-
* <p>The client is NOT thread safe!
118-
*
119-
* <p>The client must be closed after use!
120-
*
121-
* @param host a host
122-
* @param port a port
123-
* @param encrypt if <code>true</code> encrypt the payload data at transport
124-
* level communication between this client and the server.
125-
*/
126-
public TcpClient(final String host, final int port, final boolean encrypt) {
12796
this.host = StringUtil.isBlank(host) ? "127.0.0.1" : host;
12897
this.port = port;
129-
this.encrypt = encrypt;
13098
this.endpointId = UUID.randomUUID().toString();
13199
this.dhKeys = DiffieHellmanKeys.create();
132100
}
133101

134102

135103
@Override
136104
public Object clone() {
137-
final TcpClient client = new TcpClient(host, port, isEncrypted());
105+
final TcpClient client = new TcpClient(host, port);
106+
client.setEncryption(isEncrypted());
138107
client.setCompressCutoffSize(getCompressCutoffSize());
139-
client.setMaximumMessageSize(getMaximumMessageSize());
140-
return client;
108+
return client;
109+
}
110+
111+
/**
112+
* Set the encryption mode
113+
*
114+
* @param encrypt if <code>true</code> encrypt the payload data at transport
115+
* level communication between this client and the server.
116+
*/
117+
public void setEncryption(final boolean encrypt) {
118+
if (opened.get()) {
119+
throw new VncException(
120+
"The encryption mode cannot be set anymore "
121+
+ "once the client has been opened!");
122+
}
123+
this.encrypt.set(encrypt);
141124
}
142125

143126
/**
144127
* @return <code>true</code> if this client has transport level encryption
145128
* enabled else <code>false</code>
146129
*/
147130
public boolean isEncrypted() {
148-
return encryptor.get().isActive();
131+
return encrypt.get();
149132
}
150133

151134
/**
@@ -177,23 +160,15 @@ public long getCompressCutoffSize() {
177160
return compressor.get().cutoffSize();
178161
}
179162

180-
/**
181-
* Set the maximum message size.
182-
*
183-
* <p>Defaults to 200 MB
184-
*
185-
* @param maxSize the max message size in bytes
186-
* @return this client
187-
*/
188-
public TcpClient setMaximumMessageSize(final long maxSize) {
189-
maxMessageSize.set(Math.max(MESSAGE_LIMIT_MIN, Math.min(MESSAGE_LIMIT_MAX, maxSize)));
190-
return this;
191-
}
192-
193163
/**
194164
* @return return the client's max message size
195165
*/
196166
public long getMaximumMessageSize() {
167+
if (!opened.get()) {
168+
throw new VncException(
169+
"The max message size cannot be queried if the client has "
170+
+ "been opened and has requested the size from the server!");
171+
}
197172
return maxMessageSize.get();
198173
}
199174

@@ -237,7 +212,37 @@ public void open() {
237212
ex);
238213
}
239214

240-
if (encrypt) {
215+
// request config from server
216+
try {
217+
final IMessage response = send(createConfigRequestMessage());
218+
if (response.getResponseStatus() != ResponseStatus.OK) {
219+
throw new VncException(
220+
"Failed to get client config from server. Server answered with "
221+
+ response.getResponseStatus() + " !");
222+
}
223+
224+
// handle config
225+
final VncMap config = (VncMap)((Message)response).getVeniceData();
226+
maxMessageSize.set(
227+
Coerce.toVncLong(
228+
config.get(
229+
new VncKeyword("max-msg-size"),
230+
new VncLong(MESSAGE_LIMIT_MAX))).toJavaLong());
231+
encrypt.set(
232+
encrypt.get() // client side encrypt request
233+
|| VncBoolean.isTrue( // server side encrypt request
234+
config.get(
235+
new VncKeyword(":encrypt"),
236+
VncBoolean.False)));
237+
}
238+
catch(Exception ex) {
239+
IO.safeClose(ch);
240+
opened.set(false);
241+
channel.set(null);
242+
throw new VncException("Failed to get client config from server!", ex);
243+
}
244+
245+
if (encrypt.get()) {
241246
try {
242247
diffieHellmanKeyExchange();
243248
}
@@ -932,20 +937,27 @@ private IMessage sendAtomically(
932937
final Encryptor encryptor
933938
) {
934939
try {
940+
final boolean auditCount = msg.getType() != MessageType.DIFFIE_HELLMAN_KEY_REQUEST
941+
&& msg.getType() != MessageType.CLIENT_CONFIG;
942+
935943
// sending the request message and receiving the response
936944
// must be atomic otherwise request and response can be mixed
937945
// in multi-threaded environments
938946
if (sendSemaphore.tryAcquire(120L, TimeUnit.SECONDS)) {
939947
try {
940948
Protocol.sendMessage(ch, (Message)msg, compressor, encryptor);
941-
messageSentCount.incrementAndGet();
949+
if (auditCount) {
950+
messageSentCount.incrementAndGet();
951+
}
942952

943953
if (msg.isOneway()) {
944954
return null;
945955
}
946956
else {
947957
final Message response = Protocol.receiveMessage(ch, compressor, encryptor);
948-
messageReceiveCount.incrementAndGet();
958+
if (auditCount) {
959+
messageReceiveCount.incrementAndGet();
960+
}
949961
return response;
950962
}
951963
}
@@ -1141,6 +1153,25 @@ private static Message createQueuePollRequestMessage(
11411153
new byte[0]);
11421154
}
11431155

1156+
private static Message createConfigRequestMessage() {
1157+
return new Message(
1158+
null,
1159+
null,
1160+
MessageType.CLIENT_CONFIG,
1161+
ResponseStatus.NULL,
1162+
false,
1163+
false,
1164+
null,
1165+
null,
1166+
System.currentTimeMillis(),
1167+
Message.EXPIRES_NEVER,
1168+
Message.NO_TIMEOUT,
1169+
Topics.of("client-config"),
1170+
"text/plain",
1171+
"UTF-8",
1172+
new byte[0]);
1173+
}
1174+
11441175

11451176
private static class FutureNull implements Future<IMessage> {
11461177

@@ -1202,8 +1233,8 @@ private static byte[] toBytes(final String s, final String charset) {
12021233
private final AtomicReference<Compressor> compressor = new AtomicReference<>(Compressor.off());
12031234

12041235
// encryption
1205-
private final boolean encrypt;
12061236
private final DiffieHellmanKeys dhKeys;
1237+
private final AtomicBoolean encrypt = new AtomicBoolean(false);
12071238
private final AtomicReference<Encryptor> encryptor = new AtomicReference<>(Encryptor.off());
12081239

12091240
private final ManagedCachedThreadPoolExecutor mngdExecutor =

src/main/java/com/github/jlangch/venice/util/ipc/TcpServer.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,29 @@ public TcpServer setMaximumParallelConnections(final int count) {
9595
return this;
9696
}
9797

98+
/**
99+
* Set the encryption mode
100+
*
101+
* @param encrypt if <code>true</code> encrypt the payload data at transport
102+
* level communication between this client and the server.
103+
*/
104+
public void setEncryption(final boolean encrypt) {
105+
if (started.get()) {
106+
throw new VncException(
107+
"The encryption mode cannot be set anymore "
108+
+ "once the server has been started!");
109+
}
110+
this.encrypt.set(encrypt);
111+
}
112+
113+
/**
114+
* @return <code>true</code> if this server has transport level encryption
115+
* enabled else <code>false</code>
116+
*/
117+
public boolean isEncrypted() {
118+
return encrypt.get();
119+
}
120+
98121
/**
99122
* Set the compression cutoff size for payload messages.
100123
*
@@ -504,6 +527,7 @@ private static byte[] toBytes(final String s, final String charset) {
504527
private final AtomicReference<ServerSocketChannel> server = new AtomicReference<>();
505528
private final AtomicLong maxMessageSize = new AtomicLong(MESSAGE_LIMIT_MAX);
506529
private final AtomicLong maxQueues = new AtomicLong(QUEUES_MAX);
530+
private final AtomicBoolean encrypt = new AtomicBoolean(false);
507531
private final WalQueueManager wal = new WalQueueManager();
508532
private final int publishQueueCapacity = 50;
509533
private final ServerStatistics statistics = new ServerStatistics();

0 commit comments

Comments
 (0)