diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java index 2ea7310b83..8de0ca1c54 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/Channel.java @@ -31,6 +31,7 @@ import com.lmax.disruptor.dsl.ProducerType; import lombok.Getter; +import lombok.Setter; public class Channel implements LifeCycle { @@ -39,11 +40,16 @@ public class Channel implements LifeCycle { @Getter private DisruptorProvider provider; private final Integer size; - private final EventHandler eventHandler; + @Setter + private EventHandler eventHandler; private volatile boolean started = false; private final TopicMetadata topic; private static final String THREAD_NAME_PREFIX = "standalone_disruptor_provider_"; + public Channel(TopicMetadata topic) { + this(DEFAULT_SIZE, topic, null); + } + public Channel(TopicMetadata topic, EventHandler eventHandler) { this(DEFAULT_SIZE, topic, eventHandler); } diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java index 8654b2d1c3..0cda576332 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/main/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBroker.java @@ -60,9 +60,12 @@ public static StandaloneBroker getInstance() { public MessageEntity putMessage(String topicName, CloudEvent message) { TopicMetadata topicMetadata = new TopicMetadata(topicName); if (!messageContainer.containsKey(topicMetadata)) { - createTopic(topicName); + throw new RuntimeException(String.format("The topic:%s is not created", topicName)); } Channel channel = messageContainer.get(topicMetadata); + if (channel.isClosed()) { + throw new RuntimeException(String.format("The topic:%s is not subscribed", topicName)); + } MessageEntity messageEntity = new MessageEntity(new TopicMetadata(topicName), message); channel.getProvider().onData(messageEntity); return messageEntity; @@ -70,15 +73,7 @@ public MessageEntity putMessage(String topicName, CloudEvent message) { public Channel createTopic(String topicName) { TopicMetadata topicMetadata = new TopicMetadata(topicName); - return messageContainer.computeIfAbsent(topicMetadata, k -> { - Subscribe subscribe = subscribeContainer.get(topicMetadata); - if (subscribe == null) { - throw new IllegalStateException("the topic not exist subscribe "); - } - Channel channel = new Channel(topicMetadata, subscribe); - channel.start(); - return channel; - }); + return messageContainer.computeIfAbsent(topicMetadata, k -> new Channel(topicMetadata)); } /** @@ -139,10 +134,17 @@ public void deleteTopicIfExist(String topicName) { public void subscribed(String topicName, Subscribe subscribe) { TopicMetadata topicMetadata = new TopicMetadata(topicName); - if (getMessageContainer().containsKey(topicMetadata)) { - log.warn("the topic already subscribed"); + if (subscribeContainer.containsKey(topicMetadata)) { + log.warn("the topic:{} already subscribed", topicName); + return; + } + Channel channel = getMessageContainer().get(topicMetadata); + if (channel == null) { + log.warn("the topic:{} is not created", topicName); return; } + channel.setEventHandler(subscribe); + channel.start(); subscribeContainer.put(topicMetadata, subscribe); } diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java index 0c16aabb35..5571cda950 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/TestUtils.java @@ -93,11 +93,13 @@ public static MessageEntity createMessageEntity(TopicMetadata topicMetadata, Clo } public static Subscribe createSubscribe(StandaloneBroker standaloneBroker) { + standaloneBroker.createTopic(TEST_TOPIC); return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> { }); } public static Subscribe createSubscribe(StandaloneBroker standaloneBroker, List cloudEvents) { + standaloneBroker.createTopic(TEST_TOPIC); return new Subscribe(TEST_TOPIC, standaloneBroker, (cloudEvent, context) -> { cloudEvents.add(cloudEvent); }); diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java index 6d84cb7800..d57ba6523b 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/broker/StandaloneBrokerTest.java @@ -69,13 +69,4 @@ public void testCheckTopicExist() throws InterruptedException { Assertions.assertTrue(exists); } - @Test - public void testDeleteTopicIfExist() throws InterruptedException { - StandaloneBroker instance = getStandaloneBroker(); - CloudEvent cloudEvent = createDefaultCloudEvent(); - instance.putMessage(TEST_TOPIC, cloudEvent); - instance.deleteTopicIfExist(TEST_TOPIC); - boolean exists = instance.checkTopicExist(TEST_TOPIC); - Assertions.assertFalse(exists); - } } diff --git a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java index 4bfee4976f..20db666831 100644 --- a/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java +++ b/eventmesh-storage-plugin/eventmesh-storage-standalone/src/test/java/org/apache/eventmesh/storage/standalone/producer/StandaloneProducerTest.java @@ -18,10 +18,12 @@ package org.apache.eventmesh.storage.standalone.producer; import static org.apache.eventmesh.storage.standalone.TestUtils.TEST_TOPIC; +import static org.apache.eventmesh.storage.standalone.TestUtils.createSubscribe; import org.apache.eventmesh.api.SendResult; import org.apache.eventmesh.storage.standalone.TestUtils; import org.apache.eventmesh.storage.standalone.broker.StandaloneBroker; +import org.apache.eventmesh.storage.standalone.broker.task.Subscribe; import java.util.Properties; @@ -70,6 +72,8 @@ public void testPublish() { StandaloneBroker standaloneBroker = StandaloneBroker.getInstance(); standaloneBroker.createTopicIfAbsent(TEST_TOPIC); CloudEvent cloudEvent = TestUtils.createDefaultCloudEvent(); + Subscribe subscribe = createSubscribe(standaloneBroker); + subscribe.subscribe(); SendResult sendResult = standaloneProducer.publish(cloudEvent); Assertions.assertNotNull(sendResult); }