Skip to content

Commit

Permalink
[ISSUE #5127] fix create topic error in Standalone mode (#5128)
Browse files Browse the repository at this point in the history
* [ISSUE #5127] fix

* [ISSUE #5127] fix

* [ISSUE #5127] fix

* [ISSUE #5127] fix

* [ISSUE #5127] fix checkstyle test

---------

Co-authored-by: JiangShuJu <[email protected]>
  • Loading branch information
jevinjiang and JiangShuJu authored Dec 5, 2024
1 parent dd9698f commit 831fd72
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.lmax.disruptor.dsl.ProducerType;

import lombok.Getter;
import lombok.Setter;


public class Channel implements LifeCycle {
Expand All @@ -39,11 +40,16 @@ public class Channel implements LifeCycle {
@Getter
private DisruptorProvider provider;
private final Integer size;
private final EventHandler<MessageEntity> eventHandler;
@Setter
private EventHandler<MessageEntity> eventHandler;
private volatile boolean started = false;
private final TopicMetadata topic;
private static final String THREAD_NAME_PREFIX = "standalone_disruptor_provider_";

public Channel(TopicMetadata topic) {
this(DEFAULT_SIZE, topic, null);
}

public Channel(TopicMetadata topic, EventHandler<MessageEntity> eventHandler) {
this(DEFAULT_SIZE, topic, eventHandler);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,25 +60,20 @@ public static StandaloneBroker getInstance() {
public MessageEntity putMessage(String topicName, CloudEvent message) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (!messageContainer.containsKey(topicMetadata)) {
createTopic(topicName);
throw new RuntimeException(String.format("The topic:%s is not created", topicName));
}
Channel channel = messageContainer.get(topicMetadata);
if (channel.isClosed()) {
throw new RuntimeException(String.format("The topic:%s is not subscribed", topicName));
}
MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message);
channel.getProvider().onData(messageEntity);
return messageEntity;
}

public Channel createTopic(String topicName) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
return messageContainer.computeIfAbsent(topicMetadata, k -> {
Subscribe subscribe = subscribeContainer.get(topicMetadata);
if (subscribe == null) {
throw new IllegalStateException("the topic not exist subscribe ");
}
Channel channel = new Channel(topicMetadata, subscribe);
channel.start();
return channel;
});
return messageContainer.computeIfAbsent(topicMetadata, k -> new Channel(topicMetadata));
}

/**
Expand Down Expand Up @@ -139,10 +134,17 @@ public void deleteTopicIfExist(String topicName) {

public void subscribed(String topicName, Subscribe subscribe) {
TopicMetadata topicMetadata = new TopicMetadata(topicName);
if (getMessageContainer().containsKey(topicMetadata)) {
log.warn("the topic already subscribed");
if (subscribeContainer.containsKey(topicMetadata)) {
log.warn("the topic:{} already subscribed", topicName);
return;
}
Channel channel = getMessageContainer().get(topicMetadata);
if (channel == null) {
log.warn("the topic:{} is not created", topicName);
return;
}
channel.setEventHandler(subscribe);
channel.start();
subscribeContainer.put(topicMetadata, subscribe);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,11 +93,13 @@ public static MessageEntity createMessageEntity(TopicMetadata topicMetadata, Clo
}

public static Subscribe createSubscribe(StandaloneBroker standaloneBroker) {
standaloneBroker.createTopic(TEST_TOPIC);
return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> {
});
}

public static Subscribe createSubscribe(StandaloneBroker standaloneBroker, List<CloudEvent> cloudEvents) {
standaloneBroker.createTopic(TEST_TOPIC);
return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> {
cloudEvents.add(cloudEvent);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,13 +69,4 @@ public void testCheckTopicExist() throws InterruptedException {
Assertions.assertTrue(exists);
}

@Test
public void testDeleteTopicIfExist() throws InterruptedException {
StandaloneBroker instance = getStandaloneBroker();
CloudEvent cloudEvent = createDefaultCloudEvent();
instance.putMessage(TEST_TOPIC, cloudEvent);
instance.deleteTopicIfExist(TEST_TOPIC);
boolean exists = instance.checkTopicExist(TEST_TOPIC);
Assertions.assertFalse(exists);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@
package org.apache.eventmesh.storage.standalone.producer;

import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC;
import static org.apache.eventmesh.storage.standalone.TestUtils.createSubscribe;

import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.storage.standalone.TestUtils;
import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker;
import org.apache.eventmesh.storage.standalone.broker.task.Subscribe;

import java.util.Properties;

Expand Down Expand Up @@ -70,6 +72,8 @@ public void testPublish() {
StandaloneBroker standaloneBroker = StandaloneBroker.getInstance();
standaloneBroker.createTopicIfAbsent(TEST_TOPIC);
CloudEvent cloudEvent = TestUtils.createDefaultCloudEvent();
Subscribe subscribe = createSubscribe(standaloneBroker);
subscribe.subscribe();
SendResult sendResult = standaloneProducer.publish(cloudEvent);
Assertions.assertNotNull(sendResult);
}
Expand Down

0 comments on commit 831fd72

Please sign in to comment.