diff --git a/src/main/java/zmq/Mailbox.java b/src/main/java/zmq/Mailbox.java index 18d1202d..ae43d710 100644 --- a/src/main/java/zmq/Mailbox.java +++ b/src/main/java/zmq/Mailbox.java @@ -2,30 +2,20 @@ import java.io.IOException; import java.nio.channels.SelectableChannel; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.Deque; +import java.util.concurrent.ConcurrentLinkedDeque; -import zmq.pipe.YPipe; import zmq.util.Errno; -public final class Mailbox implements IMailbox +public class Mailbox implements IMailbox { // The pipe to store actual commands. - private final YPipe cpipe; + private final Deque cpipe; // Signaler to pass signals from writer thread to reader thread. + // kept it although a ConcurrentLinkedDeque, because the signaler channel is used in many places. private final Signaler signaler; - // There's only one thread receiving from the mailbox, but there - // is arbitrary number of threads sending. Given that ypipe requires - // synchronized access on both of its endpoints, we have to synchronize - // the sending side. - private final Lock sync; - - // True if the underlying pipe is active, i.e. when we are allowed to - // read commands from it. - private boolean active; - // mailbox name, for better debugging private final String name; @@ -34,18 +24,9 @@ public final class Mailbox implements IMailbox public Mailbox(Ctx ctx, String name, int tid) { this.errno = ctx.errno(); - cpipe = new YPipe<>(Config.COMMAND_PIPE_GRANULARITY.getValue()); - sync = new ReentrantLock(); + cpipe = new ConcurrentLinkedDeque<>(); signaler = new Signaler(ctx, tid, errno); - // Get the pipe into passive state. That way, if the users starts by - // polling on the associated file descriptor it will get woken up when - // new command is posted. - - Command cmd = cpipe.read(); - assert (cmd == null); - active = false; - this.name = name; } @@ -57,69 +38,39 @@ public SelectableChannel getFd() @Override public void send(final Command cmd) { - boolean ok = false; - sync.lock(); - try { - cpipe.write(cmd, false); - ok = cpipe.flush(); - } - finally { - sync.unlock(); - } - - if (!ok) { - signaler.send(); - } + cpipe.addLast(cmd); + signaler.send(); } @Override public Command recv(long timeout) { - Command cmd; - // Try to get the command straight away. - if (active) { - cmd = cpipe.read(); - if (cmd != null) { - return cmd; + Command cmd = cpipe.pollFirst(); + while (cmd == null) { + // Wait for signal from the command sender. + boolean rc = signaler.waitEvent(timeout); + if (!rc) { + assert (errno.get() == ZError.EAGAIN || errno.get() == ZError.EINTR) : errno.get(); + break; } - // If there are no more commands available, switch into passive state. - active = false; - } - - // Wait for signal from the command sender. - boolean rc = signaler.waitEvent(timeout); - if (!rc) { - assert (errno.get() == ZError.EAGAIN || errno.get() == ZError.EINTR) : errno.get(); - return null; - } + // Receive the signal. + signaler.recv(); + if (errno.get() == ZError.EINTR) { + break; + } - // Receive the signal. - signaler.recv(); - if (errno.get() == ZError.EINTR) { - return null; + // Get a command. + // Another thread may already fetch the command, so loop on it + cmd = cpipe.pollFirst(); } - // Switch into active state. - active = true; - - // Get a command. - cmd = cpipe.read(); - assert (cmd != null) : "command shall never be null when read"; - return cmd; } @Override public void close() throws IOException { - // TODO: Retrieve and deallocate commands inside the cpipe. - - // Work around problem that other threads might still be in our - // send() method, by waiting on the mutex before disappearing. - sync.lock(); - sync.unlock(); - signaler.close(); } diff --git a/src/main/java/zmq/MailboxSafe.java b/src/main/java/zmq/MailboxSafe.java index e6ecab1b..e1698e75 100644 --- a/src/main/java/zmq/MailboxSafe.java +++ b/src/main/java/zmq/MailboxSafe.java @@ -9,6 +9,7 @@ import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.Condition; +@Deprecated public class MailboxSafe implements IMailbox { // The pipe to store actual commands. diff --git a/src/main/java/zmq/Signaler.java b/src/main/java/zmq/Signaler.java index 55199455..a842554d 100644 --- a/src/main/java/zmq/Signaler.java +++ b/src/main/java/zmq/Signaler.java @@ -30,8 +30,8 @@ private interface IoOperation private final Pipe.SinkChannel w; private final Pipe.SourceChannel r; private final Selector selector; - private final ByteBuffer wdummy = ByteBuffer.allocate(1); - private final ByteBuffer rdummy = ByteBuffer.allocate(1); + private final ThreadLocal wdummy = ThreadLocal.withInitial(() -> ByteBuffer.allocate(1)); + private final ThreadLocal rdummy = ThreadLocal.withInitial(() -> ByteBuffer.allocate(1)); // Selector.selectNow at every sending message doesn't show enough performance private final AtomicLong wcursor = new AtomicLong(0); @@ -68,7 +68,7 @@ private interface IoOperation private O maksInterrupt(IoOperation operation) throws IOException { // This loop try to protect the current thread from external interruption. - // If it happens, it mangle current context internal state. + // If it happens, it mangles current context internal state. // So it keep trying until it succeed. // This must only be called on internal IO (using Pipe) boolean interrupted = Thread.interrupted(); @@ -131,8 +131,8 @@ void send() while (nbytes == 0) { try { - wdummy.clear(); - nbytes = maksInterrupt(() -> w.write(wdummy)); + wdummy.get().clear(); + nbytes = maksInterrupt(() -> w.write(wdummy.get())); } catch (IOException e) { throw new ZError.IOException(e); @@ -198,8 +198,8 @@ void recv() // On windows, there may be a need to try several times until it succeeds while (nbytes == 0) { try { - rdummy.clear(); - nbytes = maksInterrupt(() -> r.read(rdummy)); + rdummy.get().clear(); + nbytes = maksInterrupt(() -> r.read(rdummy.get())); } catch (ClosedChannelException e) { errno.set(ZError.EINTR); diff --git a/src/main/java/zmq/SocketBase.java b/src/main/java/zmq/SocketBase.java index c3d336c8..a96ca861 100644 --- a/src/main/java/zmq/SocketBase.java +++ b/src/main/java/zmq/SocketBase.java @@ -104,9 +104,6 @@ public String toString() // Mutex for synchronize access to the socket in thread safe mode private final ReentrantLock threadSafeSync; - // Signaler to be used in the reaping stage - private Signaler reaperSignaler; - protected SocketBase(Ctx parent, int tid, int sid) { this(parent, tid, sid, false); @@ -135,14 +132,8 @@ protected SocketBase(Ctx parent, int tid, int sid, boolean threadSafe) this.threadSafe = threadSafe; this.threadSafeSync = new ReentrantLock(); - this.reaperSignaler = null; - if (threadSafe) { - mailbox = new MailboxSafe(parent, this.threadSafeSync, "safe-socket-" + sid); - } - else { - mailbox = new Mailbox(parent, "socket-" + sid, tid); - } + mailbox = new Mailbox(parent, "socket-" + sid, tid); } // Concrete algorithms for the x- methods are to be defined by @@ -170,14 +161,6 @@ protected void destroy() catch (IOException ignore) { } - if (reaperSignaler != null) { - try { - reaperSignaler.close(); - } - catch (IOException ignored) { - } - } - stopMonitor(); assert (destroyed.get()); } @@ -1083,11 +1066,6 @@ public final void close() lock(); try { - // Remove all existing signalers for thread safe sockets - if (threadSafe) { - ((MailboxSafe) mailbox).clearSignalers(); - } - // Mark the socket as dead active = false; @@ -1121,23 +1099,7 @@ final void startReaping(Poller poller) this.poller = poller; SelectableChannel fd; - if (!threadSafe) { - fd = ((Mailbox) mailbox).getFd(); - } - else { - threadSafeSync.lock(); - try { - reaperSignaler = new Signaler(getCtx(), getTid(), getCtx().errno()); - fd = reaperSignaler.getFd(); - ((MailboxSafe) mailbox).addSignaler(reaperSignaler); - - // Send a signal to make sure reaper handle existing commands - reaperSignaler.send(); - } - finally { - threadSafeSync.unlock(); - } - } + fd = ((Mailbox) mailbox).getFd(); handle = this.poller.addHandle(fd, this); this.poller.setPollIn(handle); @@ -1357,10 +1319,6 @@ public final void inEvent() lock(); try { - // If the socket is thread safe we need to unsignal the reaper signaler - if (threadSafe) { - reaperSignaler.recv(); - } enterInEvent(); processCommands(0, false, null); }