进阶指南
本章节展示 spring-cloud-statrer-stream-rocketmq 的实现和相关配置。
Spring Cloud Stream RocketMQ Binder 的实现
架构实现
spring-cloud-statrer-stream-rocketmq 去除了对 RocketMQ-Spring 框架的依赖 。 Spring Cloud Stream Binder 核心类 RocketMQMessageChannelBinder 实现了 Spring Cloud Stream 规范,内部会构建 RocketMQInboundChannelAdapter 和 RocketMQProducerMessageHandler。
RocketMQProducerMessageHandler 会基于 Binding 配置通过 RocketMQProduceFactory 构造 RocketMQ Producer,其内部会把 spring-messaging 模块内 org.springframework.messaging.Message 消息类转换成 RocketMQ 的消息类 org.apache.rocketmq.common.message.Message,然后发送出去。
RocketMQInboundChannelAdapter 也会基于 Binding 配置通过 RocketMQConsumerFactory 构造 DefaultMQPushConsumer,其内部会启动 RocketMQ Consumer 接收消息。
NOTE 与 RocketMQ-Spring 框架的兼容需要手动处理。
目前 Binder 支持在 Header 中设置相关的 key 来进行 RocketMQ Message 消息的特性设置。
比如 TAGS、KEYS、TRANSACTIONAL_ARGS 等 RocketMQ 消息对应的标签,详情见 com.alibaba.cloud.stream.binder.rocketmq.constant.RocketMQConst
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, "binder")
.setHeader(RocketMQHeaders.KEYS, "my-key");
Message message = builder.build();
output().send(message);
或者使用 StreamBridge:
MessageBuilder builder = MessageBuilder.withPayload(msg)
.setHeader(RocketMQHeaders.TAGS, "binder")
.setHeader(RocketMQHeaders.KEYS, "my-key");
Message message = builder.build();
streamBridge.send("producer-out-0", message);
NOTE 更多使用请参考样例:com.alibaba.cloud.examples.SenderService。
更多配置项参考
绑定器配置
关于以 spring-cloud-starter-stream-rocketmq-binder 为前缀的配置项如下所示:
配置项 | key | 默认值 | 说明 |
---|---|---|---|
RocketMQ NameServer 地址 | spring.cloud.stream.rocketmq.binder.name-server | 127.0.0.1:9876 | 老版本使用 namesrv-addr 配置项 |
身份验证公钥 | spring.cloud.stream.rocketmq.binder.access-key | null | 阿里云账号 AccessKey |
身份验证私钥 | spring.cloud.stream.rocketmq.binder.secret-key | null | 阿里云账号 SecretKey |
消息轨迹功能 | spring.cloud.stream.rocketmq.binder.enable-msg-trace | true | 是否为 Producer 和 Consumer 开启消息轨迹功能 |
topic 名称 | spring.cloud.stream.rocketmq.binder.customized-trace-topic | RMQ_SYS_TRACE_TOPIC | 消息轨迹开启后存储的 topic 名称 |
消息消费者配置
关于以 spring-cloud-starter-stream-rocketmq-binder-consumer 为前缀的配置项如下所示:
配置项 | key | 默认值 | 说明 |
---|---|---|---|
是否启用 Consumer | spring.cloud.starter.stream.rocketmq.binder. consumer.enable | true | |
Consumer 基于 TAGS 订阅 | spring.cloud.starter.stream.rocketmq.binder. consumer.subscription | empty | 多个 tag 以 || 分割。更多见 subscription。 |
Consumer 消费模式 | spring.cloud.starter.stream.rocketmq.binder. consumer.messageModel | CLUSTERING | 如果想让每一个的订阅者都能接收到消息,可以使用广播模式。更多见 MessageModel |
Consumer 从哪里开始消费 | spring.cloud.starter.stream.rocketmq.binder. consumer.consumeFromWhere | 更多见 ConsumeFromWhere。 |
NOTE 更多见 RocketMQConsumerProperties。
关于以 spring-cloud-starter-stream-rocketmq-binder-consumer-push 为前缀的配置项如下所示:
配置项 | key | 默认值 | 说明 |
---|---|---|---|
是否同步消费消息模式 | spring.cloud.starter.stream.rocketmq.binder. consumer.push.pushorderly | false | |
消费失败重试策略 | spring.cloud.starter.stream.rocketmq.binder. consumer.push.delayLevelWhenNextConsume | 0 | 同步消费消息模式下。-1,不重复直接放入死信队列。0,broker 控制重试策略。0,client 控制重试策略。 |
消费失败后再次消费的时间间隔 | spring.cloud.starter.stream.rocketmq.binder. consumer.push.suspendCurrentQueueTimeMillis | 1000 | 同步消费消息模式下。 |
NOTE 其他更多参数见 RocketMQConsumerProperties.Push
关于以 spring-cloud-starter-stream-rocketmq-binder-consumer-pull 为前缀的配置项如下所示:
配置项 | key | 默认值 | 说明 |
---|---|---|---|
消费时拉取的线程数 | spring.cloud.starter.stream.rocketmq.binder.consumer.pull.pullThreadNums | 20 | |
拉取时的超时毫秒数 | spring.cloud.starter.stream.rocketmq.binder.consumer.push.pollTimeoutMillis | 1000 * 5 |
NOTE 其他更多参数见 RocketMQConsumerProperties.Pull。
消息生产者配置
关于以 spring-cloud-starter-stream-rocketmq-binder-producer 为前缀的配置项如下所示:
配置项 | key | 默认值 | 说明 |
---|---|---|---|
是否启用 Producer | spring.cloud.starter.stream.rocketmq. binder.producer.enable | true | |
生产者集群名称 | spring.cloud.starter.stream.rocketmq. binder.producer.group | empty | |
消息发送的最大字节数 | spring.cloud.starter.stream.rocketmq. binder.producer.maxMessageSize | 8249344 | |
消息生产者类型 | spring.cloud.starter.stream.rocketmq. binder.producer.producerType | Normal | 普通或者事务。更多见 RocketMQProducerProperties.ProducerType |
事务消息监听器的 beanName | spring.cloud.starter.stream.rocketmq. binder.producer.transactionListener | 在 producerType=Trans 时才有效;必须是实现 TransactionListener 接口的 Spring Bean | |
消息发送类型 | spring.cloud.starter.stream.rocketmq. binder.producer.sendType | Sync | 同步、异步、单向。更多见RocketMQProducerProperties.SendType |
消息发送后回调函数的 beanName | spring.cloud.starter.stream.rocketmq. binder.producer.sendCallBack | 在 sendType=Async 时才有效;必须是实现 SendCallback 接口的 Spring Bean | |
是否在 Vip Channel 上发送消息 | spring.cloud.starter.stream.rocketmq. binder.producer.vipChannelEnabled | true | |
发送消息的超时时间 | spring.cloud.starter.stream.rocketmq. binder.producer.sendMessageTimeout | 3000 | 单位为毫秒 |
消息体压缩阀值 | spring.cloud.starter.stream.rocketmq. binder.producer.compressMessageBodyThreshold | ||
在同步发送消息的模式下,消息发送失败的重试次数 | spring.cloud.starter.stream.rocketmq. binder.producer.retryTimesWhenSendFailed | 2 | |
在异步发送消息的模式下,消息发送失败的重试次数 | spring.cloud.starter.stream.rocketmq. binder.producer.retryTimesWhenSendAsyncFailed | 2 | |
消息发送失败的情况下是否重试其它的 broker | spring.cloud.starter.stream.rocketmq. binder.producer.retryAnotherBroker | false |
NOTE 生产者其他更多参数请见:RocketMQProducerProperties。