Skip to content

Commit ca240af

Browse files
committed
IPC dead letter queue can now be deactivated
1 parent 8de36ad commit ca240af

File tree

4 files changed

+109
-6
lines changed

4 files changed

+109
-6
lines changed

src/main/java/com/github/jlangch/venice/impl/functions/IPCFunctions.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,10 +133,11 @@ public class IPCFunctions {
133133
" Defaults to `false`.|\n" +
134134
"| :write-ahead-log-compact b | If `true` compacts the write-ahead-logs at server start.¶" +
135135
" Defaults to `false`.|\n" +
136-
"| :authenticator a | An authenticator. If an authenticator is used encryption must " +
136+
"| :authenticator a | An authenticator. If an authenticator is used encryption must" +
137137
" be enabled to safely transmit users credentials!¶" +
138138
" Defaults to `nil`.|\n" +
139-
"| :dead-letter-queue-size n | The dead letter queue size.¶Defaults to 100|\n" +
139+
"| :dead-letter-queue-size n | The dead letter queue size. A size of 0 deactivates the dead" +
140+
" letter queue. ¶Defaults to 100|\n" +
140141
"| :socket-snd-buf-size n | The server socket's send buffer size.¶" +
141142
" Defaults to `-1` (use the sockets default buf size).¶" +
142143
" The size can be specified as a number like `64536`" +

src/main/java/com/github/jlangch/venice/util/ipc/ServerConfig.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -360,11 +360,12 @@ public Builder maxFunctions(final int maxFunctions) {
360360
*
361361
* <p>Defaults to 100
362362
*
363-
* @param size the dead letter queue's size.
363+
* @param size the dead letter queue's size. A size of 0 deactivates
364+
* the dead letter queue
364365
* @return this builder
365366
*/
366367
public Builder deadLetterQueueSize(final int size) {
367-
this.deadLetterQueueSize = Math.min(10_000, Math.max(10, size));
368+
this.deadLetterQueueSize = Math.min(10_000, Math.max(0, size));
368369
return this;
369370
}
370371

src/main/java/com/github/jlangch/venice/util/ipc/impl/ServerQueueManager.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package com.github.jlangch.venice.util.ipc.impl;
2323

2424
import static com.github.jlangch.venice.util.ipc.QueuePersistence.DURABLE;
25+
import static com.github.jlangch.venice.util.ipc.QueueType.CIRCULAR;
2526

2627
import java.util.HashMap;
2728
import java.util.List;
@@ -38,6 +39,7 @@
3839
import com.github.jlangch.venice.util.ipc.ServerConfig;
3940
import com.github.jlangch.venice.util.ipc.impl.dest.queue.CircularBuffer;
4041
import com.github.jlangch.venice.util.ipc.impl.dest.queue.IpcQueue;
42+
import com.github.jlangch.venice.util.ipc.impl.dest.queue.NullQueue;
4143
import com.github.jlangch.venice.util.ipc.impl.util.ServerLogger;
4244
import com.github.jlangch.venice.util.ipc.impl.wal.WalQueueManager;
4345

@@ -299,13 +301,18 @@ public void close() {
299301
}
300302

301303
private IpcQueue<Message> creatDeadLetterQueue(final int size) {
302-
final IpcQueue<Message> q = new CircularBuffer<Message>(
304+
final IpcQueue<Message> q = size > 0
305+
? new CircularBuffer<Message>(
303306
DEAD_LETTER_QUEUE_NAME,
304307
size,
305-
false);
308+
false)
309+
: new NullQueue<Message>(
310+
DEAD_LETTER_QUEUE_NAME,
311+
CIRCULAR);
306312
q.updateAcls(
307313
new HashMap<String,Acl>(),
308314
new Acl(DEAD_LETTER_QUEUE_NAME, null, AccessMode.DENY)); // admin only
315+
309316
return q;
310317
}
311318

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/* __ __ _
2+
* \ \ / /__ _ __ (_) ___ ___
3+
* \ \/ / _ \ '_ \| |/ __/ _ \
4+
* \ / __/ | | | | (_| __/
5+
* \/ \___|_| |_|_|\___\___|
6+
*
7+
*
8+
* Copyright 2017-2026 Venice
9+
*
10+
* Licensed under the Apache License, Version 2.0 (the "License");
11+
* you may not use this file except in compliance with the License.
12+
* You may obtain a copy of the License at
13+
*
14+
* http://www.apache.org/licenses/LICENSE-2.0
15+
*
16+
* Unless required by applicable law or agreed to in writing, software
17+
* distributed under the License is distributed on an "AS IS" BASIS,
18+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
19+
* See the License for the specific language governing permissions and
20+
* limitations under the License.
21+
*/
22+
package com.github.jlangch.venice.util.ipc.impl.dest.queue;
23+
24+
import java.util.concurrent.TimeUnit;
25+
26+
import com.github.jlangch.venice.util.ipc.QueueType;
27+
import com.github.jlangch.venice.util.ipc.impl.Destination;
28+
29+
30+
public class NullQueue<T> extends Destination implements IpcQueue<T> {
31+
32+
public NullQueue(final String name, final QueueType type) {
33+
super(name);
34+
this.type = type;
35+
}
36+
37+
38+
@Override
39+
public QueueType type() {
40+
return type;
41+
}
42+
43+
@Override
44+
public boolean isTemporary() {
45+
return true;
46+
}
47+
48+
@Override
49+
public boolean isDurable() {
50+
return false;
51+
}
52+
53+
@Override
54+
public boolean isEmpty() {
55+
return true;
56+
}
57+
58+
@Override
59+
public int size() {
60+
return 0;
61+
}
62+
63+
@Override
64+
public int capacity() {
65+
return 0;
66+
}
67+
68+
@Override
69+
public T poll() throws InterruptedException {
70+
return null;
71+
}
72+
73+
@Override
74+
public T poll(long timeout, TimeUnit unit) throws InterruptedException {
75+
return null;
76+
}
77+
78+
@Override
79+
public boolean offer(T item) throws InterruptedException {
80+
return false;
81+
}
82+
83+
@Override
84+
public boolean offer(T item, long timeout, TimeUnit unit) throws InterruptedException {
85+
return false;
86+
}
87+
88+
@Override
89+
public void onRemove() {
90+
}
91+
92+
93+
private final QueueType type;
94+
}

0 commit comments

Comments
 (0)