Skip to content

Commit

Permalink
OF-2661: Ensure we send everything (flush) before actual channel clos…
Browse files Browse the repository at this point in the history
…ure (igniterealtime#2325)

* OF-2661: Ensure we send everything (flush) before actual channel closure

* OF-2661: Restructure listeners for better sequence guarantees

* OF-2661: Additional restructuring of closing of Netty connection.

* OF-2661: Suspecting a deadlock-like state

---------

Co-authored-by: Guus der Kinderen <[email protected]>
  • Loading branch information
Fishbowler and guusdk committed Nov 10, 2023
1 parent 67f7173 commit 4c14c5a
Showing 1 changed file with 24 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@

package org.jivesoftware.openfire.nio;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.compression.JZlibDecoder;
Expand Down Expand Up @@ -76,7 +75,7 @@ public class NettyConnection extends AbstractConnection

/**
* Flag that specifies if the connection should be considered closed. Closing a NIO connection
* is an asynch operation so instead of waiting for the connection to be actually closed just
* is an asynchronous operation so instead of waiting for the connection to be actually closed just
* keep this flag to avoid using the connection between #close was used and the socket is actually
* closed.
*/
Expand Down Expand Up @@ -193,9 +192,9 @@ public void close() {
@Override
public void close(@Nullable final StreamError error) {
if (state.compareAndSet(State.OPEN, State.CLOSED)) {
Log.trace("Closing {} with optional error: {}", this, error);

// Ensure that the state of this connection, its session and the Netty Channel are eventually closed.

if (session != null) {
session.setStatus(Session.Status.CLOSED);
}
Expand All @@ -207,20 +206,19 @@ public void close(@Nullable final StreamError error) {
rawEndStream += "</stream:stream>";

try {
ChannelFuture f = channelHandlerContext.writeAndFlush(rawEndStream);
updateWrittenBytesCounter(channelHandlerContext);
} catch (Exception e) {
Log.error("Failed to deliver stream close tag: " + e.getMessage());
}

try {
// TODO don't block, handle errors async with custom ChannelFutureListener
this.channelHandlerContext.channel().close().addListener(ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE).sync();
channelHandlerContext.writeAndFlush(rawEndStream)
.addListener(e -> Log.trace("Written end-of-stream, closing connection."))
.addListener(ChannelFutureListener.CLOSE)
.addListener(e -> {
Log.trace("Notifying close listeners.");
notifyCloseListeners();
closeListeners.clear();
})
.addListener(e -> Log.trace("Finished closing connection."))
.sync();
} catch (Exception e) {
Log.error("Exception while closing Netty session", e);
Log.error("Failed to deliver stream close tag or to close the connection", e);
}
notifyCloseListeners(); // clean up session, etc.
closeListeners.clear();
}
}

Expand Down Expand Up @@ -273,8 +271,10 @@ public void deliver(Packet packet) throws UnauthorizedException {
else {
boolean errorDelivering = false;
try {
ChannelFuture f = channelHandlerContext.writeAndFlush(packet.getElement().asXML());
updateWrittenBytesCounter(channelHandlerContext);
channelHandlerContext.writeAndFlush(packet.getElement().asXML())
.addListener(l ->
updateWrittenBytesCounter(channelHandlerContext)
);
// TODO - handle errors more specifically
// Currently errors are handled by the default exceptionCaught method (log error, close channel)
// We can add a new listener to the ChannelFuture f for more specific error handling.
Expand Down Expand Up @@ -302,9 +302,10 @@ public void deliver(Packet packet) throws UnauthorizedException {
@Override
public void deliverRawText(String text) {
if (!isClosed()) {
Log.trace("Sending: " + text);
ChannelFuture f = channelHandlerContext.writeAndFlush(text);
updateWrittenBytesCounter(channelHandlerContext);
Log.trace("Sending: {}", text);
channelHandlerContext.writeAndFlush(text).addListener(l ->
updateWrittenBytesCounter(channelHandlerContext)
);
// TODO - handle errors more specifically
// Currently errors are handled by the default exceptionCaught method (log error, close channel)
// We can add a new listener to the ChannelFuture f for more specific error handling.
Expand All @@ -314,8 +315,7 @@ public void deliverRawText(String text) {
}

/**
* Updates the system counter of written bytes. This information is used by the outgoing
* bytes statistic.
* Updates the system counter of written bytes. This information is used by the "outgoing bytes" statistic.
*
* @param ctx the context for the channel writing bytes
*/
Expand All @@ -340,8 +340,8 @@ public void startTLS(boolean clientMode, boolean directTLS) throws Exception {
final EncryptionArtifactFactory factory = new EncryptionArtifactFactory( configuration );

final SslContext sslContext;
if ( clientMode ) {
sslContext= factory.createClientModeSslContext();
if (clientMode) {
sslContext = factory.createClientModeSslContext();
} else {
sslContext = factory.createServerModeSslContext(directTLS);
}
Expand Down

0 comments on commit 4c14c5a

Please sign in to comment.