From bfe67323aa6ea1b8dc2bc2164f1911d7e08b6799 Mon Sep 17 00:00:00 2001 From: TungTV Date: Thu, 16 Jan 2025 16:14:24 +0700 Subject: [PATCH 1/4] ISSUE-5334 Disconnection: the distributed way --- .../tmail/james/app/DistributedServer.java | 2 + tmail-backend/data/data-rabbitmq/pom.xml | 56 +++++++++++ .../tmail/DisconnectorRequestSerializer.java | 82 ++++++++++++++++ .../tmail/RabbitMQDisconnectorConsumer.java | 98 +++++++++++++++++++ .../tmail/RabbitMQDisconnectorNotifier.java | 64 ++++++++++++ .../tmail/RabbitMQDisconnectorOperator.java | 95 ++++++++++++++++++ tmail-backend/guice/distributed/pom.xml | 4 + .../tmail/RabbitMQDisconnectorModule.java | 41 ++++++++ ...tributedProtocolServerIntegrationTest.java | 12 ++- .../tmail/integration/TestFixture.java | 32 ++++++ tmail-backend/pom.xml | 6 ++ 11 files changed, 489 insertions(+), 3 deletions(-) create mode 100644 tmail-backend/data/data-rabbitmq/pom.xml create mode 100644 tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/DisconnectorRequestSerializer.java create mode 100644 tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java create mode 100644 tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorNotifier.java create mode 100644 tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java create mode 100644 tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/RabbitMQDisconnectorModule.java create mode 100644 tmail-backend/integration-tests/webadmin/webadmin-integration-tests-common/src/main/java/com/linagora/tmail/integration/TestFixture.java diff --git a/tmail-backend/apps/distributed/src/main/java/com/linagora/tmail/james/app/DistributedServer.java b/tmail-backend/apps/distributed/src/main/java/com/linagora/tmail/james/app/DistributedServer.java index aa83c76f39..369074a28b 100644 --- a/tmail-backend/apps/distributed/src/main/java/com/linagora/tmail/james/app/DistributedServer.java +++ b/tmail-backend/apps/distributed/src/main/java/com/linagora/tmail/james/app/DistributedServer.java @@ -144,6 +144,7 @@ import com.linagora.tmail.OpenPaasContactsConsumerModule; import com.linagora.tmail.OpenPaasModule; import com.linagora.tmail.OpenPaasModuleChooserConfiguration; +import com.linagora.tmail.RabbitMQDisconnectorModule; import com.linagora.tmail.ScheduledReconnectionHandler; import com.linagora.tmail.UsersRepositoryModuleChooser; import com.linagora.tmail.blob.guice.BlobStoreCacheModulesChooser; @@ -338,6 +339,7 @@ protected void configure() { new DistributedEmailAddressContactEventDeadLettersModule(), new DistributedTaskSerializationModule(), new JMAPEventBusModule(), + new RabbitMQDisconnectorModule(), new RabbitMQEmailAddressContactModule(), new RabbitMQEventBusModule(), new RabbitMQModule(), diff --git a/tmail-backend/data/data-rabbitmq/pom.xml b/tmail-backend/data/data-rabbitmq/pom.xml new file mode 100644 index 0000000000..ebe15b52ab --- /dev/null +++ b/tmail-backend/data/data-rabbitmq/pom.xml @@ -0,0 +1,56 @@ + + + 4.0.0 + + com.linagora.tmail + tmail-backend + 1.0.0-SNAPSHOT + ../../pom.xml + + + data-rabbitmq + Twake Mail :: Data :: RabbitMQ + + + + ${james.groupId} + apache-james-backends-rabbitmq + + + ${james.groupId} + apache-james-backends-rabbitmq + test-jar + test + + + ${james.groupId} + james-server-data-api + + + ${james.groupId} + james-server-testing + test + + + ${james.groupId} + metrics-tests + test + + + ${james.groupId} + testing-base + test + + + com.rabbitmq + amqp-client + + + org.mockito + mockito-core + test + + + \ No newline at end of file diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/DisconnectorRequestSerializer.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/DisconnectorRequestSerializer.java new file mode 100644 index 0000000000..c58e5668ce --- /dev/null +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/DisconnectorRequestSerializer.java @@ -0,0 +1,82 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import java.nio.charset.StandardCharsets; +import java.util.stream.Collectors; + +import org.apache.commons.lang3.StringUtils; +import org.apache.james.DisconnectorNotifier.AllUsersRequest; +import org.apache.james.DisconnectorNotifier.MultipleUserRequest; +import org.apache.james.DisconnectorNotifier.Request; +import org.apache.james.core.Username; + +import com.google.common.base.Splitter; + +public class DisconnectorRequestSerializer { + + public static class DisconnectorRequestSerializeException extends RuntimeException { + public DisconnectorRequestSerializeException(String message) { + super(message); + } + + public DisconnectorRequestSerializeException(String message, Throwable cause) { + super(message, cause); + } + } + + public static final String ALL_USERS_REQUEST = "[]"; + public static final String USER_DELIMITER = ","; + + public static String serialize(Request request) { + return switch (request) { + case MultipleUserRequest multipleUserRequest -> multipleUserRequest.usernameList().stream() + .map(Username::asString) + .collect(Collectors.joining(USER_DELIMITER, "[", "]")); + case AllUsersRequest allUsersRequest -> ALL_USERS_REQUEST; + default -> throw new DisconnectorRequestSerializeException("Unknown request type: " + request); + }; + } + + public static byte[] serializeAsBytes(Request request) { + return serialize(request).getBytes(StandardCharsets.UTF_8); + } + + public static Request deserialize(String serialized) { + if (ALL_USERS_REQUEST.equals(serialized)) { + return AllUsersRequest.ALL_USERS_REQUEST; + } + if (StringUtils.startsWith(serialized, "[") && StringUtils.endsWith(serialized, "]")) { + try { + return MultipleUserRequest.of(Splitter.on(",") + .omitEmptyStrings() + .splitToStream(serialized.substring(1, serialized.length() - 1)) + .map(Username::of) + .collect(Collectors.toSet())); + } catch (Exception e) { + throw new DisconnectorRequestSerializeException("Error while deserializing: " + serialized, e); + } + } + throw new DisconnectorRequestSerializeException("Unknown serialized format: " + serialized); + } + + public static Request deserialize(byte[] serialized) { + return deserialize(new String(serialized, StandardCharsets.UTF_8)); + } +} diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java new file mode 100644 index 0000000000..bbd874748a --- /dev/null +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java @@ -0,0 +1,98 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import java.io.Closeable; +import java.net.InetAddress; +import java.util.Optional; +import java.util.UUID; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.apache.james.DisconnectorNotifier.InVMDisconnectorNotifier; +import org.apache.james.backends.rabbitmq.ReceiverProvider; +import org.apache.james.lifecycle.api.Startable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.github.fge.lambdas.Throwing; + +import reactor.core.Disposable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.AcknowledgableDelivery; +import reactor.rabbitmq.Receiver; + +public class RabbitMQDisconnectorConsumer implements Startable, Closeable { + + public static final String TMAIL_DISCONNECTOR_QUEUE_NAME = "tmail-disconnector-" + + Throwing.supplier(() -> InetAddress.getLocalHost().getHostName()).get() + + "-" + UUID.randomUUID(); + + private static final boolean REQUEUE_ON_NACK = true; + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDisconnectorConsumer.class); + + private final ReceiverProvider receiverProvider; + private final InVMDisconnectorNotifier inVMDisconnectorNotifier; + + private Disposable consumeMessages; + + @Inject + @Singleton + public RabbitMQDisconnectorConsumer(ReceiverProvider receiverProvider, + InVMDisconnectorNotifier inVMDisconnectorNotifier) { + this.receiverProvider = receiverProvider; + this.inVMDisconnectorNotifier = inVMDisconnectorNotifier; + } + + public void start() { + consumeMessages = doConsumeMessages(); + } + + public void restart() { + Disposable previousConsumer = consumeMessages; + consumeMessages = doConsumeMessages(); + Optional.ofNullable(previousConsumer).ifPresent(Disposable::dispose); + } + + private Disposable doConsumeMessages() { + return Flux.using(receiverProvider::createReceiver, + receiver -> receiver.consumeManualAck(TMAIL_DISCONNECTOR_QUEUE_NAME), + Receiver::close) + .flatMap(this::consumeMessage) + .subscribe(); + } + + private Mono consumeMessage(AcknowledgableDelivery ackDelivery) { + return Mono.fromCallable(() -> DisconnectorRequestSerializer.deserialize(ackDelivery.getBody())) + .flatMap(disconnectorRequest -> Mono.fromRunnable(() -> inVMDisconnectorNotifier.disconnect(disconnectorRequest)).then()) + .doOnSuccess(result -> ackDelivery.ack()) + .onErrorResume(error -> { + LOGGER.error("Error when consume message", error); + ackDelivery.nack(!REQUEUE_ON_NACK); + return Mono.empty(); + }); + } + + @Override + public void close() { + Optional.ofNullable(consumeMessages).ifPresent(Disposable::dispose); + } +} diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorNotifier.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorNotifier.java new file mode 100644 index 0000000000..d36c5a6126 --- /dev/null +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorNotifier.java @@ -0,0 +1,64 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import java.time.Duration; + +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + +import org.apache.commons.lang3.StringUtils; +import org.apache.james.DisconnectorNotifier; +import org.apache.james.lifecycle.api.Startable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import reactor.core.publisher.Mono; +import reactor.rabbitmq.OutboundMessage; +import reactor.rabbitmq.Sender; +import reactor.util.retry.Retry; + +public class RabbitMQDisconnectorNotifier implements DisconnectorNotifier, Startable { + public static final String TMAIL_DISCONNECTOR_EXCHANGE_NAME = "tmail-disconnector"; + public static final String ROUTING_KEY = StringUtils.EMPTY; + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDisconnectorNotifier.class); + private static final Retry RETRY_SPEC = Retry.backoff(2, Duration.ofMillis(100)); + + private final Sender sender; + + @Inject + @Singleton + public RabbitMQDisconnectorNotifier(Sender sender) { + this.sender = sender; + } + + @Override + public void disconnect(Request request) { + try { + sender.send(Mono.just(new OutboundMessage(TMAIL_DISCONNECTOR_EXCHANGE_NAME, + ROUTING_KEY, + DisconnectorRequestSerializer.serializeAsBytes(request)))) + .retryWhen(RETRY_SPEC) + .block(); + } catch (Exception exception) { + LOGGER.error("Error while sending disconnection request", exception); + } + } +} diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java new file mode 100644 index 0000000000..aac394efff --- /dev/null +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java @@ -0,0 +1,95 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import static com.linagora.tmail.RabbitMQDisconnectorConsumer.TMAIL_DISCONNECTOR_QUEUE_NAME; +import static com.linagora.tmail.RabbitMQDisconnectorNotifier.TMAIL_DISCONNECTOR_EXCHANGE_NAME; +import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; +import static org.apache.james.backends.rabbitmq.Constants.DURABLE; +import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; + +import jakarta.inject.Inject; + +import org.apache.james.backends.rabbitmq.QueueArguments; +import org.apache.james.backends.rabbitmq.RabbitMQConfiguration; +import org.apache.james.backends.rabbitmq.SimpleConnectionPool; +import org.apache.james.lifecycle.api.Startable; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Connection; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.ExchangeSpecification; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.Sender; + +public class RabbitMQDisconnectorOperator implements Startable, SimpleConnectionPool.ReconnectionHandler { + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDisconnectorOperator.class); + private final Sender sender; + private final RabbitMQConfiguration rabbitMQConfiguration; + private final RabbitMQDisconnectorConsumer disconnectorConsumer; + + @Inject + public RabbitMQDisconnectorOperator(Sender sender, + RabbitMQConfiguration rabbitMQConfiguration, + RabbitMQDisconnectorConsumer disconnectorConsumer) { + this.sender = sender; + this.rabbitMQConfiguration = rabbitMQConfiguration; + this.disconnectorConsumer = disconnectorConsumer; + } + + public void init() { + // Declare the exchange and queue + Flux.concat(sender.declareExchange(ExchangeSpecification.exchange(TMAIL_DISCONNECTOR_EXCHANGE_NAME) + .type(BuiltinExchangeType.FANOUT.getType()) + .durable(DURABLE)), + sender.declareQueue(QueueSpecification + .queue(TMAIL_DISCONNECTOR_QUEUE_NAME) + .durable(DURABLE) + .arguments(queueArgumentSupplier().build())), + sender.bind(BindingSpecification.binding() + .exchange(TMAIL_DISCONNECTOR_EXCHANGE_NAME) + .queue(TMAIL_DISCONNECTOR_QUEUE_NAME) + .routingKey(RabbitMQDisconnectorNotifier.ROUTING_KEY))) + .then().block(); + + // Start the consumer + disconnectorConsumer.start(); + } + + private QueueArguments.Builder queueArgumentSupplier() { + QueueArguments.Builder queueArgumentBuilder = QueueArguments.builder().quorumQueue() + .replicationFactor(rabbitMQConfiguration.getQuorumQueueReplicationFactor()); + rabbitMQConfiguration.getQuorumQueueDeliveryLimit().ifPresent(queueArgumentBuilder::deliveryLimit); + rabbitMQConfiguration.getQueueTTL().ifPresent(queueArgumentBuilder::queueTTL); + return queueArgumentBuilder; + } + + @Override + public Publisher handleReconnection(Connection connection) { + return Mono.fromRunnable(disconnectorConsumer::restart) + .doOnError(error -> LOGGER.error("Error while handle reconnection for disconnector consumer", error)) + .then(); + } +} diff --git a/tmail-backend/guice/distributed/pom.xml b/tmail-backend/guice/distributed/pom.xml index c98c9e401d..9485b9014c 100644 --- a/tmail-backend/guice/distributed/pom.xml +++ b/tmail-backend/guice/distributed/pom.xml @@ -32,6 +32,10 @@ Twake Mail :: Guice :: Server Distributed + + ${project.groupId} + data-rabbitmq + ${project.groupId} jmap-extensions diff --git a/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/RabbitMQDisconnectorModule.java b/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/RabbitMQDisconnectorModule.java new file mode 100644 index 0000000000..ac4239e215 --- /dev/null +++ b/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/RabbitMQDisconnectorModule.java @@ -0,0 +1,41 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import org.apache.james.DisconnectorNotifier; +import org.apache.james.utils.InitializationOperation; +import org.apache.james.utils.InitilizationOperationBuilder; + +import com.google.inject.AbstractModule; +import com.google.inject.multibindings.ProvidesIntoSet; + +public class RabbitMQDisconnectorModule extends AbstractModule { + + @Override + protected void configure() { + bind(DisconnectorNotifier.class).to(RabbitMQDisconnectorNotifier.class); + } + + @ProvidesIntoSet + InitializationOperation init(RabbitMQDisconnectorOperator operator) { + return InitilizationOperationBuilder + .forClass(RabbitMQDisconnectorOperator.class) + .init(operator::init); + } +} diff --git a/tmail-backend/integration-tests/webadmin/distributed-webadmin-integration-tests/src/test/java/com/linagora/tmail/integration/distributed/DistributedProtocolServerIntegrationTest.java b/tmail-backend/integration-tests/webadmin/distributed-webadmin-integration-tests/src/test/java/com/linagora/tmail/integration/distributed/DistributedProtocolServerIntegrationTest.java index 080ff587e8..fabff6562d 100644 --- a/tmail-backend/integration-tests/webadmin/distributed-webadmin-integration-tests/src/test/java/com/linagora/tmail/integration/distributed/DistributedProtocolServerIntegrationTest.java +++ b/tmail-backend/integration-tests/webadmin/distributed-webadmin-integration-tests/src/test/java/com/linagora/tmail/integration/distributed/DistributedProtocolServerIntegrationTest.java @@ -18,6 +18,7 @@ package com.linagora.tmail.integration.distributed; +import static com.linagora.tmail.integration.TestFixture.CALMLY_AWAIT; import static io.restassured.RestAssured.when; import static org.assertj.core.api.AssertionsForClassTypes.assertThat; import static org.mockito.Mockito.mock; @@ -25,6 +26,7 @@ import java.util.List; import java.util.Optional; +import java.util.concurrent.TimeUnit; import org.apache.james.GuiceJamesServer; import org.apache.james.JamesServerBuilder; @@ -37,6 +39,7 @@ import org.apache.james.utils.DataProbeImpl; import org.apache.james.utils.WebAdminGuiceProbe; import org.apache.james.webadmin.WebAdminUtils; +import org.awaitility.Durations; import org.eclipse.jetty.http.HttpStatus; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -124,12 +127,13 @@ void disconnectShouldRevokeUserFirebaseSubscription(GuiceJamesServer server) { .statusCode(HttpStatus.NO_CONTENT_204); // Then the subscription should be revoked - assertThat(firebaseSubscriptionProbe.retrieveSubscription(BOB, subscription.id())) - .isNull(); + CALMLY_AWAIT.atMost(Durations.TEN_SECONDS) + .untilAsserted(() -> assertThat(firebaseSubscriptionProbe.retrieveSubscription(BOB, subscription.id())) + .isNull()); } @Test - void disconnectShouldNotRevokeUserFirebaseSubscriptionOfUnPredicateUser(GuiceJamesServer server) { + void disconnectShouldNotRevokeUserFirebaseSubscriptionOfUnPredicateUser(GuiceJamesServer server) throws InterruptedException { // Given a subscription for BOB FirebaseSubscriptionProbe firebaseSubscriptionProbe = server.getProbe(FirebaseSubscriptionProbe.class); FirebaseSubscription subscription = firebaseSubscriptionProbe.createSubscription(BOB, createFirebaseSubscriptionCreationRequest()); @@ -144,6 +148,8 @@ void disconnectShouldNotRevokeUserFirebaseSubscriptionOfUnPredicateUser(GuiceJam .statusCode(HttpStatus.NO_CONTENT_204); // Then the subscription of BOB should still be there + Thread.sleep(500); + assertThat(firebaseSubscriptionProbe.retrieveSubscription(BOB, subscription.id())) .isNotNull(); } diff --git a/tmail-backend/integration-tests/webadmin/webadmin-integration-tests-common/src/main/java/com/linagora/tmail/integration/TestFixture.java b/tmail-backend/integration-tests/webadmin/webadmin-integration-tests-common/src/main/java/com/linagora/tmail/integration/TestFixture.java new file mode 100644 index 0000000000..81fe6c86ae --- /dev/null +++ b/tmail-backend/integration-tests/webadmin/webadmin-integration-tests-common/src/main/java/com/linagora/tmail/integration/TestFixture.java @@ -0,0 +1,32 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail.integration; + +import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS; + +import org.awaitility.Awaitility; +import org.awaitility.core.ConditionFactory; + +public class TestFixture { + + public static final ConditionFactory CALMLY_AWAIT = Awaitility + .with().pollInterval(ONE_HUNDRED_MILLISECONDS) + .and().pollDelay(ONE_HUNDRED_MILLISECONDS) + .await(); +} diff --git a/tmail-backend/pom.xml b/tmail-backend/pom.xml index 2e799a809b..9ade02e910 100644 --- a/tmail-backend/pom.xml +++ b/tmail-backend/pom.xml @@ -85,6 +85,7 @@ event-bus-redis tmail-third-party plugins + data/data-rabbitmq @@ -129,6 +130,11 @@ test-jar ${project.version} + + ${project.groupId} + data-rabbitmq + ${project.version} + ${project.groupId} distributed From cac761f79c727cbd1b211e7d6c0507d41e722f71 Mon Sep 17 00:00:00 2001 From: TungTV Date: Fri, 17 Jan 2025 14:57:09 +0700 Subject: [PATCH 2/4] fixup! ISSUE-5334 Disconnection: the distributed way --- tmail-backend/data/data-rabbitmq/pom.xml | 5 + .../tmail/DisconnectorRequestSerializer.java | 67 ++++++------ .../tmail/RabbitMQDisconnectorConsumer.java | 7 +- .../tmail/RabbitMQDisconnectorNotifier.java | 7 +- .../tmail/RabbitMQDisconnectorOperator.java | 2 - .../DisconnectorRequestSerializerTest.java | 91 ++++++++++++++++ .../RabbitMQDisconnectorConsumerTest.java | 102 ++++++++++++++++++ ...tributedProtocolServerIntegrationTest.java | 88 ++++++++++++++- tmail-backend/pom.xml | 3 +- 9 files changed, 329 insertions(+), 43 deletions(-) create mode 100644 tmail-backend/data/data-rabbitmq/src/test/java/com/linagora/tmail/DisconnectorRequestSerializerTest.java create mode 100644 tmail-backend/data/data-rabbitmq/src/test/java/com/linagora/tmail/RabbitMQDisconnectorConsumerTest.java diff --git a/tmail-backend/data/data-rabbitmq/pom.xml b/tmail-backend/data/data-rabbitmq/pom.xml index ebe15b52ab..ba79c1746f 100644 --- a/tmail-backend/data/data-rabbitmq/pom.xml +++ b/tmail-backend/data/data-rabbitmq/pom.xml @@ -52,5 +52,10 @@ mockito-core test + + net.javacrumbs.json-unit + json-unit-assertj + test + \ No newline at end of file diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/DisconnectorRequestSerializer.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/DisconnectorRequestSerializer.java index c58e5668ce..f46ee05b92 100644 --- a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/DisconnectorRequestSerializer.java +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/DisconnectorRequestSerializer.java @@ -19,22 +19,25 @@ package com.linagora.tmail; import java.nio.charset.StandardCharsets; +import java.util.List; +import java.util.Set; import java.util.stream.Collectors; -import org.apache.commons.lang3.StringUtils; +import jakarta.inject.Inject; +import jakarta.inject.Singleton; + import org.apache.james.DisconnectorNotifier.AllUsersRequest; import org.apache.james.DisconnectorNotifier.MultipleUserRequest; import org.apache.james.DisconnectorNotifier.Request; import org.apache.james.core.Username; -import com.google.common.base.Splitter; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; public class DisconnectorRequestSerializer { public static class DisconnectorRequestSerializeException extends RuntimeException { - public DisconnectorRequestSerializeException(String message) { - super(message); - } public DisconnectorRequestSerializeException(String message, Throwable cause) { super(message, cause); @@ -42,41 +45,39 @@ public DisconnectorRequestSerializeException(String message, Throwable cause) { } public static final String ALL_USERS_REQUEST = "[]"; - public static final String USER_DELIMITER = ","; + public static final byte[] ALL_USERS_REQUEST_BYTES = ALL_USERS_REQUEST.getBytes(StandardCharsets.UTF_8); + public static final TypeReference> LIST_OF_STRING = new TypeReference<>() { + }; - public static String serialize(Request request) { - return switch (request) { - case MultipleUserRequest multipleUserRequest -> multipleUserRequest.usernameList().stream() - .map(Username::asString) - .collect(Collectors.joining(USER_DELIMITER, "[", "]")); - case AllUsersRequest allUsersRequest -> ALL_USERS_REQUEST; - default -> throw new DisconnectorRequestSerializeException("Unknown request type: " + request); - }; + private final ObjectMapper objectMapper; + + @Inject + @Singleton + public DisconnectorRequestSerializer() { + this.objectMapper = new ObjectMapper(); } - public static byte[] serializeAsBytes(Request request) { - return serialize(request).getBytes(StandardCharsets.UTF_8); + public byte[] serialize(Request request) throws JsonProcessingException { + return switch (request) { + case MultipleUserRequest multipleUserRequest -> objectMapper.writeValueAsBytes( + multipleUserRequest.usernameList().stream() + .map(Username::asString) + .toList()); + case AllUsersRequest allUsersRequest -> ALL_USERS_REQUEST_BYTES; + }; } - public static Request deserialize(String serialized) { - if (ALL_USERS_REQUEST.equals(serialized)) { + public Request deserialize(byte[] serialized) { + if (serialized.length == 2 && serialized[0] == '[' && serialized[1] == ']') { return AllUsersRequest.ALL_USERS_REQUEST; } - if (StringUtils.startsWith(serialized, "[") && StringUtils.endsWith(serialized, "]")) { - try { - return MultipleUserRequest.of(Splitter.on(",") - .omitEmptyStrings() - .splitToStream(serialized.substring(1, serialized.length() - 1)) - .map(Username::of) - .collect(Collectors.toSet())); - } catch (Exception e) { - throw new DisconnectorRequestSerializeException("Error while deserializing: " + serialized, e); - } + try { + Set usernameSet = objectMapper.readValue(serialized, LIST_OF_STRING) + .stream().map(Username::of) + .collect(Collectors.toSet()); + return new MultipleUserRequest(usernameSet); + } catch (Exception e) { + throw new DisconnectorRequestSerializeException("Error while deserializing: " + new String(serialized, StandardCharsets.UTF_8), e); } - throw new DisconnectorRequestSerializeException("Unknown serialized format: " + serialized); - } - - public static Request deserialize(byte[] serialized) { - return deserialize(new String(serialized, StandardCharsets.UTF_8)); } } diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java index bbd874748a..36d865dcc3 100644 --- a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java @@ -51,15 +51,18 @@ public class RabbitMQDisconnectorConsumer implements Startable, Closeable { private final ReceiverProvider receiverProvider; private final InVMDisconnectorNotifier inVMDisconnectorNotifier; + private final DisconnectorRequestSerializer deserializer; private Disposable consumeMessages; @Inject @Singleton public RabbitMQDisconnectorConsumer(ReceiverProvider receiverProvider, - InVMDisconnectorNotifier inVMDisconnectorNotifier) { + InVMDisconnectorNotifier inVMDisconnectorNotifier, + DisconnectorRequestSerializer deserializer) { this.receiverProvider = receiverProvider; this.inVMDisconnectorNotifier = inVMDisconnectorNotifier; + this.deserializer = deserializer; } public void start() { @@ -81,7 +84,7 @@ private Disposable doConsumeMessages() { } private Mono consumeMessage(AcknowledgableDelivery ackDelivery) { - return Mono.fromCallable(() -> DisconnectorRequestSerializer.deserialize(ackDelivery.getBody())) + return Mono.fromCallable(() -> deserializer.deserialize(ackDelivery.getBody())) .flatMap(disconnectorRequest -> Mono.fromRunnable(() -> inVMDisconnectorNotifier.disconnect(disconnectorRequest)).then()) .doOnSuccess(result -> ackDelivery.ack()) .onErrorResume(error -> { diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorNotifier.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorNotifier.java index d36c5a6126..09a4c6e745 100644 --- a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorNotifier.java +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorNotifier.java @@ -42,11 +42,14 @@ public class RabbitMQDisconnectorNotifier implements DisconnectorNotifier, Start private static final Retry RETRY_SPEC = Retry.backoff(2, Duration.ofMillis(100)); private final Sender sender; + private final DisconnectorRequestSerializer serializer; @Inject @Singleton - public RabbitMQDisconnectorNotifier(Sender sender) { + public RabbitMQDisconnectorNotifier(Sender sender, + DisconnectorRequestSerializer serializer) { this.sender = sender; + this.serializer = serializer; } @Override @@ -54,7 +57,7 @@ public void disconnect(Request request) { try { sender.send(Mono.just(new OutboundMessage(TMAIL_DISCONNECTOR_EXCHANGE_NAME, ROUTING_KEY, - DisconnectorRequestSerializer.serializeAsBytes(request)))) + serializer.serialize(request)))) .retryWhen(RETRY_SPEC) .block(); } catch (Exception exception) { diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java index aac394efff..5662bc0752 100644 --- a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java @@ -20,9 +20,7 @@ import static com.linagora.tmail.RabbitMQDisconnectorConsumer.TMAIL_DISCONNECTOR_QUEUE_NAME; import static com.linagora.tmail.RabbitMQDisconnectorNotifier.TMAIL_DISCONNECTOR_EXCHANGE_NAME; -import static org.apache.james.backends.rabbitmq.Constants.AUTO_DELETE; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; -import static org.apache.james.backends.rabbitmq.Constants.EXCLUSIVE; import jakarta.inject.Inject; diff --git a/tmail-backend/data/data-rabbitmq/src/test/java/com/linagora/tmail/DisconnectorRequestSerializerTest.java b/tmail-backend/data/data-rabbitmq/src/test/java/com/linagora/tmail/DisconnectorRequestSerializerTest.java new file mode 100644 index 0000000000..cc53dbf6d2 --- /dev/null +++ b/tmail-backend/data/data-rabbitmq/src/test/java/com/linagora/tmail/DisconnectorRequestSerializerTest.java @@ -0,0 +1,91 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import static net.javacrumbs.jsonunit.assertj.JsonAssertions.assertThatJson; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import java.nio.charset.StandardCharsets; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import org.apache.james.DisconnectorNotifier; +import org.apache.james.DisconnectorNotifier.AllUsersRequest; +import org.apache.james.DisconnectorNotifier.MultipleUserRequest; +import org.apache.james.core.Username; +import org.junit.jupiter.api.Test; + +import com.fasterxml.jackson.core.JsonProcessingException; + +import net.javacrumbs.jsonunit.core.Option; + +public class DisconnectorRequestSerializerTest { + + private final DisconnectorRequestSerializer testee = new DisconnectorRequestSerializer(); + + @Test + void serializeAllUsersRequestShouldReturnEmptyArray() throws JsonProcessingException { + assertThat(new String(testee.serialize(new AllUsersRequest()), StandardCharsets.UTF_8)) + .isEqualTo("[]"); + } + + @Test + void deserializeAllUsersRequestShouldReturnEmptyArray() { + assertThat(testee.deserialize("[]".getBytes(StandardCharsets.UTF_8))) + .isEqualTo(new AllUsersRequest()); + } + + @Test + void serializeMultipleUserRequestShouldReturnSerializedRequest() throws JsonProcessingException { + MultipleUserRequest multipleUserRequest = MultipleUserRequest.of(Set.of(Username.of("user1@domain.tld"), Username.of("user2@domain2.tld"))); + assertThatJson(new String(testee.serialize(multipleUserRequest), StandardCharsets.UTF_8)) + .withOptions(Option.IGNORING_ARRAY_ORDER) + .isEqualTo(""" + ["user1@domain.tld","user2@domain2.tld"]"""); + } + + @Test + void deserializeMultipleUserRequestShouldReturnDeserializedRequest() { + DisconnectorNotifier.Request deserialize = testee.deserialize(""" + ["user1@domain.tld","user2@domain2.tld"]""".getBytes(StandardCharsets.UTF_8)); + assertThat(deserialize).isInstanceOf(MultipleUserRequest.class) + .satisfies(request -> assertThat(((MultipleUserRequest) request).usernameList()) + .containsExactlyInAnyOrder(Username.of("user1@domain.tld"), Username.of("user2@domain2.tld"))); + } + + @Test + void deserializeOfSerializedMultipleUserRequestShouldReturnOriginalRequest() throws JsonProcessingException { + // generate random 10 Usernames + Set usernameSet = IntStream.range(0, 10) + .mapToObj(i -> Username.of("user" + i + "@abc.com" + UUID.randomUUID())) + .collect(Collectors.toSet()); + assertThat(testee.deserialize(testee.serialize(MultipleUserRequest.of(usernameSet)))) + .isEqualTo(MultipleUserRequest.of(usernameSet)); + } + + @Test + void shouldDeserializeFailedWhenInvalidJson() { + assertThatThrownBy(() -> testee.deserialize("invalid".getBytes(StandardCharsets.UTF_8))) + .hasMessageContaining("Error while deserializing: invalid") + .isInstanceOf(DisconnectorRequestSerializer.DisconnectorRequestSerializeException.class); + } +} diff --git a/tmail-backend/data/data-rabbitmq/src/test/java/com/linagora/tmail/RabbitMQDisconnectorConsumerTest.java b/tmail-backend/data/data-rabbitmq/src/test/java/com/linagora/tmail/RabbitMQDisconnectorConsumerTest.java new file mode 100644 index 0000000000..69a7defe8f --- /dev/null +++ b/tmail-backend/data/data-rabbitmq/src/test/java/com/linagora/tmail/RabbitMQDisconnectorConsumerTest.java @@ -0,0 +1,102 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import static org.apache.james.DisconnectorNotifier.AllUsersRequest.ALL_USERS_REQUEST; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import java.time.Duration; +import java.util.Set; + +import org.apache.james.DisconnectorNotifier.InVMDisconnectorNotifier; +import org.apache.james.DisconnectorNotifier.MultipleUserRequest; +import org.apache.james.backends.rabbitmq.RabbitMQExtension; +import org.apache.james.core.Username; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +public class RabbitMQDisconnectorConsumerTest { + @RegisterExtension + static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ() + .isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG); + + RabbitMQDisconnectorOperator operator; + RabbitMQDisconnectorNotifier notifier; + RabbitMQDisconnectorConsumer consumer; + InVMDisconnectorNotifier inVmDisconnectorNotifier; + + @BeforeEach + void setup() throws Exception { + inVmDisconnectorNotifier = mock(InVMDisconnectorNotifier.class); + + DisconnectorRequestSerializer disconnectorRequestSerializer = new DisconnectorRequestSerializer(); + + consumer = new RabbitMQDisconnectorConsumer(rabbitMQExtension.getReceiverProvider(), + inVmDisconnectorNotifier, + disconnectorRequestSerializer); + + operator = new RabbitMQDisconnectorOperator(rabbitMQExtension.getSender(), + rabbitMQExtension.getRabbitMQ().getConfiguration(), + consumer); + + operator.init(); + notifier = new RabbitMQDisconnectorNotifier(rabbitMQExtension.getSender(), + disconnectorRequestSerializer); + } + + @AfterEach + void tearDown() { + if (consumer != null) { + consumer.close(); + } + } + + @Test + void vmDisconnectorShouldDisconnectWhenNotifyRequestAllUser() { + // when + notifier.disconnect(ALL_USERS_REQUEST); + + // then + verify(inVmDisconnectorNotifier, after(Duration.ofSeconds(1).toMillis())).disconnect(ALL_USERS_REQUEST); + } + + @Test + void vmDisconnectorShouldDisconnectWhenNotifyRequestMultipleUserWithSingleValue() { + // when + notifier.disconnect(MultipleUserRequest.of(Username.of("user1"))); + + // then + verify(inVmDisconnectorNotifier, after(Duration.ofSeconds(1).toMillis())) + .disconnect(MultipleUserRequest.of(Username.of("user1"))); + } + + @Test + void vmDisconnectorShouldDisconnectWhenNotifyRequestMultipleUserWithMultipleValues() { + // when + notifier.disconnect(MultipleUserRequest.of(Set.of(Username.of("user1"), Username.of("user2")))); + + // then + verify(inVmDisconnectorNotifier, after(Duration.ofSeconds(1).toMillis())) + .disconnect(MultipleUserRequest.of(Set.of(Username.of("user1"), Username.of("user2")))); + } +} diff --git a/tmail-backend/integration-tests/webadmin/distributed-webadmin-integration-tests/src/test/java/com/linagora/tmail/integration/distributed/DistributedProtocolServerIntegrationTest.java b/tmail-backend/integration-tests/webadmin/distributed-webadmin-integration-tests/src/test/java/com/linagora/tmail/integration/distributed/DistributedProtocolServerIntegrationTest.java index fabff6562d..7bc4996dcc 100644 --- a/tmail-backend/integration-tests/webadmin/distributed-webadmin-integration-tests/src/test/java/com/linagora/tmail/integration/distributed/DistributedProtocolServerIntegrationTest.java +++ b/tmail-backend/integration-tests/webadmin/distributed-webadmin-integration-tests/src/test/java/com/linagora/tmail/integration/distributed/DistributedProtocolServerIntegrationTest.java @@ -18,33 +18,43 @@ package com.linagora.tmail.integration.distributed; +import static com.linagora.tmail.RabbitMQDisconnectorNotifier.TMAIL_DISCONNECTOR_EXCHANGE_NAME; import static com.linagora.tmail.integration.TestFixture.CALMLY_AWAIT; import static io.restassured.RestAssured.when; -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.util.ArrayList; import java.util.List; import java.util.Optional; -import java.util.concurrent.TimeUnit; +import java.util.UUID; import org.apache.james.GuiceJamesServer; import org.apache.james.JamesServerBuilder; import org.apache.james.JamesServerExtension; +import org.apache.james.backends.rabbitmq.ReactorRabbitMQChannelPool; +import org.apache.james.backends.rabbitmq.SimpleConnectionPool; import org.apache.james.backends.redis.RedisExtension; import org.apache.james.core.Domain; import org.apache.james.core.Username; import org.apache.james.jmap.api.model.TypeName; +import org.apache.james.metrics.api.NoopGaugeRegistry; +import org.apache.james.metrics.tests.RecordingMetricFactory; import org.apache.james.modules.AwsS3BlobStoreExtension; import org.apache.james.utils.DataProbeImpl; import org.apache.james.utils.WebAdminGuiceProbe; import org.apache.james.webadmin.WebAdminUtils; import org.awaitility.Durations; import org.eclipse.jetty.http.HttpStatus; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import com.linagora.tmail.RabbitMQDisconnectorNotifier; import com.linagora.tmail.blob.guice.BlobStoreConfiguration; import com.linagora.tmail.james.app.CassandraExtension; import com.linagora.tmail.james.app.DistributedJamesConfiguration; @@ -62,6 +72,11 @@ import com.linagora.tmail.module.LinagoraTestJMAPServerModule; import io.restassured.RestAssured; +import reactor.core.publisher.Flux; +import reactor.rabbitmq.BindingSpecification; +import reactor.rabbitmq.QueueSpecification; +import reactor.rabbitmq.Receiver; +import reactor.rabbitmq.Sender; import scala.jdk.javaapi.CollectionConverters; import scala.jdk.javaapi.OptionConverters; @@ -70,6 +85,9 @@ public class DistributedProtocolServerIntegrationTest { protected static final Username BOB = Username.fromLocalPartWithDomain("bob", DOMAIN); protected static final Username ALICE = Username.fromLocalPartWithDomain("alice", DOMAIN); + @RegisterExtension + static RabbitMQExtension rabbitMQExtension = new RabbitMQExtension(); + @RegisterExtension static JamesServerExtension testExtension = new JamesServerBuilder(tmpDir -> DistributedJamesConfiguration.builder() @@ -87,7 +105,7 @@ public class DistributedProtocolServerIntegrationTest { .build()) .extension(new DockerOpenSearchExtension()) .extension(new CassandraExtension()) - .extension(new RabbitMQExtension()) + .extension(rabbitMQExtension) .extension(new RedisExtension()) .extension(new AwsS3BlobStoreExtension()) .server(configuration -> DistributedServer.createServer(configuration) @@ -96,8 +114,26 @@ public class DistributedProtocolServerIntegrationTest { .build(); + private SimpleConnectionPool connectionPool; + private ReactorRabbitMQChannelPool channelPool; + @BeforeEach void setUp(GuiceJamesServer server) throws Exception { + // setup test rabbitMQ receiver + connectionPool = new SimpleConnectionPool(rabbitMQExtension.dockerRabbitMQ().createRabbitConnectionFactory(), + SimpleConnectionPool.Configuration.builder() + .retries(2) + .initialDelay(Duration.ofMillis(5))); + + channelPool = new ReactorRabbitMQChannelPool(connectionPool.getResilientConnection(), + ReactorRabbitMQChannelPool.Configuration.builder() + .retries(2) + .maxBorrowDelay(Duration.ofMillis(250)) + .maxChannel(10), + new RecordingMetricFactory(), + new NoopGaugeRegistry()); + channelPool.start(); + WebAdminGuiceProbe webAdminGuiceProbe = server.getProbe(WebAdminGuiceProbe.class); DataProbeImpl dataProbe = server.getProbe(DataProbeImpl.class); @@ -111,6 +147,16 @@ void setUp(GuiceJamesServer server) throws Exception { RestAssured.enableLoggingOfRequestAndResponseIfValidationFails(); } + @AfterEach + void tearDown() { + if (channelPool != null) { + channelPool.close(); + } + if (connectionPool != null) { + connectionPool.close(); + } + } + @Test void disconnectShouldRevokeUserFirebaseSubscription(GuiceJamesServer server) { // Given a subscription @@ -165,4 +211,40 @@ private FirebaseSubscriptionCreationRequest createFirebaseSubscriptionCreationRe CollectionConverters.asScala(List.of(mockTypeName)).toSeq()); } + @Test + void shouldPublishFanoutMessageWhenDisconnectRouteIsCalled() { + // Given a queue bound to the fanout exchange + Sender sender = channelPool.getSender(); + String queueName = "test-queue" + UUID.randomUUID(); + Flux.concat(sender.declareQueue(QueueSpecification + .queue(queueName)), + sender.bind(BindingSpecification.binding() + .exchange(TMAIL_DISCONNECTOR_EXCHANGE_NAME) + .queue(queueName) + .routingKey(RabbitMQDisconnectorNotifier.ROUTING_KEY))) + .then().block(); + + ArrayList receivedMessages = new ArrayList<>(); + Flux.using(channelPool::createReceiver, + receiver -> receiver.consumeManualAck(queueName), + Receiver::close) + .doOnNext(ack -> { + receivedMessages.add(new String(ack.getBody(), StandardCharsets.UTF_8)); + ack.ack(); + }) + .subscribe(); + + // When disconnect + when() + .delete("/" + BOB.asString()) + .then() + .statusCode(HttpStatus.NO_CONTENT_204); + + // Then a message should be received + CALMLY_AWAIT.atMost(Durations.TEN_SECONDS) + .untilAsserted(() -> assertThat(receivedMessages).hasSize(1)); + + assertThat(receivedMessages.getFirst()).isEqualTo(""" + ["bob@domain.tld"]"""); + } } diff --git a/tmail-backend/pom.xml b/tmail-backend/pom.xml index 9ade02e910..a71e2ce241 100644 --- a/tmail-backend/pom.xml +++ b/tmail-backend/pom.xml @@ -45,6 +45,8 @@ combined-identity + data/data-rabbitmq + deployment-tests guice/blob-guice @@ -85,7 +87,6 @@ event-bus-redis tmail-third-party plugins - data/data-rabbitmq From ad2c96552e714af967bd10ccb35ff42ac43342e1 Mon Sep 17 00:00:00 2001 From: TungTV Date: Mon, 20 Jan 2025 11:46:01 +0700 Subject: [PATCH 3/4] fixup! ISSUE-5334 Disconnection: the distributed way --- .../com/linagora/tmail/ScheduledReconnectionHandler.java | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/ScheduledReconnectionHandler.java b/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/ScheduledReconnectionHandler.java index 5aed5ad7b1..35d726842f 100644 --- a/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/ScheduledReconnectionHandler.java +++ b/tmail-backend/guice/distributed/src/main/java/com/linagora/tmail/ScheduledReconnectionHandler.java @@ -18,6 +18,7 @@ package com.linagora.tmail; +import static com.linagora.tmail.RabbitMQDisconnectorConsumer.TMAIL_DISCONNECTOR_QUEUE_NAME; import static com.rabbitmq.client.ConnectionFactory.DEFAULT_VHOST; import java.io.FileNotFoundException; @@ -314,14 +315,18 @@ class QueueNotFoundException extends RuntimeException { } - public static final ImmutableList QUEUES_TO_MONITOR = ImmutableList.of("JamesMailQueue-workqueue-spool", + public static final ImmutableList QUEUES_TO_MONITOR = new ImmutableList.Builder() + .add("JamesMailQueue-workqueue-spool", "JamesMailQueue-workqueue-outgoing", "mailboxEvent-workQueue-org.apache.james.events.GroupRegistrationHandler$GroupRegistrationHandlerGroup", "jmapEvent-workQueue-org.apache.james.events.GroupRegistrationHandler$GroupRegistrationHandlerGroup", "deleted-message-vault-work-queue", "openpaas-contacts-queue-add", "openpaas-contacts-queue-update", - "openpaas-contacts-queue-delete"); + "openpaas-contacts-queue-delete") + .add(TMAIL_DISCONNECTOR_QUEUE_NAME) + .build(); + private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledReconnectionHandler.class); private final Set reconnectionHandlers; From 8d322cfd2f3c94329dcd530888888d7e24937398 Mon Sep 17 00:00:00 2001 From: TungTV Date: Tue, 21 Jan 2025 14:52:18 +0700 Subject: [PATCH 4/4] fixup! ISSUE-5334 Disconnection: the distributed way --- .../tmail/RabbitMQDisconnectorConsumer.java | 23 +- .../tmail/RabbitMQDisconnectorOperator.java | 5 +- tmail-backend/pom.xml | 1 + .../webadmin/webadmin-protocols/pom.xml | 71 ++++++ .../tmail/DisconnectionRouteTest.java | 210 ++++++++++++++++++ .../tmail/TestConnectionDisconnector.java | 64 ++++++ 6 files changed, 367 insertions(+), 7 deletions(-) create mode 100644 tmail-backend/webadmin/webadmin-protocols/pom.xml create mode 100644 tmail-backend/webadmin/webadmin-protocols/src/test/java/com/linagora/tmail/DisconnectionRouteTest.java create mode 100644 tmail-backend/webadmin/webadmin-protocols/src/test/java/com/linagora/tmail/TestConnectionDisconnector.java diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java index 36d865dcc3..899dc3c92f 100644 --- a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorConsumer.java @@ -33,6 +33,7 @@ import org.slf4j.LoggerFactory; import com.github.fge.lambdas.Throwing; +import com.google.common.annotations.VisibleForTesting; import reactor.core.Disposable; import reactor.core.publisher.Flux; @@ -52,17 +53,27 @@ public class RabbitMQDisconnectorConsumer implements Startable, Closeable { private final ReceiverProvider receiverProvider; private final InVMDisconnectorNotifier inVMDisconnectorNotifier; private final DisconnectorRequestSerializer deserializer; + private final String disconnectorQueueName; private Disposable consumeMessages; - @Inject - @Singleton + @VisibleForTesting public RabbitMQDisconnectorConsumer(ReceiverProvider receiverProvider, InVMDisconnectorNotifier inVMDisconnectorNotifier, - DisconnectorRequestSerializer deserializer) { + DisconnectorRequestSerializer deserializer, + String disconnectorQueueName) { this.receiverProvider = receiverProvider; this.inVMDisconnectorNotifier = inVMDisconnectorNotifier; this.deserializer = deserializer; + this.disconnectorQueueName = disconnectorQueueName; + } + + @Inject + @Singleton + public RabbitMQDisconnectorConsumer(ReceiverProvider receiverProvider, + InVMDisconnectorNotifier inVMDisconnectorNotifier, + DisconnectorRequestSerializer deserializer) { + this(receiverProvider, inVMDisconnectorNotifier, deserializer, TMAIL_DISCONNECTOR_QUEUE_NAME); } public void start() { @@ -77,7 +88,7 @@ public void restart() { private Disposable doConsumeMessages() { return Flux.using(receiverProvider::createReceiver, - receiver -> receiver.consumeManualAck(TMAIL_DISCONNECTOR_QUEUE_NAME), + receiver -> receiver.consumeManualAck(disconnectorQueueName), Receiver::close) .flatMap(this::consumeMessage) .subscribe(); @@ -98,4 +109,8 @@ private Mono consumeMessage(AcknowledgableDelivery ackDelivery) { public void close() { Optional.ofNullable(consumeMessages).ifPresent(Disposable::dispose); } + + public String disconnectorQueueName() { + return disconnectorQueueName; + } } diff --git a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java index 5662bc0752..d7fec0beaf 100644 --- a/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java +++ b/tmail-backend/data/data-rabbitmq/src/main/java/com/linagora/tmail/RabbitMQDisconnectorOperator.java @@ -18,7 +18,6 @@ package com.linagora.tmail; -import static com.linagora.tmail.RabbitMQDisconnectorConsumer.TMAIL_DISCONNECTOR_QUEUE_NAME; import static com.linagora.tmail.RabbitMQDisconnectorNotifier.TMAIL_DISCONNECTOR_EXCHANGE_NAME; import static org.apache.james.backends.rabbitmq.Constants.DURABLE; @@ -63,12 +62,12 @@ public void init() { .type(BuiltinExchangeType.FANOUT.getType()) .durable(DURABLE)), sender.declareQueue(QueueSpecification - .queue(TMAIL_DISCONNECTOR_QUEUE_NAME) + .queue(disconnectorConsumer.disconnectorQueueName()) .durable(DURABLE) .arguments(queueArgumentSupplier().build())), sender.bind(BindingSpecification.binding() .exchange(TMAIL_DISCONNECTOR_EXCHANGE_NAME) - .queue(TMAIL_DISCONNECTOR_QUEUE_NAME) + .queue(disconnectorConsumer.disconnectorQueueName()) .routingKey(RabbitMQDisconnectorNotifier.ROUTING_KEY))) .then().block(); diff --git a/tmail-backend/pom.xml b/tmail-backend/pom.xml index a71e2ce241..553e312b8c 100644 --- a/tmail-backend/pom.xml +++ b/tmail-backend/pom.xml @@ -80,6 +80,7 @@ webadmin/webadmin-email-address-contact webadmin/webadmin-mailbox + webadmin/webadmin-protocols webadmin/webadmin-team-mailboxes webadmin/webadmin-rate-limit healthcheck diff --git a/tmail-backend/webadmin/webadmin-protocols/pom.xml b/tmail-backend/webadmin/webadmin-protocols/pom.xml new file mode 100644 index 0000000000..21e7f8f970 --- /dev/null +++ b/tmail-backend/webadmin/webadmin-protocols/pom.xml @@ -0,0 +1,71 @@ + + + 4.0.0 + + com.linagora.tmail + tmail-backend + 1.0.0-SNAPSHOT + ../../pom.xml + + + tmail-webadmin-protocols + Twake Mail :: Web Admin :: Protocols + + + + ${project.groupId} + data-rabbitmq + test + + + ${james.groupId} + apache-james-mailbox-api + test-jar + test + + + ${james.groupId} + james-server-data-memory + test + + + ${james.groupId} + apache-james-backends-rabbitmq + test-jar + test + + + ${james.groupId} + james-server-webadmin-core + test-jar + test + + + ${james.groupId} + james-server-webadmin-core + test + + + ${james.groupId} + james-server-testing + test + + + ${james.groupId} + james-server-webadmin-protocols + test + + + ${james.groupId} + metrics-tests + test + + + ${james.groupId} + testing-base + test + + + \ No newline at end of file diff --git a/tmail-backend/webadmin/webadmin-protocols/src/test/java/com/linagora/tmail/DisconnectionRouteTest.java b/tmail-backend/webadmin/webadmin-protocols/src/test/java/com/linagora/tmail/DisconnectionRouteTest.java new file mode 100644 index 0000000000..90145f2e1e --- /dev/null +++ b/tmail-backend/webadmin/webadmin-protocols/src/test/java/com/linagora/tmail/DisconnectionRouteTest.java @@ -0,0 +1,210 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import static io.restassured.RestAssured.given; +import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.not; + +import java.util.List; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import org.apache.james.DisconnectorNotifier; +import org.apache.james.backends.rabbitmq.RabbitMQExtension; +import org.apache.james.core.Username; +import org.apache.james.protocols.webadmin.ProtocolServerRoutes; +import org.apache.james.webadmin.WebAdminServer; +import org.apache.james.webadmin.WebAdminUtils; +import org.awaitility.Awaitility; +import org.awaitility.Durations; +import org.awaitility.core.ConditionFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.github.fge.lambdas.Throwing; + +import io.restassured.specification.RequestSpecification; + +public class DisconnectionRouteTest { + + public static final String BOB = "bob@domain.tld"; + public static final Username BOB_USERNAME = Username.of(BOB); + public static final Username ALICE_USERNAME = Username.of("alice@domain.tld"); + public static final ConditionFactory CALMLY_AWAIT = Awaitility + .with().pollInterval(ONE_HUNDRED_MILLISECONDS) + .and().pollDelay(ONE_HUNDRED_MILLISECONDS) + .await(); + + @RegisterExtension + static RabbitMQExtension rabbitMQExtension = RabbitMQExtension.singletonRabbitMQ() + .isolationPolicy(RabbitMQExtension.IsolationPolicy.STRONG); + + private WebAdminServer webAdminServer1; + private WebAdminServer webAdminServer2; + + private WebAdminServer webAdminServer(List initConnectedUsers) { + TestConnectionDisconnector testConnectionDisconnector = new TestConnectionDisconnector(initConnectedUsers); + DisconnectorNotifier.InVMDisconnectorNotifier inVmDisconnectorNotifier = new DisconnectorNotifier.InVMDisconnectorNotifier(testConnectionDisconnector); + DisconnectorRequestSerializer disconnectorRequestSerializer = new DisconnectorRequestSerializer(); + RabbitMQDisconnectorConsumer rabbitMQDisconnectorConsumer = new RabbitMQDisconnectorConsumer(rabbitMQExtension.getReceiverProvider(), + inVmDisconnectorNotifier, + disconnectorRequestSerializer, + UUID.randomUUID().toString()); + + RabbitMQDisconnectorOperator rabbitMQDisconnectorOperator = new RabbitMQDisconnectorOperator(rabbitMQExtension.getSender(), + Throwing.supplier(() -> rabbitMQExtension.getRabbitMQ().getConfiguration()).get(), rabbitMQDisconnectorConsumer); + + rabbitMQDisconnectorOperator.init(); + + DisconnectorNotifier disconnectorNotifier = new RabbitMQDisconnectorNotifier(rabbitMQExtension.getSender(), disconnectorRequestSerializer); + ProtocolServerRoutes protocolServerRoutes = new ProtocolServerRoutes(Set.of(), disconnectorNotifier, testConnectionDisconnector); + return WebAdminUtils.createWebAdminServer(protocolServerRoutes).start(); + } + + private RequestSpecification requestSpecification(WebAdminServer webAdminServer) { + return WebAdminUtils.buildRequestSpecification(webAdminServer) + .setBasePath("/servers") + .build(); + } + + @AfterEach + void tearDown() { + if (webAdminServer1 != null) { + webAdminServer1.destroy(); + } + if (webAdminServer2 != null) { + webAdminServer2.destroy(); + } + } + + @Test + void bobShouldBeDisconnectedOnConnectedServer() { + // Given Bob is connected on server 1 + webAdminServer1 = webAdminServer(List.of(BOB_USERNAME)); + webAdminServer2 = webAdminServer(List.of()); + + // Verify Bob is connected on server 1 + given() + .spec(requestSpecification(webAdminServer1)) + .get("/connectedUsers") + .then() + .statusCode(200) + .body("", containsInAnyOrder(BOB)); + + // When call disconnection bob on server 2 + given() + .spec(requestSpecification(webAdminServer2)) + .delete("/channels/" + BOB) + .then() + .statusCode(204); + + // Then Bob should be disconnected on server 1 + CALMLY_AWAIT.atMost(Durations.TEN_SECONDS) + .untilAsserted(() -> + given() + .spec(requestSpecification(webAdminServer1)) + .get("/connectedUsers") + .then() + .statusCode(200) + .body("", not(containsInAnyOrder(BOB)))); + } + + @Test + void bobShouldBeDisconnectedOnAllServerWhenAskedOnAnyServer() { + // Given Bob is connected on server 1 and server 2 + webAdminServer1 = webAdminServer(List.of(BOB_USERNAME)); + webAdminServer2 = webAdminServer(List.of(BOB_USERNAME)); + + // Verify Bob is connected on server 1 + given() + .spec(requestSpecification(webAdminServer1)) + .get("/connectedUsers") + .then() + .statusCode(200) + .body("", containsInAnyOrder(BOB)); + + // Verify Bob is connected on server 2 + given() + .spec(requestSpecification(webAdminServer2)) + .get("/connectedUsers") + .then() + .statusCode(200) + .body("", containsInAnyOrder(BOB)); + + // When call disconnection bob on any server + given() + .spec(requestSpecification(webAdminServer1)) + .delete("/channels/" + BOB) + .then() + .statusCode(204); + + // Then Bob should be disconnected on all server + CALMLY_AWAIT.atMost(Durations.TEN_SECONDS) + .untilAsserted(() -> { + given() + .spec(requestSpecification(webAdminServer1)) + .get("/connectedUsers") + .then() + .statusCode(200) + .body("", not(containsInAnyOrder(BOB))); + + given() + .spec(requestSpecification(webAdminServer2)) + .get("/connectedUsers") + .then() + .statusCode(200) + .body("", not(containsInAnyOrder(BOB))); + }); + } + + @Test + void bobShouldNotDisconnectedWhenDisconnectAlice() throws InterruptedException { + // Given Bob is connected on server 1 + webAdminServer1 = webAdminServer(List.of(BOB_USERNAME)); + webAdminServer2 = webAdminServer(List.of()); + + // Verify Bob is connected on server 1 + given() + .spec(requestSpecification(webAdminServer1)) + .get("/connectedUsers") + .then() + .statusCode(200) + .body("", containsInAnyOrder(BOB)); + + // When call disconnection Alice on server 2 + given() + .spec(requestSpecification(webAdminServer2)) + .delete("/channels/" + ALICE_USERNAME.asString()) + .then() + .statusCode(204); + + // Then Bob should not be disconnected on server 1 + TimeUnit.SECONDS.sleep(1); + given() + .spec(requestSpecification(webAdminServer1)) + .get("/connectedUsers") + .then() + .statusCode(200) + .body("", containsInAnyOrder(BOB)); + } +} \ No newline at end of file diff --git a/tmail-backend/webadmin/webadmin-protocols/src/test/java/com/linagora/tmail/TestConnectionDisconnector.java b/tmail-backend/webadmin/webadmin-protocols/src/test/java/com/linagora/tmail/TestConnectionDisconnector.java new file mode 100644 index 0000000000..c52cd2ad4c --- /dev/null +++ b/tmail-backend/webadmin/webadmin-protocols/src/test/java/com/linagora/tmail/TestConnectionDisconnector.java @@ -0,0 +1,64 @@ +/******************************************************************** + * As a subpart of Twake Mail, this file is edited by Linagora. * + * * + * https://twake-mail.com/ * + * https://linagora.com * + * * + * This file is subject to The Affero Gnu Public License * + * version 3. * + * * + * https://www.gnu.org/licenses/agpl-3.0.en.html * + * * + * This program is distributed in the hope that it will be * + * useful, but WITHOUT ANY WARRANTY; without even the implied * + * warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR * + * PURPOSE. See the GNU Affero General Public License for * + * more details. * + ********************************************************************/ + +package com.linagora.tmail; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.function.Predicate; +import java.util.stream.Stream; + +import org.apache.james.core.ConnectionDescription; +import org.apache.james.core.ConnectionDescriptionSupplier; +import org.apache.james.core.Disconnector; +import org.apache.james.core.Username; + + +public class TestConnectionDisconnector implements Disconnector, ConnectionDescriptionSupplier { + + public final ArrayList connectedUsers; + + public TestConnectionDisconnector(List connectedUsers) { + this.connectedUsers = new ArrayList<>(connectedUsers); + } + + @Override + public Stream describeConnections() { + return connectedUsers.stream() + .map(TestConnectionDisconnector::connectionDescription); + } + + @Override + public void disconnect(Predicate username) { + connectedUsers.removeIf(username); + } + + public static ConnectionDescription connectionDescription(Username username) { + return new ConnectionDescription( + "test-protocol", + "test-endpoint", + Optional.empty(), + Optional.empty(), + true, true, true, true, + Optional.of(username), + Map.of()); + } + +}