diff --git a/bubble-core/src/main/java/cn/fxbin/bubble/core/util/ReflectUtils.java b/bubble-core/src/main/java/cn/fxbin/bubble/core/util/ReflectUtils.java new file mode 100644 index 00000000..078ce5b2 --- /dev/null +++ b/bubble-core/src/main/java/cn/fxbin/bubble/core/util/ReflectUtils.java @@ -0,0 +1,246 @@ +package cn.fxbin.bubble.core.util; + +import cn.hutool.core.util.ReflectUtil; +import org.springframework.beans.BeansException; +import org.springframework.cglib.core.CodeGenerationException; +import org.springframework.core.convert.Property; +import org.springframework.core.convert.TypeDescriptor; +import org.springframework.lang.Nullable; +import org.springframework.util.ReflectionUtils; + +import java.beans.PropertyDescriptor; +import java.lang.annotation.Annotation; +import java.lang.reflect.Field; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +/** + * ReflectUtils + * + * @author fxbin + * @version v1.0 + * @since 2023/9/11 14:29 + */ +public class ReflectUtils extends ReflectionUtils { + + /** + * 获取 Bean 的所有 get方法 + * + * @param type 类 + * @return PropertyDescriptor数组 + */ + public static PropertyDescriptor[] getBeanGetters(Class type) { + return getPropertyDescriptors(type, true, false); + } + + /** + * 获取 Bean 的所有 set方法 + * + * @param type 类 + * @return PropertyDescriptor数组 + */ + public static PropertyDescriptor[] getBeanSetters(Class type) { + return getPropertyDescriptors(type, false, true); + } + + /** + * 获取 Bean 的所有 PropertyDescriptor + * + * @param type 类 + * @param read 读取方法 + * @param write 写方法 + * @return PropertyDescriptor数组 + */ + public static PropertyDescriptor[] getPropertyDescriptors(Class type, boolean read, boolean write) { + try { + PropertyDescriptor[] all = BeanUtils.getPropertyDescriptors(type); + if (read && write) { + return all; + } else { + List properties = new ArrayList<>(all.length); + for (PropertyDescriptor pd : all) { + if (read && pd.getReadMethod() != null) { + properties.add(pd); + } else if (write && pd.getWriteMethod() != null) { + properties.add(pd); + } + } + return properties.toArray(new PropertyDescriptor[0]); + } + } catch (BeansException ex) { + throw new CodeGenerationException(ex); + } + } + + /** + * 获取 bean 的属性信息 + * + * @param propertyType 类型 + * @param propertyName 属性名 + * @return {Property} + */ + @Nullable + public static Property getProperty(Class propertyType, String propertyName) { + PropertyDescriptor propertyDescriptor = BeanUtils.getPropertyDescriptor(propertyType, propertyName); + if (propertyDescriptor == null) { + return null; + } + return ReflectUtils.getProperty(propertyType, propertyDescriptor, propertyName); + } + + /** + * 获取 bean 的属性信息 + * + * @param propertyType 类型 + * @param propertyDescriptor PropertyDescriptor + * @param propertyName 属性名 + * @return {Property} + */ + public static Property getProperty(Class propertyType, PropertyDescriptor propertyDescriptor, String propertyName) { + Method readMethod = propertyDescriptor.getReadMethod(); + Method writeMethod = propertyDescriptor.getWriteMethod(); + return new Property(propertyType, readMethod, writeMethod, propertyName); + } + + /** + * 获取 bean 的属性信息 + * + * @param propertyType 类型 + * @param propertyName 属性名 + * @return {Property} + */ + @Nullable + public static TypeDescriptor getTypeDescriptor(Class propertyType, String propertyName) { + Property property = ReflectUtils.getProperty(propertyType, propertyName); + if (property == null) { + return null; + } + return new TypeDescriptor(property); + } + + /** + * 获取 类属性信息 + * + * @param propertyType 类型 + * @param propertyDescriptor PropertyDescriptor + * @param propertyName 属性名 + * @return {Property} + */ + public static TypeDescriptor getTypeDescriptor(Class propertyType, PropertyDescriptor propertyDescriptor, String propertyName) { + Method readMethod = propertyDescriptor.getReadMethod(); + Method writeMethod = propertyDescriptor.getWriteMethod(); + Property property = new Property(propertyType, readMethod, writeMethod, propertyName); + return new TypeDescriptor(property); + } + + /** + * 获取 类属性 + * + * @param clazz 类信息 + * @param fieldName 属性名 + * @return Field + */ + @Nullable + public static Field getField(Class clazz, String fieldName) { + while (clazz != Object.class) { + try { + return clazz.getDeclaredField(fieldName); + } catch (NoSuchFieldException e) { + clazz = clazz.getSuperclass(); + } + } + return null; + } + + /** + * 获取 所有 field 属性上的注解 + * + * @param clazz 类 + * @param fieldName 属性名 + * @param annotationClass 注解 + * @param 注解泛型 + * @return 注解 + */ + @Nullable + public static T getAnnotation(Class clazz, String fieldName, Class annotationClass) { + Field field = ReflectUtil.getField(clazz, fieldName); + if (field == null) { + return null; + } + return field.getAnnotation(annotationClass); + } + + + /** + * 重写 setField 的方法,用于处理 setAccessible 的问题 + * + * @param field Field + * @param target Object + * @param value value + */ + public static void setField(Field field, @Nullable Object target, @Nullable Object value) { + makeAccessible(field); + ReflectionUtils.setField(field, target, value); + } + + /** + * 重写 setField 的方法,用于处理 setAccessible 的问题 + * + * @param field Field + * @param target Object + * @return value + */ + @Nullable + public static Object getField(Field field, @Nullable Object target) { + makeAccessible(field); + return ReflectionUtils.getField(field, target); + } + + /** + * 重写 setField 的方法,用于处理 setAccessible 的问题 + * + * @param fieldName Field name + * @param target Object + * @return value + */ + @Nullable + public static Object getField(String fieldName, @Nullable Object target) { + if (target == null) { + return null; + } + Class targetClass = target.getClass(); + Field field = getField(targetClass, fieldName); + if (field == null) { + throw new IllegalArgumentException(fieldName + " not in" + targetClass); + } + return getField(field, target); + } + + /** + * 重写 invokeMethod 的方法,用于处理 setAccessible 的问题 + * + * @param method Method + * @param target Object + * @return value + */ + @Nullable + public static Object invokeMethod(Method method, @Nullable Object target) { + return ReflectUtils.invokeMethod(method, target, new Object[0]); + } + + /** + * 重写 invokeMethod 的方法,用于处理 setAccessible 的问题 + * + * @param method Method + * @param target Object + * @param args args + * @return value + */ + @Nullable + public static Object invokeMethod(Method method, @Nullable Object target, @Nullable Object... args) { + makeAccessible(method); + return ReflectionUtils.invokeMethod(method, target, args); + } + +} diff --git a/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/RStreamOperations.java b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/RStreamOperations.java new file mode 100644 index 00000000..223f8e75 --- /dev/null +++ b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/RStreamOperations.java @@ -0,0 +1,281 @@ +package cn.fxbin.bubble.data.redis; + +import lombok.RequiredArgsConstructor; +import org.springframework.data.redis.connection.RedisStreamCommands; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.connection.stream.ObjectRecord; +import org.springframework.data.redis.connection.stream.Record; +import org.springframework.data.redis.connection.stream.RecordId; +import org.springframework.data.redis.core.RedisCallback; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StreamOperations; +import org.springframework.data.redis.core.convert.RedisCustomConversions; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.serializer.StringRedisSerializer; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.function.Function; + +/** + * RStreamOperations + * + * @author fxbin + * @version v1.0 + * @since 2023/9/11 14:02 + */ +@RequiredArgsConstructor +public class RStreamOperations { + + private final RedisTemplate redisTemplate; + + private final StreamOperations streamOperations; + private static final RedisCustomConversions CUSTOM_CONVERSIONS = new RedisCustomConversions(); + + public static final String OBJECT_PAYLOAD_KEY = "@payload"; + + public RStreamOperations(RedisTemplate redisTemplate) { + this.redisTemplate = redisTemplate; + this.streamOperations = redisTemplate.opsForStream(); + } + + /** + * 发布消息 + * + * @param name 名称 + * @param value 值 + * @return {@link RecordId} + */ + public RecordId send(String name, Object value) { + return this.send(ObjectRecord.create(name, value)); + } + + /** + * 发布消息 + * + * @param name 名称 + * @param key 消息key + * @param value 值 + * @return {@link RecordId} + */ + public RecordId send(String name, String key, Object value) { + return this.send(name, Collections.singletonMap(key, value)); + } + + /** + * 发送消息 + * + * @param name 名称 + * @param key 消息key + * @param data 数据 + * @param mapper 映射器 + * @return {@link RecordId} + */ + public RecordId send(String name, String key, T data, Function mapper) { + return this.send(name, key, mapper.apply(data)); + } + + /** + * 发送消息 + * + * @param name 名称 + * @param messages 消息 + * @return {@link RecordId} + */ + public RecordId send(String name, Map messages) { + return this.send(MapRecord.create(name, messages)); + } + + /** + * 发布消息 + * + * @param record 记录 + * @return {@link RecordId} + */ + public RecordId send(Record record) { + // 1. MapRecord + if (record instanceof MapRecord) { + return streamOperations.add(record); + } + String stream = Objects.requireNonNull(record.getStream(), "RStreamTemplate send stream name is null."); + Object recordValue = Objects.requireNonNull(record.getValue(), () -> "RStreamTemplate send stream: " + stream + " value is null."); + Class valueClass = recordValue.getClass(); + // 2. 普通类型的 ObjectRecord + if (CUSTOM_CONVERSIONS.isSimpleType(valueClass)) { + return streamOperations.add(record); + } + // 3. 自定义类型处理 + Map payload = new HashMap<>(); + + // 自定义 pojo 类型 key + payload.put(OBJECT_PAYLOAD_KEY, recordValue); + MapRecord mapRecord = MapRecord.create(stream, payload); + return streamOperations.add(mapRecord); + } + + /** + * 发布消息 + * + * @param name 名称 + * @param key 消息key + * @param data 数据 + * @return {@link RecordId} + */ + public RecordId send(String name, String key, byte[] data) { + return this.send(name, key, data, RedisStreamCommands.XAddOptions.none()); + } + + /** + * 邮寄 + * + * @param name 名称 + * @param key 消息key + * @param data 数据 + * @param maxLen 限制 stream 最大长度 + * @return {@link RecordId} + */ + public RecordId send(String name, String key, byte[] data, long maxLen) { + return this.send(name, key, data, RedisStreamCommands.XAddOptions.maxlen(maxLen)); + } + + /** + * 邮寄 + * + * @param name 名称 + * @param key 消息key + * @param data 数据 + * @param mapper mapper + * @param maxLen 限制 stream 最大长度 + * @return {@link RecordId} + */ + public RecordId send(String name, String key, T data, Function mapper, long maxLen) { + return send(name, key, mapper.apply(data), maxLen); + } + + /** + * 邮寄 + * + * @param name 名称 + * @param key 消息key + * @param data 数据 + * @param mapper mapper + * @param options {@link RedisStreamCommands.XAddOptions} + * @return {@link RecordId} + */ + public RecordId send(String name, String key, T data, Function mapper, RedisStreamCommands.XAddOptions options) { + return this.send(name, key, mapper.apply(data), options); + } + + /** + * 邮寄 + * + * @param name 名称 + * @param key 钥匙 + * @param data 数据 + * @param options {@link RedisStreamCommands.XAddOptions} + * @return {@link RecordId} + */ + public RecordId send(String name, String key, byte[] data, RedisStreamCommands.XAddOptions options) { + RedisSerializer stringSerializer = StringRedisSerializer.UTF_8; + byte[] nameBytes = Objects.requireNonNull(stringSerializer.serialize(name), "redis stream name is null."); + byte[] keyBytes = Objects.requireNonNull(stringSerializer.serialize(key), "redis stream key is null."); + Map mapDate = Collections.singletonMap(keyBytes, data); + return (RecordId) redisTemplate.execute((RedisCallback) redis -> { + RedisStreamCommands streamCommands = redis.streamCommands(); + return streamCommands.xAdd(MapRecord.create(nameBytes, mapDate), options); + }); + } + + /** + * 删除消息 + * + * @param name 名称 + * @param recordIds 记录ID + * @return {@link Long} + */ + public Long delete(String name, String... recordIds) { + return streamOperations.delete(name, recordIds); + } + + /** + * 删除消息 + * + * @param name 名称 + * @param recordIds 记录ID + * @return {@link Long} + */ + public Long delete(String name, RecordId... recordIds) { + return streamOperations.delete(name, recordIds); + } + + /** + * 删除消息 + * + * @param record 记录 + * @return {@link Long} + */ + public Long delete(Record record) { + return streamOperations.delete(record.getStream(), record.getId()); + } + + /** + * 对流进行修剪,限制长度 + * + * @param name 名称 + * @param count 计数 + * @return {@link Long} + */ + public Long trim(String name, long count) { + return trim(name, count, false); + } + + /** + * 对流进行修剪,限制长度 + * + * @param name 名称 + * @param count 计数 + * @param approximateTrimming 近似修整 + * @return {@link Long} + */ + public Long trim(String name, long count, boolean approximateTrimming) { + return streamOperations.trim(name, count, approximateTrimming); + } + + /** + * 手动 ack + * + * @param name 名称 + * @param group 组 + * @param recordIds 记录ID + * @return {@link Long} + */ + public Long acknowledge(String name, String group, String... recordIds) { + return streamOperations.acknowledge(name, group, recordIds); + } + + /** + * 手动 ack + * + * @param name 名称 + * @param group 组 + * @param recordIds 记录ID + * @return {@link Long} + */ + public Long acknowledge(String name, String group, RecordId... recordIds) { + return streamOperations.acknowledge(name, group, recordIds); + } + + /** + * 手动 ack + * + * @param group 组 + * @param record 记录 + * @return {@link Long} + */ + public Long acknowledge(String group, Record record) { + return streamOperations.acknowledge(group, record); + } + +} diff --git a/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/autoconfigure/BubbleRedisProperties.java b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/autoconfigure/BubbleRedisProperties.java new file mode 100644 index 00000000..8f8f1331 --- /dev/null +++ b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/autoconfigure/BubbleRedisProperties.java @@ -0,0 +1,60 @@ +package cn.fxbin.bubble.data.redis.autoconfigure; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.cloud.context.config.annotation.RefreshScope; + +import java.time.Duration; + +import static cn.fxbin.bubble.data.redis.autoconfigure.BubbleRedisProperties.PREFIX; + +/** + * DmRedisProperties + * + * @author fxbin + * @version v1.0 + * @since 2023/9/11 14:37 + */ +@Data +@RefreshScope +@ConfigurationProperties(prefix = PREFIX) +public class BubbleRedisProperties { + + public static final String PREFIX = "bubble.data.redis"; + + /** + * stream + */ + private Stream stream = new Stream(); + + + @Data + public static class Stream { + public static final String PREFIX = BubbleRedisProperties.PREFIX + ".stream"; + + /** + * 是否开启 stream + */ + boolean enabled = false; + + /** + * consumer group,默认:服务名 + 环境 + */ + String consumerGroup; + + /** + * 消费者名称,默认:ip + 端口 + */ + String consumerName; + + /** + * poll 批量大小 + */ + Integer pollBatchSize; + + /** + * poll 超时时间 + */ + Duration pollTimeout; + } +} diff --git a/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/autoconfigure/RedisStreamAutoConfiguration.java b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/autoconfigure/RedisStreamAutoConfiguration.java new file mode 100644 index 00000000..2fa75aa2 --- /dev/null +++ b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/autoconfigure/RedisStreamAutoConfiguration.java @@ -0,0 +1,118 @@ +package cn.fxbin.bubble.data.redis.autoconfigure; + +import cn.fxbin.bubble.core.util.StringUtils; +import cn.hutool.core.net.NetUtil; +import cn.hutool.core.text.CharPool; +import com.datamesh.boot.data.redis.RStreamOperations; +import com.datamesh.boot.data.redis.stream.RStreamListenerDetector; +import org.springframework.beans.factory.ObjectProvider; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.boot.autoconfigure.AutoConfigureAfter; +import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.autoconfigure.web.ServerProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.core.env.Environment; +import org.springframework.data.redis.connection.RedisConnectionFactory; +import org.springframework.data.redis.connection.stream.MapRecord; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.serializer.RedisSerializer; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.util.ErrorHandler; + +import java.time.Duration; + +/** + * RedisStreamAutoConfiguration + * + * @author fxbin + * @version v1.0 + * @since 2023/9/11 14:35 + */ +@Configuration( + proxyBeanMethods = false +) +@ConditionalOnProperty(prefix = BubbleRedisProperties.Stream.PREFIX, name = "enabled", havingValue = "true") +@EnableConfigurationProperties(BubbleRedisProperties.class) +@AutoConfigureAfter(RedisTemplateAutoConfiguration.class) +public class RedisStreamAutoConfiguration { + + /** + * The name of the SpringBoot Application Name + */ + private static final String APPLICATION_NAME = "spring.application.name"; + + /** + * the name of the springboot profiles active + */ + private static final String SPRING_ACTIVE_PROFILES = "spring.profiles.active"; + + @Bean + @ConditionalOnMissingBean + public StreamMessageListenerContainer.StreamMessageListenerContainerOptions> streamMessageListenerContainerOptions(BubbleRedisProperties properties, + ObjectProvider errorHandlerObjectProvider) { + StreamMessageListenerContainer.StreamMessageListenerContainerOptionsBuilder> builder = StreamMessageListenerContainer.StreamMessageListenerContainerOptions + .builder() + .keySerializer(RedisSerializer.string()) + .hashKeySerializer(RedisSerializer.string()) + .hashValueSerializer(RedisSerializer.byteArray()); + BubbleRedisProperties.Stream streamProperties = properties.getStream(); + // 批量大小 + Integer pollBatchSize = streamProperties.getPollBatchSize(); + if (pollBatchSize != null && pollBatchSize > 0) { + builder.batchSize(pollBatchSize); + } + // poll 超时时间 + Duration pollTimeout = streamProperties.getPollTimeout(); + if (pollTimeout != null && !pollTimeout.isNegative()) { + builder.pollTimeout(pollTimeout); + } + // errorHandler + errorHandlerObjectProvider.ifAvailable((builder::errorHandler)); + // TODO L.cm executor + return builder.build(); + } + + @Bean + @ConditionalOnMissingBean + public StreamMessageListenerContainer> streamMessageListenerContainer(RedisConnectionFactory redisConnectionFactory, + StreamMessageListenerContainer.StreamMessageListenerContainerOptions> streamMessageListenerContainerOptions) { + // 根据配置对象创建监听容器 + return StreamMessageListenerContainer.create(redisConnectionFactory, streamMessageListenerContainerOptions); + } + + @Bean + @ConditionalOnMissingBean + public RStreamListenerDetector streamListenerDetector(StreamMessageListenerContainer> streamMessageListenerContainer, + RedisTemplate redisTemplate, + ObjectProvider serverPropertiesObjectProvider, + BubbleRedisProperties properties, + Environment environment) { + BubbleRedisProperties.Stream streamProperties = properties.getStream(); + // 消费组名称 + String consumerGroup = streamProperties.getConsumerGroup(); + if (StringUtils.isBlank(consumerGroup)) { + String appName = environment.getRequiredProperty(APPLICATION_NAME); + String profile = environment.getProperty(SPRING_ACTIVE_PROFILES); + consumerGroup = StringUtils.isBlank(profile) ? appName : appName + CharPool.COLON + profile; + } + // 消费者名称 + String consumerName = streamProperties.getConsumerName(); + if (StringUtils.isBlank(consumerName)) { + final StringBuilder consumerNameBuilder = new StringBuilder(NetUtil.getLocalhostStr()); + serverPropertiesObjectProvider.ifAvailable(serverProperties -> { + consumerNameBuilder.append(CharPool.COLON).append(serverProperties.getPort()); + }); + consumerName = consumerNameBuilder.toString(); + } + return new RStreamListenerDetector(streamMessageListenerContainer, redisTemplate, consumerGroup, consumerName); + } + + @Bean + public RStreamOperations streamTemplate(@Qualifier("bfRedisTemplate") RedisTemplate dmRedisTemplate) { + return new RStreamOperations(dmRedisTemplate); + } + +} diff --git a/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/MessageType.java b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/MessageType.java new file mode 100644 index 00000000..9a3595cd --- /dev/null +++ b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/MessageType.java @@ -0,0 +1,24 @@ +package cn.fxbin.bubble.data.redis.stream; + +/** + * MessageType + * + * @author fxbin + * @version v1.0 + * @since 2023/9/11 11:40 + */ +public enum MessageType { + + + /** + * 广播 + */ + broadcast, + + /** + * 集群 + */ + cluster; + + +} diff --git a/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/RStreamListener.java b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/RStreamListener.java new file mode 100644 index 00000000..ee52b8cd --- /dev/null +++ b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/RStreamListener.java @@ -0,0 +1,66 @@ +package cn.fxbin.bubble.data.redis.stream; + +import java.lang.annotation.*; + +/** + * RStreamListener + * + * @author fxbin + * @version v1.0 + * @since 2023/9/11 11:41 + */ +@Documented +@Inherited +@Target(ElementType.METHOD) +@Retention(RetentionPolicy.RUNTIME) +public @interface RStreamListener { + + /** + * Queue name + * + * @return String + */ + String name(); + + /** + * consumer group,默认为服务名 + 环境 + * + * @return String + */ + String group() default ""; + + /** + * 消息方式,集群模式和广播模式,如果想让所有订阅者收到所有消息,广播是一个不错的选择。 + * + * @return MessageType + */ + MessageType messageType() default MessageType.cluster; + + /** + * readOffsetType,默认:LAST_CONSUMED + * + *

+ * 0-0 : 从开始的地方读。 + * $ :表示从尾部开始消费,只接受新消息,当前 Stream 消息会全部忽略。 + * > : 读取所有新到达的元素,这些元素的id大于消费组使用的最后一个元素。 + *

+ * + * @return ReadOffsetModel + */ + ReadOffsetType offsetType() default ReadOffsetType.lastConsumed; + + /** + * 自动 ack + * + * @return boolean + */ + boolean autoAcknowledge() default false; + + /** + * 读取原始的 bytes 数据 + * + * @return boolean + */ + boolean readRawBytes() default false; + +} diff --git a/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/RStreamListenerDetector.java b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/RStreamListenerDetector.java new file mode 100644 index 00000000..926cc1ba --- /dev/null +++ b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/RStreamListenerDetector.java @@ -0,0 +1,143 @@ +package cn.fxbin.bubble.data.redis.stream; + +import cn.fxbin.bubble.core.util.ClassUtils; +import cn.fxbin.bubble.core.util.ReflectUtils; +import cn.fxbin.bubble.core.util.StringUtils; +import cn.fxbin.bubble.data.redis.RStreamOperations; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.config.BeanPostProcessor; +import org.springframework.core.annotation.AnnotationUtils; +import org.springframework.data.redis.RedisSystemException; +import org.springframework.data.redis.connection.stream.*; +import org.springframework.data.redis.core.RedisTemplate; +import org.springframework.data.redis.core.StreamOperations; +import org.springframework.data.redis.stream.StreamMessageListenerContainer; +import org.springframework.util.Assert; +import org.springframework.util.ReflectionUtils; + +import java.lang.reflect.Method; +import java.util.Collections; +import java.util.Map; + +/** + * RStreamListenerDetector + * + * @author fxbin + * @version v1.0 + * @since 2023/9/11 11:46 + */ +@Slf4j +@RequiredArgsConstructor +public class RStreamListenerDetector implements BeanPostProcessor, InitializingBean { + + private final StreamMessageListenerContainer> streamMessageListenerContainer; + + private final RedisTemplate redisTemplate; + + private final String consumerGroup; + + private final String consumerName; + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + Class userClass = ClassUtils.getUserClass(bean); + ReflectionUtils.doWithMethods(userClass, method -> { + RStreamListener listener = AnnotationUtils.findAnnotation(method, RStreamListener.class); + if (listener != null) { + String streamKey = listener.name(); + Assert.hasText(streamKey, "@RStreamListener name must not be empty."); + log.info("Found @RStreamListener on bean:{} method:{}", beanName, method); + // 校验 method,method 入参数大于等于1 + int paramCount = method.getParameterCount(); + if (paramCount > 1) { + throw new IllegalArgumentException("@RStreamListener on method " + method + " parameter count must less or equal to 1."); + } + // streamOffset + ReadOffset readOffset = listener.offsetType().getReadOffset(); + StreamOffset streamOffset = StreamOffset.create(streamKey, readOffset); + // 消费模式 + MessageType messageType = listener.messageType(); + if (MessageType.broadcast == messageType) { + broadCast(streamOffset, bean, method, listener.readRawBytes()); + } else { + String groupId = StringUtils.isNotBlank(listener.group()) ? listener.group() : consumerGroup; + Consumer consumer = Consumer.from(groupId, consumerName); + // 如果需要,创建 group + createGroupIfNeed(redisTemplate, streamKey, readOffset, groupId); + cluster(consumer, streamOffset, listener, bean, method); + } + } + }, ReflectionUtils.USER_DECLARED_METHODS); + return bean; + } + + private void broadCast(StreamOffset streamOffset, Object bean, Method method, boolean isReadRawBytes) { + streamMessageListenerContainer.receive(streamOffset, (message) -> { + // MapBackedRecord + invokeMethod(bean, method, message, isReadRawBytes); + }); + } + + private void cluster(Consumer consumer, StreamOffset streamOffset, RStreamListener listener, Object bean, Method method) { + boolean autoAcknowledge = listener.autoAcknowledge(); + StreamMessageListenerContainer.ConsumerStreamReadRequest readRequest = StreamMessageListenerContainer.StreamReadRequest.builder(streamOffset).consumer(consumer).autoAcknowledge(autoAcknowledge).build(); + StreamOperations opsForStream = redisTemplate.opsForStream(); + streamMessageListenerContainer.register(readRequest, (message) -> { + // MapBackedRecord + invokeMethod(bean, method, message, listener.readRawBytes()); + // ack + if (!autoAcknowledge) { + opsForStream.acknowledge(consumer.getGroup(), message); + } + }); + } + + private static void createGroupIfNeed(RedisTemplate redisTemplate, String streamKey, ReadOffset readOffset, String group) { + StreamOperations opsForStream = redisTemplate.opsForStream(); + try { + StreamInfo.XInfoGroups groups = opsForStream.groups(streamKey); + if (groups.stream().noneMatch((x) -> group.equals(x.groupName()))) { + opsForStream.createGroup(streamKey, readOffset, group); + } + } catch (RedisSystemException e) { + // RedisCommandExecutionException: ERR no such key + opsForStream.createGroup(streamKey, group); + } + } + + private void invokeMethod(Object bean, Method method, MapRecord mapRecord, boolean isReadRawBytes) { + // 支持没有参数的方法 + if (method.getParameterCount() == 0) { + ReflectUtils.invokeMethod(method, bean); + return; + } + if (isReadRawBytes) { + ReflectUtils.invokeMethod(method, bean, mapRecord); + } else { + ReflectUtils.invokeMethod(method, bean, getRecordValue(mapRecord)); + } + } + + private Object getRecordValue(MapRecord mapRecord) { + Map messageValue = mapRecord.getValue(); + if (messageValue.containsKey(RStreamOperations.OBJECT_PAYLOAD_KEY)) { + byte[] payloads = messageValue.get(RStreamOperations.OBJECT_PAYLOAD_KEY); + Object deserialize = redisTemplate.getValueSerializer().deserialize(payloads); + return ObjectRecord.create(mapRecord.getStream(), deserialize).withId(mapRecord.getId()); + } else { + return mapRecord.mapEntries(entry -> { + String key = entry.getKey(); + Object value = redisTemplate.getValueSerializer().deserialize(entry.getValue()); + return Collections.singletonMap(key, value).entrySet().iterator().next(); + }); + } + } + + @Override + public void afterPropertiesSet() throws Exception { + streamMessageListenerContainer.start(); + } +} diff --git a/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/ReadOffsetType.java b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/ReadOffsetType.java new file mode 100644 index 00000000..36941c24 --- /dev/null +++ b/bubble-spring-boot-starters/bubble-spring-boot-starter-data-redis/src/main/java/cn/fxbin/bubble/data/redis/stream/ReadOffsetType.java @@ -0,0 +1,38 @@ +package cn.fxbin.bubble.data.redis.stream; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import org.springframework.data.redis.connection.stream.ReadOffset; + +/** + * ReadOffsetType + * + * @author fxbin + * @version v1.0 + * @since 2023/9/11 11:42 + */ +@Getter +@AllArgsConstructor +public enum ReadOffsetType { + + /** + * 从开始的地方读 + */ + start(ReadOffset.from("0-0")), + + /** + * 从最近的偏移量读取。 + */ + latest(ReadOffset.latest()), + + /** + * 读取所有新到达的元素,这些元素的id大于最后一个消费组的id。 + */ + lastConsumed(ReadOffset.lastConsumed()); + + /** + * readOffset + */ + private final ReadOffset readOffset; + +}