Skip to content

Commit 980cb6f

Browse files
committed
refactoring
1 parent b21da63 commit 980cb6f

File tree

5 files changed

+69
-51
lines changed

5 files changed

+69
-51
lines changed

src/main/java/com/github/jlangch/venice/util/ipc/IMessage.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,7 +132,7 @@ public interface IMessage {
132132
LocalDateTime getExpiresAtAsLocalDateTime();
133133

134134
/**
135-
* @return the message destionation name (queue/topic/function)
135+
* @return the message destination name (queue/topic/function)
136136
*/
137137
String getDestinationName();
138138

@@ -141,6 +141,21 @@ public interface IMessage {
141141
*/
142142
String getReplyToQueueName();
143143

144+
/**
145+
* The timeout in milliseconds for destination actions at server side.
146+
*
147+
* <ul>
148+
* <li>timeout to offer the message to a queue</li>
149+
* <li>timeout to process the publishing of a message</li>
150+
* <li>timeout to call a function and get the result</li>
151+
* <li></li>
152+
* </ul>
153+
*
154+
* @return the message replyTo queue name
155+
*/
156+
long getDestinationActionTimeout();
157+
158+
144159
/**
145160
* @return the message's subject
146161
*/

src/main/java/com/github/jlangch/venice/util/ipc/impl/Message.java

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ public Message(
8181
this.timestamp = Instant.now().toEpochMilli();
8282
this.expiresAt = expiresAt < 0 ? Messages.EXPIRES_NEVER : expiresAt;
8383
this.subject = subject;
84-
this.timeout = Messages.DEFAULT_TIMEOUT;
84+
this.destinationActionTimeout = Messages.DEFAULT_TIMEOUT;
8585
this.mimetype = mimetype;
8686
this.charset = charset;
8787
this.data = data;
@@ -123,7 +123,7 @@ public Message(
123123
this.timestamp = Instant.now().toEpochMilli();
124124
this.expiresAt = expiresAt < 0 ? Messages.EXPIRES_NEVER : expiresAt;
125125
this.subject = subject;
126-
this.timeout = Messages.DEFAULT_TIMEOUT;
126+
this.destinationActionTimeout = Messages.DEFAULT_TIMEOUT;
127127
this.mimetype = mimetype;
128128
this.charset = charset;
129129
this.data = data;
@@ -141,7 +141,7 @@ public Message(
141141
final String replyToQueueName,
142142
final long timestamp,
143143
final long expiresAt,
144-
final long timeout,
144+
final long destinationActionTimeout,
145145
final String subject,
146146
final String mimetype,
147147
final String charset,
@@ -168,7 +168,9 @@ public Message(
168168
this.replyToQueueName = replyToQueueName;
169169
this.timestamp = timestamp <= 0 ? Instant.now().toEpochMilli() : timestamp;
170170
this.expiresAt = expiresAt < 0 ? Messages.EXPIRES_NEVER : expiresAt;
171-
this.timeout = timeout < 0 ? Messages.NO_TIMEOUT : timeout;
171+
this.destinationActionTimeout = destinationActionTimeout < 0
172+
? Messages.NO_TIMEOUT
173+
: destinationActionTimeout;
172174
this.subject = subject;
173175
this.mimetype = mimetype;
174176
this.charset = charset;
@@ -194,7 +196,7 @@ public Message withType(
194196
id, requestId,
195197
type, responseStatus, oneway, durable, subscriptionReply,
196198
destinationName, replyToQueueName, timestamp, expiresAt,
197-
timeout, subject, mimetype, charset, data);
199+
destinationActionTimeout, subject, mimetype, charset, data);
198200
}
199201

200202
/**
@@ -219,7 +221,7 @@ public Message withTypeAndResponseStatus(
219221
id, requestId,
220222
type, responseStatus, oneway, durable, subscriptionReply,
221223
destinationName, replyToQueueName, timestamp, expiresAt,
222-
timeout, subject, mimetype, charset, data);
224+
destinationActionTimeout, subject, mimetype, charset, data);
223225
}
224226

225227

@@ -236,7 +238,7 @@ public Message withSubscriptionReply(
236238
id, requestId,
237239
type, responseStatus, oneway, durable, subscriptionReply,
238240
destinationName, replyToQueueName, timestamp, expiresAt,
239-
timeout, subject, mimetype, charset, data);
241+
destinationActionTimeout, subject, mimetype, charset, data);
240242
}
241243

242244
@Override
@@ -289,8 +291,9 @@ public boolean hasExpired() {
289291
return expiresAt >= 0 && expiresAt < System.currentTimeMillis();
290292
}
291293

292-
public long getTimeout() {
293-
return timeout;
294+
@Override
295+
public long getDestinationActionTimeout() {
296+
return destinationActionTimeout;
294297
}
295298

296299
@Override
@@ -449,7 +452,7 @@ public String toString() {
449452
sb.append(String.format(
450453
"%s %s\n",
451454
padRight("Timeout:", 12),
452-
timeout < 0 ? "-" : String.valueOf(timeout) + "ms"));
455+
destinationActionTimeout < 0 ? "-" : String.valueOf(destinationActionTimeout) + "ms"));
453456

454457
sb.append(String.format(
455458
"%s %s\n",
@@ -491,7 +494,7 @@ public int hashCode() {
491494
result = prime * result + ((requestId == null) ? 0 : requestId.hashCode());
492495
result = prime * result + ((responseStatus == null) ? 0 : responseStatus.hashCode());
493496
result = prime * result + (subscriptionReply ? 1231 : 1237);
494-
result = prime * result + (int) (timeout ^ (timeout >>> 32));
497+
result = prime * result + (int) (destinationActionTimeout ^ (destinationActionTimeout >>> 32));
495498
result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
496499
result = prime * result + ((subject == null) ? 0 : subject.hashCode());
497500
result = prime * result + ((type == null) ? 0 : type.hashCode());
@@ -549,7 +552,7 @@ public boolean equals(Object obj) {
549552
return false;
550553
if (subscriptionReply != other.subscriptionReply)
551554
return false;
552-
if (timeout != other.timeout)
555+
if (destinationActionTimeout != other.destinationActionTimeout)
553556
return false;
554557
if (timestamp != other.timestamp)
555558
return false;
@@ -647,7 +650,7 @@ public static void validateQueueName(final String name) {
647650
private final String replyToQueueName; // used for offer/poll messages
648651
private final long timestamp;
649652
private final long expiresAt;
650-
private final long timeout;
653+
private final long destinationActionTimeout;
651654
private final String subject;
652655
private final String mimetype;
653656
private final String charset;

src/main/java/com/github/jlangch/venice/util/ipc/impl/Messages.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public static Message testMessage(final byte[] payload, final boolean oneway) {
5656
// Message timeout
5757
public static final long EXPIRES_NEVER = -1L;
5858
public static final long NO_TIMEOUT = -1L;
59-
public static final long DEFAULT_TIMEOUT = 300L; // 300ms
59+
public static final long DEFAULT_TIMEOUT = 500L; // 500ms
6060
public static final long ZERO_TIMEOUT = 0L; // 0ms
6161

6262
// Queues

src/main/java/com/github/jlangch/venice/util/ipc/impl/conn/ServerConnection.java

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -201,10 +201,10 @@ public void publish(final Message msg) {
201201
// to this channels's client.
202202
// The publish queue is blocking to not get overrun. To prevent
203203
// a backlash if the queue is full, the message will be discarded!
204-
final long timeout = pubMsg.getTimeout();
205-
final boolean ok = timeout < 0L
204+
final long timeoutMillis = pubMsg.getDestinationActionTimeout();
205+
final boolean ok = timeoutMillis <= 0L
206206
? publishQueue.offer(pubMsg)
207-
: publishQueue.offer(pubMsg, timeout, TimeUnit.SECONDS);
207+
: publishQueue.offer(pubMsg, timeoutMillis, TimeUnit.MILLISECONDS);
208208
if (!ok) {
209209
throw new RuntimeException("Publish failure!");
210210
}
@@ -534,11 +534,11 @@ private Message handleOfferToQueue(final Message request) {
534534

535535
// convert message type from OFFER to REQUEST
536536
final Message msg = request.withType(REQUEST, request.isOneway());
537-
final long timeout = msg.getTimeout();
537+
final long timeoutMillis = msg.getDestinationActionTimeout();
538538

539-
final boolean ok = timeout < 0
539+
final boolean ok = timeoutMillis <= 0
540540
? queue.offer(msg)
541-
: queue.offer(msg, timeout, TimeUnit.MILLISECONDS);
541+
: queue.offer(msg, timeoutMillis, TimeUnit.MILLISECONDS);
542542
if (ok) {
543543
return createOkTextResponse(
544544
request,
@@ -572,7 +572,7 @@ private Message handlePollFromQueue(final Message request) {
572572

573573
final String queueName = request.getDestinationName();
574574
try {
575-
final long timeout = request.getTimeout();
575+
final long timeoutMillis = request.getDestinationActionTimeout();
576576
final IpcQueue<Message> queue = queueManager.getQueue(queueName);
577577
if (queue == null) {
578578
return createQueueNotFoundResponse(request);
@@ -589,9 +589,9 @@ private Message handlePollFromQueue(final Message request) {
589589
}
590590

591591
while(true) {
592-
final Message msg = timeout < 0
592+
final Message msg = timeoutMillis <= 0
593593
? queue.poll()
594-
: queue.poll(timeout, TimeUnit.MILLISECONDS);
594+
: queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
595595
if (msg == null) {
596596
return createTextResponse(
597597
request,

src/main/java/com/github/jlangch/venice/util/ipc/impl/protocol/PayloadMetaData.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ public PayloadMetaData(final Message msg) {
4444
msg.getType(),
4545
msg.getTimestamp(),
4646
msg.getExpiresAt(),
47-
msg.getTimeout(),
47+
msg.getDestinationActionTimeout(),
4848
msg.getResponseStatus(),
4949
msg.getRequestId(),
5050
msg.getDestinationName(),
@@ -62,7 +62,7 @@ public PayloadMetaData(
6262
final MessageType type,
6363
final long timestamp,
6464
final long expiresAt,
65-
final long timeout,
65+
final long destinationActionTimeout,
6666
final ResponseStatus responseStatus,
6767
final String requestId,
6868
final String destinationName,
@@ -84,7 +84,7 @@ public PayloadMetaData(
8484
this.type = type;
8585
this.timestamp = timestamp;
8686
this.expiresAt = expiresAt;
87-
this.timeout = timeout;
87+
this.destinationActionTimeout = destinationActionTimeout;
8888
this.responseStatus = responseStatus;
8989
this.requestId = requestId;
9090
this.destinationName = destinationName;
@@ -108,7 +108,7 @@ public Message toMessage(final byte[] payloadData) {
108108
replyToQueueName,
109109
timestamp,
110110
expiresAt,
111-
timeout,
111+
destinationActionTimeout,
112112
subject,
113113
mimetype,
114114
charset,
@@ -143,8 +143,8 @@ public long getExpiresAt() {
143143
return expiresAt;
144144
}
145145

146-
public long getTimeout() {
147-
return timeout;
146+
public long getDestinationActionTimeout() {
147+
return destinationActionTimeout;
148148
}
149149

150150
public ResponseStatus getResponseStatus() {
@@ -193,7 +193,7 @@ public int hashCode() {
193193
result = prime * result + ((requestId == null) ? 0 : requestId.hashCode());
194194
result = prime * result + ((responseStatus == null) ? 0 : responseStatus.hashCode());
195195
result = prime * result + (subscriptionReply ? 1231 : 1237);
196-
result = prime * result + (int) (timeout ^ (timeout >>> 32));
196+
result = prime * result + (int) (destinationActionTimeout ^ (destinationActionTimeout >>> 32));
197197
result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
198198
result = prime * result + ((subject == null) ? 0 : subject.hashCode());
199199
result = prime * result + ((type == null) ? 0 : type.hashCode());
@@ -249,7 +249,7 @@ public boolean equals(Object obj) {
249249
return false;
250250
if (subscriptionReply != other.subscriptionReply)
251251
return false;
252-
if (timeout != other.timeout)
252+
if (destinationActionTimeout != other.destinationActionTimeout)
253253
return false;
254254
if (timestamp != other.timestamp)
255255
return false;
@@ -300,7 +300,7 @@ public static byte[] encode(final PayloadMetaData data) {
300300
buf.putShort(_type);
301301
buf.putLong(data.timestamp);
302302
buf.putLong(data.expiresAt);
303-
buf.putLong(data.timeout);
303+
buf.putLong(data.destinationActionTimeout);
304304
buf.putShort(_responseStatus);
305305
putString(buf, _requestId);
306306
putString(buf, _destinationName);
@@ -319,22 +319,22 @@ public static PayloadMetaData decode(final byte[] data) {
319319

320320
final ByteBuffer buf = ByteBuffer.wrap(data);
321321

322-
final byte _oneway = buf.get();
323-
final byte _durable = buf.get();
324-
final byte _subscriptionReply = buf.get();
325-
final short _type = buf.getShort();
326-
final long _timestamp = buf.getLong();
327-
final long _expiresAt = buf.getLong();
328-
final long _timeout = buf.getLong();
329-
final short _responseStatus = buf.getShort();
330-
final byte[] _requestId = getString(buf);
331-
final byte[] _queueOrTopicName = getString(buf);
332-
final byte[] _replyToQueueName = getString(buf);
333-
final byte[] _subject = getString(buf);
334-
final byte[] _mimetype = getString(buf);
335-
final byte[] _charset = getString(buf);
336-
final long _idMostSignificantBits = buf.getLong();
337-
final long _idLeastSignificantBits = buf.getLong();
322+
final byte _oneway = buf.get();
323+
final byte _durable = buf.get();
324+
final byte _subscriptionReply = buf.get();
325+
final short _type = buf.getShort();
326+
final long _timestamp = buf.getLong();
327+
final long _expiresAt = buf.getLong();
328+
final long _destinationActionTimeout = buf.getLong();
329+
final short _responseStatus = buf.getShort();
330+
final byte[] _requestId = getString(buf);
331+
final byte[] _queueOrTopicName = getString(buf);
332+
final byte[] _replyToQueueName = getString(buf);
333+
final byte[] _subject = getString(buf);
334+
final byte[] _mimetype = getString(buf);
335+
final byte[] _charset = getString(buf);
336+
final long _idMostSignificantBits = buf.getLong();
337+
final long _idLeastSignificantBits = buf.getLong();
338338

339339
return new PayloadMetaData(
340340
decodeBoolean(_oneway),
@@ -343,7 +343,7 @@ public static PayloadMetaData decode(final byte[] data) {
343343
decodeMessageType(_type),
344344
_timestamp,
345345
_expiresAt,
346-
_timeout,
346+
_destinationActionTimeout,
347347
decodeResponseStatus(_responseStatus),
348348
decodeStringTrimToNull(_requestId),
349349
decodeStringTrimToNull(_queueOrTopicName),
@@ -434,7 +434,7 @@ private static byte[] getString(final ByteBuffer b) {
434434
private final MessageType type;
435435
private final long timestamp;
436436
private final long expiresAt;
437-
private final long timeout;
437+
private final long destinationActionTimeout;
438438
private final ResponseStatus responseStatus;
439439
private final String requestId;
440440
private final String destinationName;

0 commit comments

Comments
 (0)