RocketMQ 怎么实现消息分发的?
RocketMQ的消息分发机制是其核心设计之一,我将从架构设计、队列模型、负载均衡、消费模式和高级特性五个层面,为您深入解析。
一、 RocketMQ消息分发的核心架构
1.1 核心组件角色
生产者(Producer) → 主题(Topic) → 队列(MessageQueue) → 消费者组(ConsumerGroup) ← 消费者(Consumer)
↑
Broker集群
关键组件解析:
- Topic(主题):消息的逻辑分类,生产者发送消息到Topic,消费者订阅Topic
- MessageQueue(消息队列):Topic的物理分区,消息存储和分发的基本单位
- ConsumerGroup(消费者组):一组协同工作的消费者,共同消费一个Topic
- Broker:消息存储和转发服务器,每个Broker包含多个Queue
1.2 数据流架构
// 生产者发送消息到Topic的某个Queue
Message msg = new Message("OrderTopic", "订单创建", orderJson.getBytes());
SendResult result = producer.send(msg);
// 消费者订阅Topic,从分配的Queue拉取消息
consumer.subscribe("OrderTopic", "*"); // *表示消费所有Tag
二、 队列模型:消息分发的基石
2.1 Topic与Queue的关系
// 创建Topic时可以指定Queue数量
TopicConfig topicConfig = new TopicConfig();
topicConfig.setTopicName("OrderTopic");
topicConfig.setWriteQueueNums(8); // 8个写队列
topicConfig.setReadQueueNums(8); // 8个读队列
// 实际存储结构
OrderTopic/
├── Queue0 (Broker-a)
├── Queue1 (Broker-a)
├── Queue2 (Broker-b) // Queue分布在多个Broker
└── Queue3 (Broker-b)
核心特性:
- 队列是并行处理的最小单位:一个Queue只能被一个消费者线程处理
- 队列数量决定最大并发度:8个Queue → 最多8个消费者线程并发消费
- 队列分布实现负载均衡:Queue分布在多个Broker,避免单点瓶颈
2.2 队列选择策略
// 生产者发送时选择Queue的策略
public class OrderProducer {
// 1. 轮询策略(默认):平均分布到所有Queue
SendResult sendRoundRobin(Message msg) {
return producer.send(msg); // 自动轮询选择Queue
}
// 2. 按Key哈希:相同业务Key的消息进入同一Queue
SendResult sendByOrderId(Message msg, String orderId) {
msg.setKeys(orderId); // 设置业务Key
return producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs,
Message msg, Object arg) {
int index = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(index);
}
}, orderId);
}
// 3. 手动指定Queue(特殊场景)
SendResult sendToSpecificQueue(Message msg, int queueId) {
return producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs,
Message msg, Object arg) {
return mqs.get(queueId);
}
}, null);
}
}
三、 消费者负载均衡机制
3.1 Rebalance(重平衡)机制
触发条件:
- 消费者启动、停止
- 消费者数量变化
- Topic的Queue数量变化
重平衡过程:
// 伪代码:消费者端的Rebalance逻辑
public void doRebalance() {
// 1. 获取Topic的所有Queue
List<MessageQueue> allQueues = mQClientFactory.getTopicAllQueues("OrderTopic");
// 2. 获取消费者组的所有消费者实例
List<String> allConsumers = getConsumerList("OrderGroup");
// 3. 根据策略分配Queue
Map<String, List<MessageQueue>> allocation =
strategy.allocate("OrderGroup", currentConsumerId, allQueues, allConsumers);
// 4. 更新本地消费的Queue列表
updateProcessQueueTable(allocation.get(currentConsumerId));
}
3.2 负载均衡策略
RocketMQ提供了多种分配策略:
// 1. 平均分配(默认策略)
AllocateMessageQueueAveragely strategy = new AllocateMessageQueueAveragely();
// 示例:8个Queue,3个消费者
// Consumer1: Queue0, Queue1, Queue2
// Consumer2: Queue3, Queue4, Queue5
// Consumer3: Queue6, Queue7
// 2. 平均轮询分配
AllocateMessageQueueAveragelyByCircle strategy =
new AllocateMessageQueueAveragelyByCircle();
// Consumer1: Queue0, Queue3, Queue6
// Consumer2: Queue1, Queue4, Queue7
// Consumer3: Queue2, Queue5
// 3. 根据机房/机器名分配(避免跨机房消费)
AllocateMessageQueueByMachineRoom strategy =
new AllocateMessageQueueByMachineRoom();
// 只分配同机房的Queue
// 4. 一致性哈希分配(减少Rebalance影响)
AllocateMessageQueueConsistentHash strategy =
new AllocateMessageQueueConsistentHash();
// 配置策略
consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragely());
3.3 消费者组与消费模式
// 推模式消费者(常用)
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("OrderGroup");
pushConsumer.subscribe("OrderTopic", "*");
pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
// 处理消息
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 拉模式消费者(需手动控制)
DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("OrderGroup");
pullConsumer.start();
// 手动拉取指定Queue的消息
PullResult pullResult = pullConsumer.pull(
mq, // 指定MessageQueue
"*", // 订阅表达式
nextOffset, // 消费偏移量
32 // 最大拉取数量
);
四、 消息过滤机制
4.1 Tag过滤(一级过滤)
// 生产者:发送时设置Tag
Message msg = new Message("OrderTopic", "PAY_SUCCESS", orderJson.getBytes());
// 消费者:订阅时指定Tag
// 1. 单Tag订阅
consumer.subscribe("OrderTopic", "PAY_SUCCESS");
// 2. 多Tag订阅(使用||分隔)
consumer.subscribe("OrderTopic", "PAY_SUCCESS || PAY_FAILED");
// 3. 订阅所有Tag
consumer.subscribe("OrderTopic", "*");
4.2 SQL92过滤(二级过滤,Broker端)
// 生产者:设置消息属性
Message msg = new Message("OrderTopic", "TAG1", orderJson.getBytes());
msg.putUserProperty("amount", "1000");
msg.putUserProperty("region", "beijing");
// 消费者:使用SQL表达式过滤
consumer.subscribe("OrderTopic",
MessageSelector.bySql("amount > 500 AND region = 'beijing'"));
过滤原理:
1. 生产者发送消息,携带Tag和Properties
2. Broker存储消息时,同时存储过滤信息
3. 消费者拉取时,Broker根据订阅的过滤条件筛选消息
4. 只有符合条件的消息才会发送给消费者
4.3 过滤服务器(复杂过滤场景)
对于复杂的过滤逻辑,可以部署独立的Filter Server:
// Filter Server实现过滤接口
public class CustomFilter implements MessageFilter {
@Override
public boolean match(MessageExt msg, FilterContext context) {
// 自定义过滤逻辑
String body = new String(msg.getBody());
return body.contains("VIP"); // 只消费VIP用户消息
}
}
五、 顺序消息分发
5.1 全局顺序消息(性能受限)
// 创建只有一个Queue的Topic
TopicConfig config = new TopicConfig();
config.setTopicName("GlobalOrderTopic");
config.setWriteQueueNums(1); // 关键:只有1个Queue
config.setReadQueueNums(1);
// 消费者也必须顺序消费
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 保证同一个Queue的消息顺序处理
return ConsumeOrderlyStatus.SUCCESS;
}
});
5.2 分区顺序消息(推荐方案)
// 生产者:相同订单号的消息发送到同一个Queue
Message msg = new Message("OrderTopic", "订单操作", orderJson.getBytes());
SendResult result = producer.send(msg, new MessageQueueSelector() {
@Override
public MessageQueue select(List<MessageQueue> mqs,
Message msg, Object arg) {
String orderId = (String) arg;
int queueId = Math.abs(orderId.hashCode()) % mqs.size();
return mqs.get(queueId);
}
}, orderId); // 相同orderId的消息进入同一个Queue
// 消费者:顺序消费模式
consumer.registerMessageListener(new MessageListenerOrderly() {
@Override
public ConsumeOrderlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeOrderlyContext context) {
// 同一个Queue的消息会顺序处理
// 不同Queue的消息可以并行处理
return ConsumeOrderlyStatus.SUCCESS;
}
});
六、 广播模式与集群模式
6.1 集群模式(默认)
// 消费者设置为集群模式
consumer.setMessageModel(MessageModel.CLUSTERING);
特点:
- 同组消费者分摊消费消息(负载均衡)
- 每条消息只被消费一次
- 适合大部分业务场景
6.2 广播模式
// 消费者设置为广播模式
consumer.setMessageModel(MessageModel.BROADCASTING);
特点:
- 同组消费者各自消费全量消息
- 每条消息被每个消费者消费一次
- 适合配置同步、缓存刷新等场景
对比:
| 特性 | 集群模式 | 广播模式 |
|---|---|---|
| 消费方式 | 分摊消费 | 全量消费 |
| 消费次数 | 每条消息被消费一次 | 每个消费者都消费一次 |
| 偏移量存储 | Broker端存储,共享 | 消费者本地存储 |
| 适用场景 | 普通业务消息 | 配置同步、缓存刷新 |
七、 消息轨迹与监控
7.1 消息轨迹追踪
// 启用消息轨迹
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroup", true);
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup", true);
// 轨迹数据存储到Topic
producer.setTraceTopic("RMQ_SYS_TRACE_TOPIC");
7.2 消费进度监控
// 查看消费者组的消费进度
ClusterInfo clusterInfo = mQAdminExt.examineBrokerClusterInfo();
ConsumerConnection consumerConnection = mQAdminExt.examineConsumerConnectionInfo(
"OrderGroup");
// 获取每个Queue的消费偏移量
for (Connection connection : consumerConnection.getConnectionSet()) {
String clientId = connection.getClientId();
ConsumerRunningInfo runningInfo = mQAdminExt.getConsumerRunningInfo(
"OrderGroup", clientId, false);
// 消费偏移量信息
Map<MessageQueue, Long> offsetTable = runningInfo.getOffsetTable();
for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
MessageQueue mq = entry.getKey();
long consumerOffset = entry.getValue();
long brokerOffset = mQAdminExt.maxOffset(mq);
long lag = brokerOffset - consumerOffset; // 堆积量
}
}
八、 生产环境最佳实践
8.1 配置建议
// 消费者配置最佳实践
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("OrderGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("OrderTopic", "*");
// 1. 消费线程数配置(根据Queue数量调整)
consumer.setConsumeThreadMin(20);
consumer.setConsumeThreadMax(64);
// 2. 每次拉取消息数量(平衡网络开销和处理速度)
consumer.setPullBatchSize(32);
// 3. 消息重试策略
consumer.setMaxReconsumeTimes(3); // 最大重试次数
// 4. 消费超时时间
consumer.setConsumeTimeout(15); // 分钟
// 5. 开启消费端限流(防止突发流量)
consumer.setPullThresholdForQueue(1000); // 每个Queue最大缓存消息数
8.2 灾备与高可用
// 多集群消费(异地容灾)
consumer.setNamesrvAddr("主集群:9876;备集群:9876");
// 消费进度同步(主备切换时使用)
// 从主集群导出消费进度
Map<MessageQueue, Long> offsetTable = getOffsetFromPrimary();
// 在备集群设置消费进度
for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
consumer.updateConsumeOffset(entry.getKey(), entry.getValue());
}
8.3 动态扩缩容
// 增加Queue数量(需要提前规划)
TopicConfig config = new TopicConfig();
config.setTopicName("OrderTopic");
config.setWriteQueueNums(16); // 从8增加到16
config.setReadQueueNums(16);
// 注意:增加Queue数量会导致Rebalance
// 建议在业务低峰期操作
总结
RocketMQ的消息分发机制体现了分布式系统的经典设计哲学:
- 队列是核心:通过Queue实现消息的物理分区,既是存储单位,也是并行单位
- 消费者组是协调单元:组内消费者协同工作,实现负载均衡
- Rebalance是动态平衡器:自动适应消费者数量和队列变化
- 多种模式适应不同场景:顺序/并发、集群/广播、推/拉模式
作为架构师,设计消息分发时要考虑的关键点:
- 队列数量规划:根据业务吞吐量和消费者数量合理设置Queue数量
- 消费幂等性:消息可能被重复消费(网络重试、Rebalance等场景)
- 消费堆积监控:实时监控lag,避免消息积压
- 过滤策略选择:根据业务需求选择Tag过滤或SQL过滤
- 顺序性保证:需要顺序消费时,确保相同业务Key的消息进入同一Queue
最终选择依据:
- 高吞吐场景:增加Queue数量,使用并发消费
- 顺序性要求:使用分区顺序消息
- 全量同步需求:使用广播模式
- 复杂过滤:使用SQL过滤或Filter Server
RocketMQ的分发机制在性能、可靠性和灵活性之间取得了很好的平衡,使其成为企业级消息中间件的优选。