Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ENHANCEMENT] Make the Apisix plugin reactive #1412

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@
import io.lettuce.core.ReadFrom;
import io.lettuce.core.RedisClient;
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.sync.RedisStringCommands;
import io.lettuce.core.api.reactive.RedisReactiveCommands;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
import io.lettuce.core.cluster.RedisClusterClient;
import io.lettuce.core.cluster.api.reactive.RedisAdvancedClusterReactiveCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
Expand All @@ -37,7 +39,7 @@ public class AppConfiguration {
@Value("${redis.password}")
private String redisPassword;

@Value("${redis.timeout:5000}")
@Value("${redis.timeout:500}")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for this

private Integer redisTimeout;

@Value("${redis.ignoreErrors:true}")
Expand All @@ -56,7 +58,7 @@ public IRevokedTokenRepository revokedTokenRepository() {
return new IRevokedTokenRepository.MemoryRevokedTokenRepository();
}

private RedisStringCommands<String, String> initRedisCommand() {
private RedisStringReactiveCommands<String, String> initRedisCommand() {
Duration timeoutDuration = Duration.ofMillis(redisTimeout);

logger.info("The plugin using redis {} for storage revoked tokens.\n" +
Expand All @@ -76,14 +78,14 @@ private RedisStringCommands<String, String> initRedisCommand() {
}
}

public static RedisStringCommands<String, String> initRedisCommandSentinel(String redisUrl,
public static RedisReactiveCommands<String, String> initRedisCommandSentinel(String redisUrl,
Duration redisTimeout) {
RedisURI redisURI = RedisURI.create(redisUrl);
redisURI.setTimeout(redisTimeout);
return RedisClient.create(redisURI).connect().sync();
return RedisClient.create(redisURI).connect().reactive();
}

public static RedisStringCommands<String, String> initRedisCommandStandalone(String redisUrl,
public static RedisReactiveCommands<String, String> initRedisCommandStandalone(String redisUrl,
String redisPassword,
Duration redisTimeout) {
String[] redisUrlParts = redisUrl.split(":");
Expand All @@ -94,26 +96,26 @@ public static RedisStringCommands<String, String> initRedisCommandStandalone(Str
.withTimeout(redisTimeout)
.build();

return RedisClient.create(redisURI).connect().sync();
return RedisClient.create(redisURI).connect().reactive();
}

public static RedisStringCommands<String, String> initRedisCommandCluster(String redisUrl,
String redisPassword,
Duration redisTimeout) {
public static RedisAdvancedClusterReactiveCommands<String, String> initRedisCommandCluster(String redisUrl,
String redisPassword,
Duration redisTimeout) {
List<RedisURI> redisURIList = buildRedisUriList(redisUrl, redisPassword, redisTimeout);
return RedisClusterClient.create(redisURIList).connect().sync();
return RedisClusterClient.create(redisURIList).connect().reactive();
}

public static RedisStringCommands<String, String> initRedisCommandMasterReplica(String redisUrl,
String redisPassword,
Duration redisTimeout) {
public static RedisReactiveCommands<String, String> initRedisCommandMasterReplica(String redisUrl,
String redisPassword,
Duration redisTimeout) {
List<RedisURI> redisURIList = buildRedisUriList(redisUrl, redisPassword, redisTimeout);

RedisClient redisClient = RedisClient.create();
StatefulRedisMasterReplicaConnection<String, String> connection = MasterReplica
.connect(redisClient, StringCodec.UTF8, redisURIList);
connection.setReadFrom(ReadFrom.MASTER_PREFERRED);
return connection.sync();
return connection.reactive();
}

public static List<RedisURI> buildRedisUriList(String redisUrl, String redisPassword, Duration redisTimeout) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public ResponseEntity<?> addRevokedToken(@RequestParam MultiValueMap<String, Str
.flatMap(ChannelLogoutController::extractSidFromLogoutToken)
.ifPresentOrElse(sid -> {
logger.debug("Add new revoked token has sid: " + sid);
tokenRepository.add(sid);
tokenRepository.add(sid).block();
}, () -> logger.warn("`{}` is missing or invalid in request", TOKEN_PARAM));

return ResponseEntity.ok().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

import reactor.core.publisher.Mono;

public interface IRevokedTokenRepository {

void add(String sid);
Mono<Void> add(String sid);

boolean exist(String sid);
Mono<Boolean> exist(String sid);

class MemoryRevokedTokenRepository implements IRevokedTokenRepository {

Expand All @@ -22,13 +24,13 @@ public MemoryRevokedTokenRepository() {
}

@Override
public void add(String sid) {
cache.put(sid, true);
public Mono<Void> add(String sid) {
return Mono.fromRunnable(() -> cache.put(sid, true));
}

@Override
public boolean exist(String sid) {
return cache.getIfPresent(sid) != null;
public Mono<Boolean> exist(String sid) {
return Mono.fromCallable(() -> cache.getIfPresent(sid) != null);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,48 +5,50 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.lettuce.core.RedisException;
import io.lettuce.core.SetArgs;
import io.lettuce.core.api.sync.RedisStringCommands;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
import reactor.core.publisher.Mono;

public class RedisRevokedTokenRepository implements IRevokedTokenRepository {

public static final Duration CACHE_DURATION = Duration.ofDays(7);
public static final Boolean IGNORE_REDIS_ERRORS = true;
private final Logger logger = LoggerFactory.getLogger("RevokedTokenPlugin");

private final RedisStringCommands<String, String> redisCommand;
private final RedisStringReactiveCommands<String, String> redisCommand;

private final boolean ignoreRedisErrors;

public RedisRevokedTokenRepository(RedisStringCommands<String, String> redisCommand,
public RedisRevokedTokenRepository(RedisStringReactiveCommands<String, String> redisCommand,
boolean ignoreRedisErrors) {
this.redisCommand = redisCommand;
this.ignoreRedisErrors = ignoreRedisErrors;
}

@Override
public void add(String sid) {
try {
redisCommand.set(sid, Boolean.TRUE.toString(), SetArgs.Builder.ex(CACHE_DURATION));
} catch (RedisException e) {
if (!ignoreRedisErrors) {
throw e;
}
logger.warn("Error while add revoked token in Redis: {}", e.getMessage());
}
public Mono<Void> add(String sid) {
return redisCommand.set(sid, Boolean.TRUE.toString(), SetArgs.Builder.ex(CACHE_DURATION))
.then()
.onErrorResume(e -> {
if (!ignoreRedisErrors) {
return Mono.error(e);
}
logger.warn("Error while add revoked token in Redis: {}", e.getMessage());
return Mono.empty();
});
}

@Override
public boolean exist(String sid) {
try {
return Boolean.TRUE.toString().equalsIgnoreCase(redisCommand.get(sid));
} catch (RedisException e) {
if (!ignoreRedisErrors) {
throw e;
}
logger.warn("Error while checking token in Redis: {}", e.getMessage());
return false;
}
public Mono<Boolean> exist(String sid) {
return redisCommand.get(sid)
.map(Boolean.TRUE.toString()::equalsIgnoreCase)
.switchIfEmpty(Mono.just(false))
.onErrorResume(e -> {
if (!ignoreRedisErrors) {
return Mono.error(e);
}
logger.warn("Error while checking token in Redis: {}", e.getMessage());
return Mono.just(false);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import reactor.core.publisher.Mono;

@Component
public class TokenRevokedFilter implements PluginFilter {
private final Logger logger = LoggerFactory.getLogger("RevokedTokenPlugin");
Expand All @@ -29,25 +31,32 @@ public String name() {
@Override
public void filter(HttpRequest request, HttpResponse response, PluginFilterChain chain) {
logger.debug("Received a new request");
sidFilter(request, response);
chain.filter(request, response);
sidFilter(request, response)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible that chain.filter(request, response) will be called AFTER (bad case) or BEFORE (perfect case) the "PluginFilterChain" had completed

+1 🍺 if it works

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's callback based, it is fine!
That's Netty under the hood it feels like home.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's callback based, it is fine! That's Netty under the hood it feels like home.

Could you refer to any document that prove this "callback"? I agree with Tung. Code we put in reactor could be executed after response is sent to client.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are tiring me, so deeply, some days.

Copy link
Member Author

@chibenwa chibenwa Dec 17, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because implementing a callback interface:

-> it's clear that though asynchronous the callbacks are still executed in order
-> And sending the response to the client would trivially be implemented in the very last filter.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apache/apisix-java-plugin-runner#313

Please qualify the new plugin with the new plugin runner ;-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah with the updated plugin runner, the reactive code works well.

I have tested case token revoked --> pass
I have conducted performance test like this #1247 (comment) --> The result is good. The latency is always around 5s (I set timeout to 5s).

Regarding updating the plugin runner, I wonder if there is anyone reviewing and approving the PR. In case the repository is no longer maintained, I suggest we fork, create our own repository for the plugin runner (as well as CI/CD)

Copy link
Member Author

@chibenwa chibenwa Dec 20, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Regarding updating the plugin runner, I wonder if there is anyone reviewing and approving the PR.

It is Christmass time.

Be patient.

We can package, build and deploy the PR for the time being.

Can we do a JMAP perf test on top of this change?
Can we publish the updated Apisix docker image with both changes to the runner and to the plugin?

I have conducted performance test like this #1247 (comment) --> The result is good. The latency is always around 5s (I set timeout to 5s).

(I told you: let's patch the causes and not the consequences)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apisix docker image: hungphan227/apisix:3.9.1-debian-javaplugin

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is complicated to update staging env to run jmap perf test with apisix. Therefore, I will update james-gatling and run it locally with tmail-backend memory version.

.doFinally(any -> chain.filter(request, response))
.subscribe();
}

private void sidFilter(HttpRequest request, HttpResponse response) {
Optional.ofNullable(request.getHeader("Authorization"))
private Mono<Void> sidFilter(HttpRequest request, HttpResponse response) {
return Optional.ofNullable(request.getHeader("Authorization"))
.or(() -> Optional.ofNullable(request.getHeader("authorization")))
.map(String::trim)
.map(bearerToken -> bearerToken.startsWith("Bearer ") ? bearerToken.substring(7) : bearerToken)
.flatMap(ChannelLogoutController::extractSidFromLogoutToken)
.ifPresent(sid -> {
boolean existSid = tokenRepository.exist(sid);
.map(sid -> validateSid(request, response, sid))
.orElse(Mono.empty());
}

private Mono<Void> validateSid(HttpRequest request, HttpResponse response, String sid) {
return tokenRepository.exist(sid)
.doOnNext(existSid -> {
if (existSid) {
logger.info("Token has been revoked, Sid: " + sid);
makeUnAuthorizedRequest(request, response);
} else {
logger.debug("Token valid, Sid: " + sid);
}
});
})
.then();
}

public static void makeUnAuthorizedRequest(HttpRequest request, HttpResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import org.testcontainers.utility.DockerImageName;

import io.lettuce.core.RedisException;
import io.lettuce.core.api.sync.RedisStringCommands;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;

class RedisMasterReplicaRevokedTokenRepositoryTest implements RevokedTokenRepositoryContract {
private static final String REDIS_PASSWORD = "my_password";
Expand Down Expand Up @@ -75,7 +75,7 @@ static void afterAll() {

@BeforeEach
void setup() {
RedisStringCommands<String, String> stringStringRedisStringCommands = AppConfiguration.initRedisCommandMasterReplica(
RedisStringReactiveCommands<String, String> stringStringRedisStringCommands = AppConfiguration.initRedisCommandMasterReplica(
String.format("localhost:%d,localhost:%d",
REDIS_MASTER.getMappedPort(6379), REDIS_REPLICA.getMappedPort(6379)),
REDIS_PASSWORD, Duration.ofSeconds(3));
Expand Down Expand Up @@ -106,16 +106,16 @@ void existShouldNotThrowWhenIgnoreWasConfiguredAndRedisError() throws Interrupte

@Test
void existsShouldReturnCorrectWhenIgnoreWasConfigured() throws InterruptedException {
testee().add("sid1");
assertThat(testee().exist("sid1")).isTrue();
testee().add("sid1").block();
assertThat(testee().exist("sid1").block()).isTrue();

ContainerHelper.pause(REDIS_MASTER);
TimeUnit.SECONDS.sleep(1);
assertThat(testee().exist("sid1")).isFalse();
assertThat(testee().exist("sid1").block()).isFalse();

ContainerHelper.unPause(REDIS_MASTER);
TimeUnit.SECONDS.sleep(1);
assertThat(testee().exist("sid1")).isTrue();
assertThat(testee().exist("sid1").block()).isTrue();
}

@Test
Expand All @@ -128,6 +128,6 @@ void existsShouldThrowWhenIgnoreWasNotConfiguredAndRedisError() throws Interrupt

ContainerHelper.pause(REDIS_MASTER);
TimeUnit.SECONDS.sleep(1);
assertThatThrownBy(() -> testee.exist("sid1")).isInstanceOf(RedisException.class);
assertThatThrownBy(() -> testee.exist("sid1").block()).isInstanceOf(RedisException.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import org.testcontainers.utility.DockerImageName;

import io.lettuce.core.RedisException;
import io.lettuce.core.api.sync.RedisStringCommands;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;

class RedisRevokedTokenRepositoryTest implements RevokedTokenRepositoryContract {
static final String REDIS_PASSWORD = "redisSecret1";
Expand All @@ -40,7 +40,7 @@ static void afterAll() {

@BeforeEach
void beforeEach() {
RedisStringCommands<String, String> redisStringCommands = AppConfiguration.initRedisCommandStandalone(
RedisStringReactiveCommands<String, String> redisStringCommands = AppConfiguration.initRedisCommandStandalone(
String.format("%s:%d", REDIS_CONTAINER.getHost(), REDIS_CONTAINER.getMappedPort(6379)),
REDIS_PASSWORD, Duration.ofSeconds(3));
testee = new RedisRevokedTokenRepository(redisStringCommands, IGNORE_REDIS_ERRORS);
Expand All @@ -65,22 +65,22 @@ void existShouldNotThrowWhenIgnoreWasConfiguredAndRedisError() throws Interrupte
ContainerHelper.pause(REDIS_CONTAINER);
TimeUnit.SECONDS.sleep(1);

assertThatCode(() -> testee().exist("sid1")).doesNotThrowAnyException();
assertThatCode(() -> testee().exist("sid1").block()).doesNotThrowAnyException();
}


@Test
void existsShouldReturnCorrectWhenIgnoreWasConfigured() throws InterruptedException {
testee().add("sid1");
assertThat(testee().exist("sid1")).isTrue();
testee().add("sid1").block();
assertThat(testee().exist("sid1").block()).isTrue();

ContainerHelper.pause(REDIS_CONTAINER);
TimeUnit.SECONDS.sleep(1);
assertThat(testee().exist("sid1")).isFalse();
assertThat(testee().exist("sid1").block()).isFalse();

ContainerHelper.unPause(REDIS_CONTAINER);
TimeUnit.SECONDS.sleep(1);
assertThat(testee().exist("sid1")).isTrue();
assertThat(testee().exist("sid1").block()).isTrue();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@

import io.lettuce.core.RedisClient;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.RedisStringReactiveCommands;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.api.sync.RedisStringCommands;

@Disabled("This test is disabled because it requires a Redis Sentinel cluster to be running, for manual testing only, " +
"can run Redis Sentinel cluster by using docker-compose sample at https://github.com/apache/james-project/blob/149595da247dfb915ecb60d239edf627616916ae/server/mailet/rate-limiter-redis/docker-compose-sample/docker-compose-with-redis-sentinel.yml")
Expand All @@ -21,7 +21,7 @@ public class RedisSentinelRevokedTokenRepositoryTest implements RevokedTokenRepo

RedisRevokedTokenRepository revokedTokenRepository;

RedisStringCommands<String, String> stringStringRedisStringCommands;
RedisStringReactiveCommands<String, String> stringStringRedisStringCommands;

@BeforeEach
void setup() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,28 +12,28 @@ interface RevokedTokenRepositoryContract {

@Test
default void existShouldReturnFalseWhenDoesNotExist() {
assertThat(testee().exist("sid1")).isFalse();
assertThat(testee().exist("sid1").block()).isFalse();
}

@Test
default void existShouldReturnTrueWhenExist() {
String sid = "sid1";
testee().add(sid);
assertThat(testee().exist(sid)).isTrue();
testee().add(sid).block();
assertThat(testee().exist(sid).block()).isTrue();
}

@Test
default void addShouldBeIdempotent() {
String sid = "sid1";
testee().add(sid);
assertThatCode(() -> testee().add(sid)).doesNotThrowAnyException();
assertThat(testee().exist(sid)).isTrue();
testee().add(sid).block();
assertThatCode(() -> testee().add(sid).block()).doesNotThrowAnyException();
assertThat(testee().exist(sid).block()).isTrue();
}

@Test
default void existShouldCheckingAssignKey() {
testee().add("sid2");
assertThat(testee().exist("sid1")).isFalse();
testee().add("sid2").block();
assertThat(testee().exist("sid1").block()).isFalse();
}

class MemoryRevokedTokenRepositoryTest implements RevokedTokenRepositoryContract {
Expand Down