RocketMQ源码解读——同一消费组下不同消费者订阅关系不同时
@(rocketmq源码解读)
先解释一下题目,我们假设有一个Producer
和两个Consumer
,Producer
向TOPICA
和TOPICB
发送消息,两个Consumer
分别订阅两个topic
。我们看下这时候会出现的问题,以及根据源码分析一下为什么出现问题。
现象
现象其实还是比较隐蔽的,broker
上会打印:the consumer's subscription not exist,group ...
的日志(Consumer
端也会打印类似的日志)。
还会有一些subscription changed, group: ...
类似的日志,并且如果仔细的话还会发现,其中一个消费者消费消息时,另外一个就不会消费。
源码分析
我们看一下为什么会导致这样的问题,一开始生看或者debug都是很难下手,这时候可能就需要使用必杀技(一般不外传那种)——问。
问天问地,谷歌百度必应。我直接问了一个大神——芋艿。大神说这种情况会出问题,具体原因他也记不清了,导致这种现象的问题应该是消费关系不停地相互覆盖。
好了,听到这句话我们就有入口了,至少知道应该从Broker
上找起。
顺藤摸瓜找到了原因,下面一起看一下源码。
首先我们知道,消费者的两种实现(推和拉)中都维护一个MQClientInstance
,这个类非常重要,在启动消费者的时候,都会去启动这个类,我们看下启动的代码,其中有这么一部分:
// Start various schedule tasksthis.startScheduledTask();复制代码
这里启动了好多定时任务,我们追进去看一下:
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { MQClientInstance.this.cleanOfflineBroker(); //定时发送心跳 MQClientInstance.this.sendHeartbeatToAllBrokerWithLock(); } catch (Exception e) { log.error("ScheduledTask sendHeartbeatToAllBroker exception", e); } }}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);复制代码
这里我们看到,消费者会定时发送心跳给Broker
,我们继续追进去,最后找到sendHeartbeatToAllBroker
方法:
//给所有的broker发送心跳if (!this.brokerAddrTable.isEmpty()) { long times = this.sendHeartbeatTimesTotal.getAndIncrement(); Iterator>> it = this.brokerAddrTable.entrySet().iterator(); while (it.hasNext()) { Entry > entry = it.next(); String brokerName = entry.getKey(); HashMap oneTable = entry.getValue(); if (oneTable != null) { for (Map.Entry entry1 : oneTable.entrySet()) { Long id = entry1.getKey(); String addr = entry1.getValue(); if (addr != null) { if (consumerEmpty) { if (id != MixAll.MASTER_ID) continue; } try { //真正发送心跳的部分 int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000); if (!this.brokerVersionTable.containsKey(brokerName)) { this.brokerVersionTable.put(brokerName, new HashMap (4)); } this.brokerVersionTable.get(brokerName).put(addr, version); if (times % 20 == 0) { log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr); log.info(heartbeatData.toString()); } } catch (Exception e) { if (this.isBrokerInNameServer(addr)) { log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr); } else { log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName, id, addr); } } } } } }}复制代码
这里会向所有的Broker
发送心跳,我们根据我们的例子,这时候Broker
是一台,我们再去Broker
上看一下Broker
如何处理心跳消息,我们根据发送的是HEART_BEAT
类型的消息,可以在Broker
上看到,这类消息使用ClientManageProcessor
处理,我们看下处理心跳的部分(heartBeat
方法):
//循环所有发送过来的数据for (ConsumerData data : heartbeatData.getConsumerDataSet()) { //根据消费组的名字获取broker上记录的消费消息 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName()); boolean isNotifyConsumerIdsChangedEnable = true; if (null != subscriptionGroupConfig) { isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable(); int topicSysFlag = 0; if (data.isUnitMode()) { topicSysFlag = TopicSysFlag.buildSysFlag(false, true); } String newTopic = MixAll.getRetryTopic(data.getGroupName()); this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod( newTopic, subscriptionGroupConfig.getRetryQueueNums(), PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag); } //注册消费者 boolean changed = this.brokerController.getConsumerManager().registerConsumer( data.getGroupName(), clientChannelInfo, data.getConsumeType(), data.getMessageModel(), data.getConsumeFromWhere(), data.getSubscriptionDataSet(), isNotifyConsumerIdsChangedEnable ); if (changed) { log.info("registerConsumer info changed {} {}", data.toString(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()) ); }}复制代码
我们可以看到,broker
会根据consumer
放过来的消息,获取自己这边记录的消费者订阅的信息,注意,获取时是按照消费组获取的,我们看下registerConsumer
:
//根据消费组获取消费者信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (null == consumerGroupInfo) { ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere); //注意这里,这里consumerTable的键就是group ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp); consumerGroupInfo = prev != null ? prev : tmp;}boolean r1 = consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel, consumeFromWhere);boolean r2 = consumerGroupInfo.updateSubscription(subList);if (r1 || r2) { if (isNotifyConsumerIdsChangedEnable) { this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel()); }}this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);return r1 || r2;复制代码
我们注意ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
这里,这句话告诉我们consumerTable
中存放的消费者信息是按照消费组来的,那么一个组的消费信息如果不一样,按照我们的例子中,则订阅了TOPICA
的消费者心跳信息告诉Broker
:我们组订阅的是TOPICA
!然后Broker
就记录下来了。过了一会订阅了TOPICB
的消费者心跳信息高速Broker
:我们订阅的是TOPICB
!
这里就导致了订阅消息相互覆盖,那么拉取消息时,肯定有一个消费者没法拉到消息,因为Broker
上查询不到订阅信息。
至此我们就知道了导致上述现象的原因。