Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ISSUE-5334 Disconnection: the distributed way #1473

Merged
merged 4 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@
import com.linagora.tmail.OpenPaasContactsConsumerModule;
import com.linagora.tmail.OpenPaasModule;
import com.linagora.tmail.OpenPaasModuleChooserConfiguration;
import com.linagora.tmail.RabbitMQDisconnectorModule;
import com.linagora.tmail.ScheduledReconnectionHandler;
import com.linagora.tmail.UsersRepositoryModuleChooser;
import com.linagora.tmail.blob.guice.BlobStoreCacheModulesChooser;
Expand Down Expand Up @@ -338,6 +339,7 @@ protected void configure() {
new DistributedEmailAddressContactEventDeadLettersModule(),
new DistributedTaskSerializationModule(),
new JMAPEventBusModule(),
new RabbitMQDisconnectorModule(),
new RabbitMQEmailAddressContactModule(),
new RabbitMQEventBusModule(),
new RabbitMQModule(),
Expand Down
61 changes: 61 additions & 0 deletions tmail-backend/data/data-rabbitmq/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.linagora.tmail</groupId>
<artifactId>tmail-backend</artifactId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>

<artifactId>data-rabbitmq</artifactId>
<name>Twake Mail :: Data :: RabbitMQ</name>

<dependencies>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-rabbitmq</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>apache-james-backends-rabbitmq</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-data-api</artifactId>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>james-server-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>metrics-tests</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>${james.groupId}</groupId>
<artifactId>testing-base</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>net.javacrumbs.json-unit</groupId>
<artifactId>json-unit-assertj</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/********************************************************************
* As a subpart of Twake Mail, this file is edited by Linagora. *
* *
* https://twake-mail.com/ *
* https://linagora.com *
* *
* This file is subject to The Affero Gnu Public License *
* version 3. *
* *
* https://www.gnu.org/licenses/agpl-3.0.en.html *
* *
* This program is distributed in the hope that it will be *
* useful, but WITHOUT ANY WARRANTY; without even the implied *
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR *
* PURPOSE. See the GNU Affero General Public License for *
* more details. *
********************************************************************/

package com.linagora.tmail;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.apache.james.DisconnectorNotifier.AllUsersRequest;
import org.apache.james.DisconnectorNotifier.MultipleUserRequest;
import org.apache.james.DisconnectorNotifier.Request;
import org.apache.james.core.Username;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

public class DisconnectorRequestSerializer {

public static class DisconnectorRequestSerializeException extends RuntimeException {

public DisconnectorRequestSerializeException(String message, Throwable cause) {
super(message, cause);
}
}

public static final String ALL_USERS_REQUEST = "[]";
public static final byte[] ALL_USERS_REQUEST_BYTES = ALL_USERS_REQUEST.getBytes(StandardCharsets.UTF_8);
public static final TypeReference<List<String>> LIST_OF_STRING = new TypeReference<>() {
};

private final ObjectMapper objectMapper;

@Inject
@Singleton
public DisconnectorRequestSerializer() {
this.objectMapper = new ObjectMapper();
}

public byte[] serialize(Request request) throws JsonProcessingException {
return switch (request) {
case MultipleUserRequest multipleUserRequest -> objectMapper.writeValueAsBytes(
multipleUserRequest.usernameList().stream()
.map(Username::asString)
.toList());
case AllUsersRequest allUsersRequest -> ALL_USERS_REQUEST_BYTES;
};
}

public Request deserialize(byte[] serialized) {
if (serialized.length == 2 && serialized[0] == '[' && serialized[1] == ']') {
return AllUsersRequest.ALL_USERS_REQUEST;
}
try {
Set<Username> usernameSet = objectMapper.readValue(serialized, LIST_OF_STRING)
.stream().map(Username::of)
.collect(Collectors.toSet());
return new MultipleUserRequest(usernameSet);
} catch (Exception e) {
throw new DisconnectorRequestSerializeException("Error while deserializing: " + new String(serialized, StandardCharsets.UTF_8), e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/********************************************************************
* As a subpart of Twake Mail, this file is edited by Linagora. *
* *
* https://twake-mail.com/ *
* https://linagora.com *
* *
* This file is subject to The Affero Gnu Public License *
* version 3. *
* *
* https://www.gnu.org/licenses/agpl-3.0.en.html *
* *
* This program is distributed in the hope that it will be *
* useful, but WITHOUT ANY WARRANTY; without even the implied *
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR *
* PURPOSE. See the GNU Affero General Public License for *
* more details. *
********************************************************************/

package com.linagora.tmail;

import java.io.Closeable;
import java.net.InetAddress;
import java.util.Optional;
import java.util.UUID;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.apache.james.DisconnectorNotifier.InVMDisconnectorNotifier;
import org.apache.james.backends.rabbitmq.ReceiverProvider;
import org.apache.james.lifecycle.api.Startable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.github.fge.lambdas.Throwing;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.rabbitmq.AcknowledgableDelivery;
import reactor.rabbitmq.Receiver;

public class RabbitMQDisconnectorConsumer implements Startable, Closeable {

public static final String TMAIL_DISCONNECTOR_QUEUE_NAME = "tmail-disconnector-" +
Throwing.supplier(() -> InetAddress.getLocalHost().getHostName()).get() +
"-" + UUID.randomUUID();

private static final boolean REQUEUE_ON_NACK = true;
private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDisconnectorConsumer.class);

private final ReceiverProvider receiverProvider;
private final InVMDisconnectorNotifier inVMDisconnectorNotifier;
private final DisconnectorRequestSerializer deserializer;

private Disposable consumeMessages;

@Inject
@Singleton
public RabbitMQDisconnectorConsumer(ReceiverProvider receiverProvider,
InVMDisconnectorNotifier inVMDisconnectorNotifier,
DisconnectorRequestSerializer deserializer) {
this.receiverProvider = receiverProvider;
this.inVMDisconnectorNotifier = inVMDisconnectorNotifier;
this.deserializer = deserializer;
}

public void start() {
consumeMessages = doConsumeMessages();
}

public void restart() {
Disposable previousConsumer = consumeMessages;
consumeMessages = doConsumeMessages();
Optional.ofNullable(previousConsumer).ifPresent(Disposable::dispose);
}

private Disposable doConsumeMessages() {
return Flux.using(receiverProvider::createReceiver,
receiver -> receiver.consumeManualAck(TMAIL_DISCONNECTOR_QUEUE_NAME),
Receiver::close)
.flatMap(this::consumeMessage)
.subscribe();
}

private Mono<Void> consumeMessage(AcknowledgableDelivery ackDelivery) {
return Mono.fromCallable(() -> deserializer.deserialize(ackDelivery.getBody()))
.flatMap(disconnectorRequest -> Mono.fromRunnable(() -> inVMDisconnectorNotifier.disconnect(disconnectorRequest)).then())
.doOnSuccess(result -> ackDelivery.ack())
.onErrorResume(error -> {
LOGGER.error("Error when consume message", error);
ackDelivery.nack(!REQUEUE_ON_NACK);
return Mono.empty();
});
}

@Override
public void close() {
Optional.ofNullable(consumeMessages).ifPresent(Disposable::dispose);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/********************************************************************
* As a subpart of Twake Mail, this file is edited by Linagora. *
* *
* https://twake-mail.com/ *
* https://linagora.com *
* *
* This file is subject to The Affero Gnu Public License *
* version 3. *
* *
* https://www.gnu.org/licenses/agpl-3.0.en.html *
* *
* This program is distributed in the hope that it will be *
* useful, but WITHOUT ANY WARRANTY; without even the implied *
* warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR *
* PURPOSE. See the GNU Affero General Public License for *
* more details. *
********************************************************************/

package com.linagora.tmail;

import java.time.Duration;

import jakarta.inject.Inject;
import jakarta.inject.Singleton;

import org.apache.commons.lang3.StringUtils;
import org.apache.james.DisconnectorNotifier;
import org.apache.james.lifecycle.api.Startable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import reactor.core.publisher.Mono;
import reactor.rabbitmq.OutboundMessage;
import reactor.rabbitmq.Sender;
import reactor.util.retry.Retry;

public class RabbitMQDisconnectorNotifier implements DisconnectorNotifier, Startable {
public static final String TMAIL_DISCONNECTOR_EXCHANGE_NAME = "tmail-disconnector";
public static final String ROUTING_KEY = StringUtils.EMPTY;

private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQDisconnectorNotifier.class);
private static final Retry RETRY_SPEC = Retry.backoff(2, Duration.ofMillis(100));

private final Sender sender;
private final DisconnectorRequestSerializer serializer;

@Inject
@Singleton
public RabbitMQDisconnectorNotifier(Sender sender,
DisconnectorRequestSerializer serializer) {
this.sender = sender;
this.serializer = serializer;
}

@Override
public void disconnect(Request request) {
try {
sender.send(Mono.just(new OutboundMessage(TMAIL_DISCONNECTOR_EXCHANGE_NAME,
ROUTING_KEY,
serializer.serialize(request))))
.retryWhen(RETRY_SPEC)
.block();
} catch (Exception exception) {
LOGGER.error("Error while sending disconnection request", exception);
}
}
}
Loading