
by default reactor rabbit is not setup well for rabbitmq connection loss #142

Open

a701440 opened this issue Aug 19, 2020 · 8 comments

@a701440
a701440 commented Aug 19, 2020

I noticed that the Sender and Receiver components unconditionally cache connections and channels. This makes it harder to write code that stays resilient when socket connections to RabbitMQ are closed or go bad.

For example, in Sender:

connectionMono.map(CHANNEL_PROXY_CREATION_FUNCTION).transform(this::cache)

Unless a resourceManagementChannelMono is passed in the options, a default channel is created and cached. This does not react well when connections are severed, etc. Ideally it should detect that the cached channel is bad and re-subscribe to the connection Mono.

The same situation exists with the connection Mono in other places.
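
To illustrate the concern with a minimal sketch (plain Reactor, not reactor-rabbitmq internals): a cached connection Mono keeps replaying whatever connection it resolved first, even after that connection has died.

    // Minimal illustration: once cached, the same Connection instance is replayed
    // to every subscriber, even after the socket has been closed.
    Mono<Connection> connectionMono = Mono
            .fromCallable(() -> connectionFactory.newConnection("demo"))
            .cache();

    Connection c1 = connectionMono.block(); // opens the connection
    // ... broker restarts, c1.isOpen() becomes false ...
    Connection c2 = connectionMono.block(); // the same, now-closed connection is returned again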

@acogoluegnes
Contributor

Sorry for the delay, I was away for a few weeks.

Can you be more specific about the scenarios you'd like to cover in case of connection/channel failure?

Possible network problems are handled in different ways in different parts of the code, with (I guess) reasonable defaults. We had long discussions about retries and came to the conclusion that a default behavior that would satisfy everyone, or even 90% of use cases, is impossible to find. This is why we introduced settings to configure this on a case-by-case basis, e.g. for connection creation.

The case you mention is protected by default from channel errors/closing with the ChannelProxy class, and the SenderOptions#resourceManagementChannelMono property allows changing this default behavior.
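
For reference, a minimal sketch of overriding that default (illustrative only; it assumes a connectionMono is already available and is not the library's internal code):

    // Sketch: supply a custom resource-management channel Mono instead of the
    // default cached, ChannelProxy-backed one.
    Mono<Channel> channelMono = connectionMono
            .flatMap(connection -> Mono.fromCallable(connection::createChannel));

    SenderOptions senderOptions = new SenderOptions()
            .connectionMono(connectionMono)
            .resourceManagementChannelMono(channelMono);

    Sender sender = RabbitFlux.createSender(senderOptions);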

Please provide samples, or even better failing tests, to explain what you'd like the library to do for the scenarios you're thinking of.

@a701440
Author

a701440 commented Sep 1, 2020

My objective was to have a Flux that can survive a RabbitMQ cluster crash/restart (all nodes) and just resume streaming when RabbitMQ is back up (i.e. auto-recover from RabbitMQ failures).

Basically I needed to keep retrying the connection, redeclaring the topology (if it's not durable), etc.

AutoRecoverable connection did not really work well for me in this scenario.
I had to make several changes:

  1. Did not use the AutoRecoverable connection

  2. Created a special Mono<Connection> that verified the connection was open before returning it, instead of just using .cache() and returning an invalid connection. Something along these lines:

    public static Mono<Connection> connectionMono(final ConnectionFactory connectionFactory, String connectionName) {
        AtomicReference<Connection> connectionHolder = new AtomicReference<>();
        Mono<Connection> result = Mono.create(sink -> {
            try {
                Connection conn = connectionHolder.updateAndGet(c -> {
                    try {
                        return c != null && c.isOpen() ? c : connectionFactory.newConnection(connectionName);
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                });
                if (conn == null) {
                    sink.error(new RuntimeException("Failed to create RabbitMQ connection"));
                    return;
                }
                sink.success(conn);
            } catch (Exception e) {
                sink.error(e);
            }
        });
        return result;
    }
  3. The Receiver has some condition checks that did not work in this scenario, so I had to override this:
    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
        channel.addShutdownListener(reason -> {
           // no check here, this will send complete Flux signal on any Socket problem with the connection
            emitter.complete();
        });
    }
  4. Then I also had to add some completion handling and retry logic to the Flux (a sketch of retryConnection() follows after this list). This makes the Flux "endless",
     which is what you kind of expect from a Rabbit queue.
        // convert Complete signal to EOF error, retry will handle all errors including the EOF.
        flux = flux.flatMap(Mono::just, Mono::error, () -> Mono.error(new EOFException()));

        // when connection to Rabbit is lost retry with new connection
        flux = flux.retryWhen(retryConnection()); // retryConnection has exponential back-off logic, etc.
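
For reference, retryConnection() is not shown in the comment above; a minimal sketch of such a helper, assuming Reactor's reactor.util.retry.Retry API and a hypothetical LOGGER, could look like this:

    // Hypothetical helper (assumption, not from the original comment):
    // unbounded exponential back-off so the Flux keeps retrying until RabbitMQ is back.
    private static Retry retryConnection() {
        return Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(1))
                .maxBackoff(Duration.ofSeconds(30))
                .doBeforeRetry(signal -> LOGGER.warn("Reconnecting to RabbitMQ, attempt {}",
                        signal.totalRetries() + 1, signal.failure()));
    }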

Altogether this creates a robust Flux that survives RabbitMQ failures and auto-recovers the processing.
Some of the code that is currently there, like the unconditional .cache() in the connection Mono, makes it harder to create a robust handler.

I recommend trying any changes against a local RabbitMQ node that you can kill and restart, to see what actually happens with the message processing.

@acogoluegnes
Contributor

> My objective was to have a Flux that can survive a RabbitMQ cluster crash/restart (all nodes) and just resume streaming when RabbitMQ is back up (i.e. auto-recover from RabbitMQ failures).
>
> Basically I needed to keep retrying the connection, redeclaring the topology (if it's not durable), etc.
>
> AutoRecoverable connection did not really work well for me in this scenario.
> I had to make several changes:
>
> 1. Did not use the AutoRecoverable connection

You lose the benefits of topology recovery, how did you handle it?

> 2. Created a special Mono<Connection> that verified the connection was open before returning it, instead of just using .cache() and returning an invalid connection. Something along these lines:
>
>     public static Mono<Connection> connectionMono(final ConnectionFactory connectionFactory, String connectionName) {
>         AtomicReference<Connection> connectionHolder = new AtomicReference<>();
>         Mono<Connection> result = Mono.create(sink -> {
>             try {
>                 Connection conn = connectionHolder.updateAndGet(c -> {
>                     try {
>                         return c != null && c.isOpen() ? c : connectionFactory.newConnection(connectionName);
>                     } catch (Exception e) {
>                         throw new RuntimeException(e);
>                     }
>                 });
>                 if (conn == null) {
>                     sink.error(new RuntimeException("Failed to create RabbitMQ connection"));
>                     return;
>                 }
>                 sink.success(conn);
>             } catch (Exception e) {
>                 sink.error(e);
>             }
>         });
>         return result;
>     }

This is pretty neat, thanks for sharing.

> 3. The Receiver has some condition checks that did not work in this scenario, so I had to override this:
>
>     protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
>         channel.addShutdownListener(reason -> {
>             // no check here, this will send complete Flux signal on any Socket problem with the connection
>             emitter.complete();
>         });
>     }

Yes, the original condition checks assume automatic connection recovery is enabled. We could check if the connection is a RecoverableConnection.
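
Not the actual change, just a rough sketch of one way such a check could look (here testing the channel for the Java client's Recoverable marker interface rather than the connection):

    // Sketch only: complete the Flux on shutdown unless automatic recovery
    // (signalled by the Recoverable marker interface) is going to handle it.
    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
        boolean autoRecovering = channel instanceof Recoverable;
        channel.addShutdownListener(reason -> {
            if (!autoRecovering
                    || !AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
                emitter.complete();
            }
        });
    }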

> 4. Then I also had to add some completion handling and retry logic to the Flux. This makes the Flux "endless",
>    which is what you kind of expect from a Rabbit queue.
>
>     // convert Complete signal to EOF error, retry will handle all errors including the EOF.
>     flux = flux.flatMap(Mono::just, Mono::error, () -> Mono.error(new EOFException()));
>
>     // when connection to Rabbit is lost retry with new connection
>     flux = flux.retryWhen(retryConnection()); // retryConnection has exponential back-off logic, etc.

Again, retry is very application-specific; I think the documentation covers a reasonable way to deal with connection retry.

> Altogether this creates a robust Flux that survives RabbitMQ failures and auto-recovers the processing.
> Some of the code that is currently there, like the unconditional .cache() in the connection Mono, makes it harder to create a robust handler.

I see your point. I'd say the default behavior provides a reasonably robust mechanism, and the ability to provide the Mono<Connection> makes it possible to deal with more specific cases.

> I recommend trying any changes against a local RabbitMQ node that you can kill and restart, to see what actually happens with the message processing.

The test suite has some recovery tests, but it only closes connections, so a node restart could trigger some slightly different behavior. Feel free to experiment with tests for now (the test suite needs a system property to know how to call rabbitmqctl) and let us know what goes wrong.

@a701440
Author

a701440 commented Sep 2, 2020

The topology declaration (re-declaration) was part of the chain.

    Flux flux = declareTopology(sender)
        .flatMapMany(ok -> receiver.consumeManualAck(queueName, consumeOptions))
        .flatMap(Mono::just, Mono::error, () -> Mono.error(new EOFException()))
        .retryWhen(retryConnection())
        ...

So basically declaration is part of the retry.
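
declareTopology() is not shown in the thread; a minimal sketch of what such a helper might look like with reactor-rabbitmq's Sender API (the exchange, queue, and routing key names are made-up placeholders):

    // Hypothetical topology declaration: it runs again on every (re)subscription,
    // so the retry above re-creates the topology after a broker restart.
    static Mono<?> declareTopology(Sender sender) {
        return sender.declareExchange(ExchangeSpecification.exchange("my.exchange").type("topic"))
                .then(sender.declareQueue(QueueSpecification.queue("my.queue")))
                .then(sender.bind(BindingSpecification.binding("my.exchange", "my.routing.key", "my.queue")));
    }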

@a701440
Author

a701440 commented Sep 2, 2020

So maybe at least do not cache() non-recoverable connections and channels permanently. And I think that Receiver also assumes an auto-recoverable connection and does not work well when a RabbitMQ node is killed:

    protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
        channel.addShutdownListener(reason -> {
            if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
                emitter.complete();
            }
        });
    }

@acogoluegnes
Contributor

I created a follow-up issue #143. I hope to be able to work on it in the next few weeks, or feel free to submit a PR.

@acogoluegnes
Contributor

acogoluegnes commented Sep 8, 2020

> And I think that Receiver also assumes an auto-recoverable connection and does not work well when a RabbitMQ node is killed:
>
>     protected void completeOnChannelShutdown(Channel channel, FluxSink<?> emitter) {
>         channel.addShutdownListener(reason -> {
>             if (!AutorecoveringConnection.DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION.test(reason)) {
>                 emitter.complete();
>             }
>         });
>     }

@a701440 I pushed a change for this one.

Not caching non-recoverable connections and channels is not possible with Mono#cache. There's an open issue in Reactor Core that could help though: reactor/reactor-core#2324 (I guess you knew, you opened it).
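
For context, what that Reactor Core issue asks for is a cache variant with an invalidation hook; newer Reactor versions ship something along those lines (Mono#cacheInvalidateIf), which would allow a sketch like this instead of the manual AtomicReference approach:

    // Sketch, assuming a Reactor version that provides cacheInvalidateIf:
    // cache the connection, but drop the cached value once it is no longer open,
    // so the next subscriber triggers a fresh newConnection().
    Mono<Connection> connectionMono = Mono
            .fromCallable(() -> connectionFactory.newConnection("reactor-rabbit"))
            .cacheInvalidateIf(conn -> !conn.isOpen());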

@philsttr

philsttr commented Mar 22, 2021

Just want to +1 here to say that I encountered the same problem.

I have implemented a solution that builds upon the approach from @a701440. Specifically, it supports a callback to explicitly close a connection (rather than relying just on isOpen() to determine whether a new connection should be created). In my case, I was seeing isOpen() still return true even when the connection was having problems. The retry/repeat logic on the Flux invokes the callback to close the existing connection before attempting to resubscribe.

    /**
     * Reference to the current active rabbit {@link Connection}.
     */
    private static class ConnectionReference {
        private final AtomicReference<Connection> connectionRef = new AtomicReference<>();

        private final ConnectionFactory connectionFactory;
        private final String connectionName;
        private final Duration closeTimeout;

        private ConnectionReference(ConnectionFactory connectionFactory, String connectionName, Duration closeTimeout) {
            this.connectionFactory = Objects.requireNonNull(connectionFactory, "connectionFactory must not be null");
            this.connectionName = Objects.requireNonNull(connectionName, "connectionName must not be null");
            this.closeTimeout = Objects.requireNonNull(closeTimeout, "closeTimeout must not be null");
        }

        /**
         * Returns the current active rabbit {@link Connection}.
         *
         * <p>If no connection is currently active, then creates and
         * returns a new {@link Connection}, and sets it as the active connection.</p>
         *
         * @return the current active rabbit {@link Connection}.
         * @throws Exception if there was a problem retrieving/creating a connection
         */
        public Connection get() throws Exception {
            try {
                return connectionRef.updateAndGet(existingConnection -> {
                    try {
                        if (existingConnection != null) {
                            if (existingConnection.isOpen()) {
                                return existingConnection;
                            }
                            closeConnection(existingConnection);
                        }
                        return connectionFactory.newConnection(connectionName);
                    } catch (Exception e) {
                        throw Exceptions.propagate(e);
                    }
                });
            } catch (RuntimeException e) {
                throw (Exception) Exceptions.unwrap(e);
            }
        }

        /**
         * Closes the current active rabbit {@link Connection} (if any).
         *
         * <p>Causes the next call to {@link #get()} to create a new rabbit {@link Connection}.</p>
         */
        public void close() {
            Connection oldConnection = connectionRef.getAndSet(null);
            if (oldConnection != null) {
                closeConnection(oldConnection);
            }
        }

        private void closeConnection(Connection connection) {
            try {
                connection.close((int) closeTimeout.toMillis());
            } catch (ShutdownSignalException e) {
                // ignore
            } catch (IOException e) {
                LOGGER.warn("Exception occurred while closing amqp connection: {}", connection, e);
            }
        }
    }

My connectionMono looks like this:

Mono.fromCallable(connectionRef::get)
     .subscribeOn(connectionSubscriptionScheduler)

And my repeat/retry logic looks like this:

    // ... snip ....   Upstream declares queues, and sends/receives messages, via Sender/Receiver.

    /*
     * Retry if there was a failure.
     * This can occur if rabbit is not available on startup,
     * or if the connection breaks.
     */
    .retryWhen(Retry
            .backoff(Integer.MAX_VALUE, Duration.ofSeconds(1))
            .maxBackoff(Duration.ofSeconds(30))
            .doBeforeRetry(retrySignal -> {
                LOGGER.warn(
                        "Restarting after exception. Retry# {}",
                        retrySignal.totalRetriesInARow() + 1,
                        retrySignal.failure());
                connectionRef.close();
            }))
    /*
     * If the upstream completes, but we have not stopped yet, then resubscribe.
     * The resubscribe will cause the queue to be recreated since the queues are declared upstream.
     *
     * The upstream will complete if the queue is deleted while this stream is consuming from it.
     */
    .repeatWhen(Repeat.onlyIf(context -> state != State.STOPPED)
            .doOnRepeat(context -> {
                LOGGER.warn(
                        "Restarting after completion. Restart# {}",
                        context.iteration());
                connectionRef.close();
            }))
