Skip to content

Commit 4387bed

Browse files
Merge pull request #939 from fbacchella/simple_mailbox
Improved concurrency and thread-safeness in Mailbox
2 parents e0470c8 + ff8acc9 commit 4387bed

File tree

4 files changed

+33
-123
lines changed

4 files changed

+33
-123
lines changed

src/main/java/zmq/Mailbox.java

Lines changed: 23 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -2,30 +2,20 @@
22

33
import java.io.IOException;
44
import java.nio.channels.SelectableChannel;
5-
import java.util.concurrent.locks.Lock;
6-
import java.util.concurrent.locks.ReentrantLock;
5+
import java.util.Deque;
6+
import java.util.concurrent.ConcurrentLinkedDeque;
77

8-
import zmq.pipe.YPipe;
98
import zmq.util.Errno;
109

11-
public final class Mailbox implements IMailbox
10+
public class Mailbox implements IMailbox
1211
{
1312
// The pipe to store actual commands.
14-
private final YPipe<Command> cpipe;
13+
private final Deque<Command> cpipe;
1514

1615
// Signaler to pass signals from writer thread to reader thread.
16+
// kept it although a ConcurrentLinkedDeque, because the signaler channel is used in many places.
1717
private final Signaler signaler;
1818

19-
// There's only one thread receiving from the mailbox, but there
20-
// is arbitrary number of threads sending. Given that ypipe requires
21-
// synchronized access on both of its endpoints, we have to synchronize
22-
// the sending side.
23-
private final Lock sync;
24-
25-
// True if the underlying pipe is active, i.e. when we are allowed to
26-
// read commands from it.
27-
private boolean active;
28-
2919
// mailbox name, for better debugging
3020
private final String name;
3121

@@ -34,18 +24,9 @@ public final class Mailbox implements IMailbox
3424
public Mailbox(Ctx ctx, String name, int tid)
3525
{
3626
this.errno = ctx.errno();
37-
cpipe = new YPipe<>(Config.COMMAND_PIPE_GRANULARITY.getValue());
38-
sync = new ReentrantLock();
27+
cpipe = new ConcurrentLinkedDeque<>();
3928
signaler = new Signaler(ctx, tid, errno);
4029

41-
// Get the pipe into passive state. That way, if the users starts by
42-
// polling on the associated file descriptor it will get woken up when
43-
// new command is posted.
44-
45-
Command cmd = cpipe.read();
46-
assert (cmd == null);
47-
active = false;
48-
4930
this.name = name;
5031
}
5132

@@ -57,69 +38,39 @@ public SelectableChannel getFd()
5738
@Override
5839
public void send(final Command cmd)
5940
{
60-
boolean ok = false;
61-
sync.lock();
62-
try {
63-
cpipe.write(cmd, false);
64-
ok = cpipe.flush();
65-
}
66-
finally {
67-
sync.unlock();
68-
}
69-
70-
if (!ok) {
71-
signaler.send();
72-
}
41+
cpipe.addLast(cmd);
42+
signaler.send();
7343
}
7444

7545
@Override
7646
public Command recv(long timeout)
7747
{
78-
Command cmd;
79-
// Try to get the command straight away.
80-
if (active) {
81-
cmd = cpipe.read();
82-
if (cmd != null) {
83-
return cmd;
48+
Command cmd = cpipe.pollFirst();
49+
while (cmd == null) {
50+
// Wait for signal from the command sender.
51+
boolean rc = signaler.waitEvent(timeout);
52+
if (!rc) {
53+
assert (errno.get() == ZError.EAGAIN || errno.get() == ZError.EINTR) : errno.get();
54+
break;
8455
}
8556

86-
// If there are no more commands available, switch into passive state.
87-
active = false;
88-
}
89-
90-
// Wait for signal from the command sender.
91-
boolean rc = signaler.waitEvent(timeout);
92-
if (!rc) {
93-
assert (errno.get() == ZError.EAGAIN || errno.get() == ZError.EINTR) : errno.get();
94-
return null;
95-
}
57+
// Receive the signal.
58+
signaler.recv();
59+
if (errno.get() == ZError.EINTR) {
60+
break;
61+
}
9662

97-
// Receive the signal.
98-
signaler.recv();
99-
if (errno.get() == ZError.EINTR) {
100-
return null;
63+
// Get a command.
64+
// Another thread may already fetch the command, so loop on it
65+
cmd = cpipe.pollFirst();
10166
}
10267

103-
// Switch into active state.
104-
active = true;
105-
106-
// Get a command.
107-
cmd = cpipe.read();
108-
assert (cmd != null) : "command shall never be null when read";
109-
11068
return cmd;
11169
}
11270

11371
@Override
11472
public void close() throws IOException
11573
{
116-
// TODO: Retrieve and deallocate commands inside the cpipe.
117-
118-
// Work around problem that other threads might still be in our
119-
// send() method, by waiting on the mutex before disappearing.
120-
sync.lock();
121-
sync.unlock();
122-
12374
signaler.close();
12475
}
12576

src/main/java/zmq/MailboxSafe.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import java.util.concurrent.locks.ReentrantLock;
1010
import java.util.concurrent.locks.Condition;
1111

12+
@Deprecated
1213
public class MailboxSafe implements IMailbox
1314
{
1415
// The pipe to store actual commands.

src/main/java/zmq/Signaler.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,8 @@ private interface IoOperation<O>
3030
private final Pipe.SinkChannel w;
3131
private final Pipe.SourceChannel r;
3232
private final Selector selector;
33-
private final ByteBuffer wdummy = ByteBuffer.allocate(1);
34-
private final ByteBuffer rdummy = ByteBuffer.allocate(1);
33+
private final ThreadLocal<ByteBuffer> wdummy = ThreadLocal.withInitial(() -> ByteBuffer.allocate(1));
34+
private final ThreadLocal<ByteBuffer> rdummy = ThreadLocal.withInitial(() -> ByteBuffer.allocate(1));
3535

3636
// Selector.selectNow at every sending message doesn't show enough performance
3737
private final AtomicLong wcursor = new AtomicLong(0);
@@ -68,7 +68,7 @@ private interface IoOperation<O>
6868
private <O> O maksInterrupt(IoOperation<O> operation) throws IOException
6969
{
7070
// This loop try to protect the current thread from external interruption.
71-
// If it happens, it mangle current context internal state.
71+
// If it happens, it mangles current context internal state.
7272
// So it keep trying until it succeed.
7373
// This must only be called on internal IO (using Pipe)
7474
boolean interrupted = Thread.interrupted();
@@ -131,8 +131,8 @@ void send()
131131

132132
while (nbytes == 0) {
133133
try {
134-
wdummy.clear();
135-
nbytes = maksInterrupt(() -> w.write(wdummy));
134+
wdummy.get().clear();
135+
nbytes = maksInterrupt(() -> w.write(wdummy.get()));
136136
}
137137
catch (IOException e) {
138138
throw new ZError.IOException(e);
@@ -198,8 +198,8 @@ void recv()
198198
// On windows, there may be a need to try several times until it succeeds
199199
while (nbytes == 0) {
200200
try {
201-
rdummy.clear();
202-
nbytes = maksInterrupt(() -> r.read(rdummy));
201+
rdummy.get().clear();
202+
nbytes = maksInterrupt(() -> r.read(rdummy.get()));
203203
}
204204
catch (ClosedChannelException e) {
205205
errno.set(ZError.EINTR);

src/main/java/zmq/SocketBase.java

Lines changed: 2 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,6 @@ public String toString()
104104
// Mutex for synchronize access to the socket in thread safe mode
105105
private final ReentrantLock threadSafeSync;
106106

107-
// Signaler to be used in the reaping stage
108-
private Signaler reaperSignaler;
109-
110107
protected SocketBase(Ctx parent, int tid, int sid)
111108
{
112109
this(parent, tid, sid, false);
@@ -135,14 +132,8 @@ protected SocketBase(Ctx parent, int tid, int sid, boolean threadSafe)
135132

136133
this.threadSafe = threadSafe;
137134
this.threadSafeSync = new ReentrantLock();
138-
this.reaperSignaler = null;
139135

140-
if (threadSafe) {
141-
mailbox = new MailboxSafe(parent, this.threadSafeSync, "safe-socket-" + sid);
142-
}
143-
else {
144-
mailbox = new Mailbox(parent, "socket-" + sid, tid);
145-
}
136+
mailbox = new Mailbox(parent, "socket-" + sid, tid);
146137
}
147138

148139
// Concrete algorithms for the x- methods are to be defined by
@@ -170,14 +161,6 @@ protected void destroy()
170161
catch (IOException ignore) {
171162
}
172163

173-
if (reaperSignaler != null) {
174-
try {
175-
reaperSignaler.close();
176-
}
177-
catch (IOException ignored) {
178-
}
179-
}
180-
181164
stopMonitor();
182165
assert (destroyed.get());
183166
}
@@ -1083,11 +1066,6 @@ public final void close()
10831066
lock();
10841067

10851068
try {
1086-
// Remove all existing signalers for thread safe sockets
1087-
if (threadSafe) {
1088-
((MailboxSafe) mailbox).clearSignalers();
1089-
}
1090-
10911069
// Mark the socket as dead
10921070
active = false;
10931071

@@ -1121,23 +1099,7 @@ final void startReaping(Poller poller)
11211099
this.poller = poller;
11221100
SelectableChannel fd;
11231101

1124-
if (!threadSafe) {
1125-
fd = ((Mailbox) mailbox).getFd();
1126-
}
1127-
else {
1128-
threadSafeSync.lock();
1129-
try {
1130-
reaperSignaler = new Signaler(getCtx(), getTid(), getCtx().errno());
1131-
fd = reaperSignaler.getFd();
1132-
((MailboxSafe) mailbox).addSignaler(reaperSignaler);
1133-
1134-
// Send a signal to make sure reaper handle existing commands
1135-
reaperSignaler.send();
1136-
}
1137-
finally {
1138-
threadSafeSync.unlock();
1139-
}
1140-
}
1102+
fd = ((Mailbox) mailbox).getFd();
11411103

11421104
handle = this.poller.addHandle(fd, this);
11431105
this.poller.setPollIn(handle);
@@ -1357,10 +1319,6 @@ public final void inEvent()
13571319
lock();
13581320

13591321
try {
1360-
// If the socket is thread safe we need to unsignal the reaper signaler
1361-
if (threadSafe) {
1362-
reaperSignaler.recv();
1363-
}
13641322
enterInEvent();
13651323
processCommands(0, false, null);
13661324
}

0 commit comments

Comments
 (0)