Zacard's Notes

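RocketMQ source code reading: sending messages

Background

Based on RocketMQ 4.2.0.

Ordered messages and transactional messages are set aside for now and will be covered separately later.

Entry point for sending messages

DefaultMQProducer#send(Message)

This sends synchronously by default and ultimately calls DefaultMQProducerImpl#sendDefaultImpl. Straight to the code: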

```java
private SendResult sendDefaultImpl(
    Message msg,
    final CommunicationMode communicationMode,
    final SendCallback sendCallback,
    final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Make sure the producer has been started
    this.makeSureStateOK();
    // Validate the message, e.g. the topic must not be empty
    Validators.checkMessage(msg, this.defaultMQProducer);
    // As far as I can tell, only used to tag this call in log output
    final long invokeID = random.nextLong();
    long beginTimestampFirst = System.currentTimeMillis();
    long beginTimestampPrev = beginTimestampFirst;
    long endTimestamp = beginTimestampFirst;
    // Get the topic's publish route info: try the local cache first, then the namesrv
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        Exception exception = null;
        SendResult sendResult = null;
        // SYNC: 1 + retryTimesWhenSendFailed attempts, i.e. 3 attempts (2 retries) by default
        int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
        int times = 0;
        String[] brokersSent = new String[timesTotal];
        for (; times < timesTotal; times++) {
            String lastBrokerName = null == mq ? null : mq.getBrokerName();
            /*
             * Select a message queue.
             * Rules: 1) By default (send latency fault tolerance disabled), pick round-robin a queue
             *           whose broker differs from the one used in the previous attempt
             *        2) With latency fault tolerance enabled:
             *           2.1) First, pick an available queue round-robin from the previous attempt's broker
             *           2.2) Otherwise, rank brokers by availability (available > latency > start timestamp)
             *                and pick round-robin from the better half
             *           2.3) As a last resort, ignore everything and pick round-robin
             */
            MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
            if (mqSelected != null) {
                mq = mqSelected;
                brokersSent[times] = mq.getBrokerName();
                try {
                    beginTimestampPrev = System.currentTimeMillis();
                    // Core message delivery method
                    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout);
                    endTimestamp = System.currentTimeMillis();
                    // Update the latency fault-tolerance info
                    this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                    switch (communicationMode) {
                        case ASYNC:
                            return null;
                        case ONEWAY:
                            return null;
                        case SYNC:
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }
                            return sendResult;
                        default:
                            break;
                    }
                } catch (RemotingException e) {
                    // ... some code omitted
                }
            } else {
                break;
            }
        }
        // ... some code omitted
    }
    // ... some code omitted
}
```
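Stripped of RocketMQ types, the SYNC retry skeleton above (with fault tolerance off, i.e. rule 1) can be modelled roughly as follows. This is a simplified sketch, not RocketMQ code: brokers are plain strings, the hypothetical `trySend` predicate stands in for sendKernelImpl and returns false on any failure, and the selector is a plain round-robin that skips the previous attempt's broker. With retryTimesWhenSendFailed at its default of 2, this gives 3 attempts.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class RetryLoopSketch {
    /**
     * Simplified model of the SYNC retry loop: up to 1 + retryTimes attempts,
     * each preferring a broker different from the previous attempt's.
     * Returns the brokers tried, in order (cf. brokersSent).
     */
    static List<String> send(List<String> brokers, int retryTimes, Predicate<String> trySend) {
        int timesTotal = 1 + retryTimes;   // e.g. retryTimes = 2 gives 3 attempts
        List<String> brokersSent = new ArrayList<>();
        String lastBroker = null;          // broker of the previous (failed) attempt
        int index = 0;                     // round-robin cursor
        for (int times = 0; times < timesTotal; times++) {
            String chosen = null;
            for (int i = 0; i < brokers.size(); i++) {
                String candidate = brokers.get(Math.floorMod(index++, brokers.size()));
                if (!candidate.equals(lastBroker)) {
                    chosen = candidate;
                    break;
                }
            }
            if (chosen == null) {
                // Only one broker exists: use it anyway
                chosen = brokers.get(Math.floorMod(index++, brokers.size()));
            }
            brokersSent.add(chosen);
            if (trySend.test(chosen)) {
                break;                     // success: stop retrying
            }
            lastBroker = chosen;
        }
        return brokersSent;
    }

    public static void main(String[] args) {
        List<String> brokers = Arrays.asList("broker-a", "broker-b");
        // broker-a always fails, broker-b succeeds
        System.out.println(send(brokers, 2, b -> b.equals("broker-b")));  // [broker-a, broker-b]
        // every broker fails: 3 attempts, alternating brokers
        System.out.println(send(brokers, 2, b -> false));                  // [broker-a, broker-b, broker-a]
    }
}
```

In this model, as in the original loop, a retry can only move to another broker when one exists; with a single broker every attempt hits the same one.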

Thoughts

  1. final long invokeID = random.nextLong();

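    This invokeID is only used to tag the call in log output, yet it can cost performance. The shared random is thread-safe, but java.util.Random updates its seed through a CAS retry loop on every call, so under high concurrency many threads contend for the same seed and keep retrying failed CAS operations.

    On JDK 7 and above I would suggest ThreadLocalRandom instead. (RocketMQ probably avoided it to stay compatible with JDK 6, but I still think that, even without a ready-made class in JDK 6, a similar one could have been written in-house. There is no finish line when chasing performance.)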
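A minimal sketch of both alternatives: ThreadLocalRandom on JDK 7+, and for JDK 6 a hand-rolled per-thread Random held in a ThreadLocal. The class and method names are hypothetical; only ThreadLocalRandom, ThreadLocal and Random are JDK APIs.

```java
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class InvokeIdSketch {
    // JDK 7+: each thread has its own generator, so there is no shared seed to CAS on
    static long nextInvokeId() {
        return ThreadLocalRandom.current().nextLong();
    }

    // JDK 6-compatible alternative: lazily create one Random per thread.
    // (ThreadLocal.withInitial is Java 8+, so an anonymous subclass is used instead.)
    private static final ThreadLocal<Random> LOCAL_RANDOM = new ThreadLocal<Random>() {
        @Override
        protected Random initialValue() {
            return new Random();
        }
    };

    static long nextInvokeIdJdk6() {
        return LOCAL_RANDOM.get().nextLong();
    }

    public static void main(String[] args) {
        System.out.println(nextInvokeId());
        System.out.println(nextInvokeIdJdk6());
    }
}
```

Since the ID is only a log tag, the per-thread sequences need no coordination, which is what makes dropping the shared seed safe here.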

How a message queue is selected

This is ultimately handled by MQFaultStrategy#selectOneMessageQueue. The source:

```java
public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    if (this.sendLatencyFaultEnable) {
        try {
            // First, pick an available queue round-robin from the previous attempt's broker
            int index = tpInfo.getSendWhichQueue().getAndIncrement();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
                    if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName))
                        return mq;
                }
            }
            // Otherwise, rank brokers by availability (available > latency > start timestamp)
            // and pick round-robin from the better half
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().getAndIncrement() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        // As a last resort, ignore everything and pick round-robin
        return tpInfo.selectOneMessageQueue();
    }
    // Default (latency fault tolerance disabled): pick round-robin a queue
    // whose broker differs from the previous attempt's
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}
```

Summary of the selection rules

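  1. By default (send latency fault tolerance disabled), pick round-robin a queue whose broker differs from the broker used in the previous attempt
  2. With latency fault tolerance enabled:
    1. First, pick an available queue round-robin from the broker used in the previous attempt (or from any broker on the first attempt, when lastBrokerName is null)
    2. Otherwise, rank brokers by availability (available > latency > start timestamp) and pick one round-robin from the better half
    3. As a last resort, ignore everything and pick round-robin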
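The code above only calls pickOneAtLeast, so the ranking behind rule 2.2 is not visible there. It can be sketched as follows. FaultItem's fields and every name here are assumptions for illustration; in particular "start timestamp" is modelled as availableFrom, the time from which a broker counts as available again. The real RocketMQ method may differ in its details.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

public class PickOneAtLeastSketch {
    // Hypothetical per-broker fault record
    static final class FaultItem {
        final String broker;
        final long latencyMillis;   // most recent send latency
        final long availableFrom;   // the broker counts as available from this timestamp on

        FaultItem(String broker, long latencyMillis, long availableFrom) {
            this.broker = broker;
            this.latencyMillis = latencyMillis;
            this.availableFrom = availableFrom;
        }

        boolean isAvailable(long now) {
            return now >= availableFrom;
        }
    }

    private final AtomicInteger cursor = new AtomicInteger();

    /** Rank (available > lower latency > earlier availableFrom), then round-robin over the better half. */
    String pickOneAtLeast(List<FaultItem> items, long now) {
        if (items.isEmpty()) {
            return null;
        }
        List<FaultItem> ranked = new ArrayList<>(items);
        ranked.sort(Comparator
                .comparing((FaultItem f) -> !f.isAvailable(now))  // false < true: available brokers first
                .thenComparingLong(f -> f.latencyMillis)
                .thenComparingLong(f -> f.availableFrom));
        int half = ranked.size() / 2;
        if (half == 0) {
            return ranked.get(0).broker;  // a single item: nothing to choose from
        }
        return ranked.get(Math.floorMod(cursor.getAndIncrement(), half)).broker;
    }

    public static void main(String[] args) {
        List<FaultItem> items = Arrays.asList(
                new FaultItem("broker-a", 100, 0),
                new FaultItem("broker-b", 50, 0),
                new FaultItem("broker-c", 10, 5_000),   // fastest, but unavailable until t=5000
                new FaultItem("broker-d", 200, 0));
        PickOneAtLeastSketch picker = new PickOneAtLeastSketch();
        for (int i = 0; i < 3; i++) {
            System.out.println(picker.pickOneAtLeast(items, 1_000));  // broker-b, broker-a, broker-b
        }
    }
}
```

Note that availability dominates latency: broker-c has the lowest latency but is ranked last while it is still unavailable.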

Thoughts

Personally I find this code not very clean. First, look at this snippet:

```java
int index = tpInfo.getSendWhichQueue().getAndIncrement();
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
    if (pos < 0)
        pos = 0;
    // other code omitted
```

Also look at the implementation of tpInfo.getSendWhichQueue().getAndIncrement():

```java
public int getAndIncrement() {
    Integer index = this.threadLocalIndex.get();
    if (null == index) {
        index = Math.abs(random.nextInt());
        if (index < 0)
            index = 0;
        this.threadLocalIndex.set(index);
    }
    index = Math.abs(index + 1);
    if (index < 0)
        index = 0;
    this.threadLocalIndex.set(index);
    return index;
}
```

There are two problems here:

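  • Math.abs(index++) is unnecessary in the vast majority of cases, because getAndIncrement() already guarantees a non-negative value. It only matters inside the loop when index lies in [Integer.MAX_VALUE - tpInfo.getMessageQueueList().size(), Integer.MAX_VALUE], where index++ can overflow to a negative number; the same goes for the following if (pos < 0).
  • The advanced index is not written back to the ThreadLocal, so subsequent calls may pick the same message queue a few more times. The impact is small, though, and RocketMQ may have done it this way for performance.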
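Why the pos < 0 guard exists at all: Math.abs cannot make Integer.MIN_VALUE positive, so once index++ wraps past Integer.MAX_VALUE, Math.abs(index) % size can be negative. A small standalone demonstration (class and method names are illustrative):

```java
public class AbsOverflowDemo {
    // Mirrors "Math.abs(index++) % size" from selectOneMessageQueue
    static int rawPos(int index, int size) {
        return Math.abs(index) % size;
    }

    // Mirrors the extra "if (pos < 0) pos = 0" guard
    static int guardedPos(int index, int size) {
        int pos = rawPos(index, size);
        return pos < 0 ? 0 : pos;
    }

    public static void main(String[] args) {
        int index = Integer.MAX_VALUE;
        index++;                                   // overflow: wraps to Integer.MIN_VALUE
        System.out.println(Math.abs(index));       // -2147483648: abs(MIN_VALUE) stays negative
        System.out.println(rawPos(index, 3));      // -2: an invalid list index without the guard
        System.out.println(guardedPos(index, 3));  // 0
        System.out.println(rawPos(index + 1, 3));  // 1: only MIN_VALUE itself misbehaves
    }
}
```

It also shows a side effect of the guard: the wrapped value is mapped to queue 0 rather than to the next queue in sequence. Since getAndIncrement() never returns a negative number, calling getAndIncrement() % size directly has no such edge case.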

I think calling getAndIncrement directly here would be simpler and clearer:

```java
for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
    int pos = tpInfo.getSendWhichQueue().getAndIncrement() % tpInfo.getMessageQueueList().size();
    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
    if (latencyFaultTolerance.isAvailable(mq.getBrokerName())) {
        if (null == lastBrokerName || mq.getBrokerName().equals(lastBrokerName)) {
            return mq;
        }
    }
}
// remaining code omitted
```

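As you can see, the simplified version calls one instance method after another on the same object (tpInfo), which feels a bit messy. In essence this just selects a queue from the broker used in the previous attempt, so why pull it out into MQFaultStrategy? TopicPublishInfo could expose a method, selectOneMessageQueueWithBroker, directly: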

```java
public MessageQueue selectOneMessageQueueWithBroker(String lastBrokerName) {
    int size = messageQueueList.size();
    for (int i = 0; i < size; i++) {
        int pos = sendWhichQueue.getAndIncrement() % size;
        MessageQueue mq = messageQueueList.get(pos);
        if (lastBrokerName == null || mq.getBrokerName().equals(lastBrokerName)) {
            return mq;
        }
    }
    return null;
}
```

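Thinking further: queue selection is delegated to a latency fault-tolerance strategy class (MQFaultStrategy), yet nearly all of the selection logic lives in the topic routing class (TopicPublishInfo). That feels somewhat odd.

My idea: MQFaultStrategy should be designed as a composition of MessageQueueSelector and SendMessageHook, so that the default flow matches DefaultMQProducerImpl#send(Message msg, MessageQueueSelector selector, Object arg).

As it stands, MQFaultStrategy is tightly coupled into the default send flow: on one hand the strategy class is hard to replace, and on the other, the flow is inconsistent with the other overloaded send methods.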
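A rough sketch of that idea, under stated assumptions: one object implements both a queue selector and an after-send hook, so the hook feeds latency and failure information in and the selector reads it back out, and neither is hard-wired into the send flow. The interfaces below are hypothetical stand-ins shaped after MessageQueueSelector#select and the "after" half of SendMessageHook, not the real RocketMQ types, and the isolation rule (isolate a broker for a fixed time after a failure or a slow send) is a simplification of what MQFaultStrategy does.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongSupplier;

public class FaultAwareSelectorSketch {
    // Hypothetical minimal stand-in for MessageQueue
    static final class Queue {
        final String broker;
        final int id;

        Queue(String broker, int id) {
            this.broker = broker;
            this.id = id;
        }
    }

    // Hypothetical selector, shaped like MessageQueueSelector#select
    interface QueueSelector {
        Queue select(List<Queue> queues, Object arg);
    }

    // Hypothetical hook, shaped like the "after send" half of SendMessageHook
    interface AfterSendHook {
        void afterSend(Queue target, long latencyMillis, boolean failed);
    }

    /** Registered as a hook and passed as the selector, so the fault strategy becomes pluggable. */
    static final class FaultAwareSelector implements QueueSelector, AfterSendHook {
        private final Map<String, Long> isolatedUntil = new ConcurrentHashMap<>();
        private final AtomicInteger cursor = new AtomicInteger();
        private final LongSupplier clock;
        private final long slowThresholdMillis;
        private final long isolationMillis;

        FaultAwareSelector(LongSupplier clock, long slowThresholdMillis, long isolationMillis) {
            this.clock = clock;
            this.slowThresholdMillis = slowThresholdMillis;
            this.isolationMillis = isolationMillis;
        }

        @Override
        public void afterSend(Queue target, long latencyMillis, boolean failed) {
            if (failed || latencyMillis >= slowThresholdMillis) {
                isolatedUntil.put(target.broker, clock.getAsLong() + isolationMillis);
            } else {
                isolatedUntil.remove(target.broker);
            }
        }

        @Override
        public Queue select(List<Queue> queues, Object arg) {
            long now = clock.getAsLong();
            for (int i = 0; i < queues.size(); i++) {
                Queue q = queues.get(Math.floorMod(cursor.getAndIncrement(), queues.size()));
                Long until = isolatedUntil.get(q.broker);
                if (until == null || until <= now) {
                    return q;  // broker never isolated, or its isolation has expired
                }
            }
            // Every broker is isolated: fall back to plain round-robin
            return queues.get(Math.floorMod(cursor.getAndIncrement(), queues.size()));
        }
    }
}
```

One caveat for a real implementation: SendMessageHook's after-callback receives a SendMessageContext rather than a latency value, so the latency would have to be measured between the before and after callbacks.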

Core message sending method

It calls DefaultMQProducerImpl#sendKernelImpl. The source:

```java
private SendResult sendKernelImpl(final Message msg,
        final MessageQueue mq,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final TopicPublishInfo topicPublishInfo,
        final long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
    // Look up the broker address
    String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    if (null == brokerAddr) {
        tryToFindTopicPublishInfo(mq.getTopic());
        brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());
    }
    SendMessageContext context = null;
    if (brokerAddr != null) {
        // Whether to use the broker's VIP channel; the broker serves clients on two ports
        brokerAddr = MixAll.brokerVIPChannel(this.defaultMQProducer.isSendMessageWithVIPChannel(), brokerAddr);
        byte[] prevBody = msg.getBody();
        try {
            // Set the message's unique ID
            // for MessageBatch, ID has been set in the generating process
            if (!(msg instanceof MessageBatch)) {
                // Unique ID layout: pid + ip + ClassLoader.hashCode + (current time - start time) + incrementing counter
                MessageClientIDSetter.setUniqID(msg);
            }
            // Message compression
            int sysFlag = 0;
            if (this.tryToCompressMessage(msg)) {
                sysFlag |= MessageSysFlag.COMPRESSED_FLAG;
            }
            // Transactional message
            final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
            if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                sysFlag |= MessageSysFlag.TRANSACTION_PREPARED_TYPE;
            }
            // Hook: extension point that checks whether sending is allowed
            if (hasCheckForbiddenHook()) {
                CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                checkForbiddenContext.setCommunicationMode(communicationMode);
                checkForbiddenContext.setBrokerAddr(brokerAddr);
                checkForbiddenContext.setMessage(msg);
                checkForbiddenContext.setMq(mq);
                checkForbiddenContext.setUnitMode(this.isUnitMode());
                this.executeCheckForbiddenHook(checkForbiddenContext);
            }
            // Hook: extension point before sending
            if (this.hasSendMessageHook()) {
                context = new SendMessageContext();
                context.setProducer(this);
                context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                context.setCommunicationMode(communicationMode);
                context.setBornHost(this.defaultMQProducer.getClientIP());
                context.setBrokerAddr(brokerAddr);
                context.setMessage(msg);
                context.setMq(mq);
                String isTrans = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED);
                if (isTrans != null && isTrans.equals("true")) {
                    context.setMsgType(MessageType.Trans_Msg_Half);
                }
                if (msg.getProperty("__STARTDELIVERTIME") != null || msg.getProperty(MessageConst.PROPERTY_DELAY_TIME_LEVEL) != null) {
                    context.setMsgType(MessageType.Delay_Msg);
                }
                this.executeSendMessageHookBefore(context);
            }
            // Build the send-message request header
            SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
            requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
            requestHeader.setTopic(msg.getTopic());
            requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
            requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
            requestHeader.setQueueId(mq.getQueueId());
            requestHeader.setSysFlag(sysFlag);
            requestHeader.setBornTimestamp(System.currentTimeMillis());
            requestHeader.setFlag(msg.getFlag());
            requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
            requestHeader.setReconsumeTimes(0);
            requestHeader.setUnitMode(this.isUnitMode());
            requestHeader.setBatch(msg instanceof MessageBatch);
            if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                if (reconsumeTimes != null) {
                    requestHeader.setReconsumeTimes(Integer.valueOf(reconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                }
                String maxReconsumeTimes = MessageAccessor.getMaxReconsumeTimes(msg);
                if (maxReconsumeTimes != null) {
                    requestHeader.setMaxReconsumeTimes(Integer.valueOf(maxReconsumeTimes));
                    MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_MAX_RECONSUME_TIMES);
                }
            }
            // Send the message: this is where the network layer is actually called
            SendResult sendResult = null;
            switch (communicationMode) {
                case ASYNC:
                    // A full 12 parameters...
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout,
                        communicationMode,
                        sendCallback,
                        topicPublishInfo,
                        this.mQClientFactory,
                        this.defaultMQProducer.getRetryTimesWhenSendAsyncFailed(),
                        context,
                        this);
                    break;
                case ONEWAY:
                case SYNC:
                    sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(
                        brokerAddr,
                        mq.getBrokerName(),
                        msg,
                        requestHeader,
                        timeout,
                        communicationMode,
                        context,
                        this);
                    break;
                default:
                    assert false;
                    break;
            }
            // Hook: extension point after sending
            if (this.hasSendMessageHook()) {
                context.setSendResult(sendResult);
                this.executeSendMessageHookAfter(context);
            }
            return sendResult;
        } catch (RemotingException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (MQBrokerException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } catch (InterruptedException e) {
            if (this.hasSendMessageHook()) {
                context.setException(e);
                this.executeSendMessageHookAfter(context);
            }
            throw e;
        } finally {
            msg.setBody(prevBody);
        }
    }
    // No broker address found: throw an exception
    throw new MQClientException("The broker[" + mq.getBrokerName() + "] not exist", null);
}
```
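The comment on MessageClientIDSetter.setUniqID lists what goes into the unique ID. A simplified sketch of such a layout: a per-process prefix (pid, IP, ClassLoader hash) computed once, followed by the time offset since a start time and an incrementing counter, all hex-encoded. The field widths, byte order and hex encoding here are assumptions for illustration, not a reproduction of RocketMQ's exact format, and UniqIdSketch is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class UniqIdSketch {
    private final String fixedPrefix;  // pid + ip + ClassLoader hash, hex-encoded once per process
    private final long startTime;
    private final AtomicInteger counter = new AtomicInteger();

    UniqIdSketch(short pid, byte[] ipv4, int classLoaderHash, long startTime) {
        ByteBuffer buf = ByteBuffer.allocate(2 + 4 + 4);
        buf.putShort(pid).put(ipv4, 0, 4).putInt(classLoaderHash);
        this.fixedPrefix = toHex(buf.array());
        this.startTime = startTime;
    }

    /** Per-message suffix: (now - startTime) as 4 bytes, then a 2-byte counter that wraps around. */
    String next(long now) {
        ByteBuffer buf = ByteBuffer.allocate(4 + 2);
        buf.putInt((int) (now - startTime)).putShort((short) counter.getAndIncrement());
        return fixedPrefix + toHex(buf.array());
    }

    private static String toHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder(bytes.length * 2);
        for (byte b : bytes) {
            sb.append(String.format("%02X", b & 0xFF));
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        UniqIdSketch ids = new UniqIdSketch((short) 1234, new byte[] {10, 0, 0, 1}, 0x12345678, 1_000L);
        System.out.println(ids.next(1_005L));  // 04D20A00000112345678000000050000
        System.out.println(ids.next(1_005L));  // 04D20A00000112345678000000050001
    }
}
```

Because the prefix is computed once, generating each ID costs only a subtraction, an atomic increment and a little hex formatting.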

Thoughts

The biggest problem here, I feel, is that the API and the SPI are not well separated.

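  1. Inconsistent extension points (hooks): there are two hooks, CheckForbiddenHook and SendMessageHook, and as far as I can see CheckForbiddenHook could be implemented entirely with SendMessageHook#sendMessageBefore.

    One could define a send-flow class, say SendMessageProcessor, that encodes the message sending flow, plus a single uniform hook type, say ProducerHook, and an annotation declaring the hook type (before or after); the flow class then assembles the ProducerHooks.

  2. Hook registration is rather crude: hooks are added to an ArrayList through the DefaultMQProducerImpl#registerXXXHook methods. As a result, SDK users cannot explicitly control the execution order of hooks, nor require that one hook run before or after another.

    One could add an annotation that declares the order and sort the hooks with something like a TreeSet.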
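A minimal sketch combining both suggestions, with every name hypothetical (ProducerHook, @HookSpec, SendMessageProcessor, the topic-prefix rule): one hook interface, an annotation declaring phase and order, and a processor that runs the BEFORE hooks, the send, then the AFTER hooks. The forbidden check becomes just another BEFORE hook that vetoes the send by throwing. Instead of a TreeSet it sorts a list with a stable sort, because a TreeSet whose comparator looks only at the order value would silently drop a second hook with the same order.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class HookPipelineSketch {
    enum Phase { BEFORE, AFTER }

    // Declares when a hook runs and its position within that phase (lower runs first)
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface HookSpec {
        Phase phase();
        int order() default 0;
    }

    // Hypothetical per-send context shared by all hooks
    static final class SendContext {
        final String topic;
        final List<String> trace = new ArrayList<>();
        SendContext(String topic) { this.topic = topic; }
    }

    // One hook type for every extension point
    interface ProducerHook {
        void invoke(SendContext ctx);
    }

    static final class SendMessageProcessor {
        private static final Comparator<ProducerHook> BY_ORDER =
                Comparator.comparingInt(h -> h.getClass().getAnnotation(HookSpec.class).order());
        private final List<ProducerHook> before = new ArrayList<>();
        private final List<ProducerHook> after = new ArrayList<>();

        void register(ProducerHook hook) {
            HookSpec spec = hook.getClass().getAnnotation(HookSpec.class);
            if (spec == null) {
                throw new IllegalArgumentException(hook.getClass() + " lacks @HookSpec");
            }
            List<ProducerHook> phase = spec.phase() == Phase.BEFORE ? before : after;
            phase.add(hook);
            phase.sort(BY_ORDER);  // List.sort is stable: equal orders keep registration order
        }

        void send(SendContext ctx, Runnable doSend) {
            for (ProducerHook h : before) {
                h.invoke(ctx);  // a BEFORE hook may throw to veto the send
            }
            doSend.run();
            for (ProducerHook h : after) {
                h.invoke(ctx);
            }
        }
    }

    // A CheckForbiddenHook-style veto, expressed as an ordinary BEFORE hook
    @HookSpec(phase = Phase.BEFORE, order = 0)
    static final class ForbiddenTopicCheck implements ProducerHook {
        @Override
        public void invoke(SendContext ctx) {
            if (ctx.topic.startsWith("forbidden-")) {
                throw new IllegalStateException("sending to " + ctx.topic + " is forbidden");
            }
            ctx.trace.add("check");
        }
    }

    @HookSpec(phase = Phase.BEFORE, order = 10)
    static final class TraceBeforeHook implements ProducerHook {
        @Override
        public void invoke(SendContext ctx) { ctx.trace.add("before"); }
    }

    @HookSpec(phase = Phase.AFTER)
    static final class TraceAfterHook implements ProducerHook {
        @Override
        public void invoke(SendContext ctx) { ctx.trace.add("after"); }
    }
}
```

Registering TraceBeforeHook before ForbiddenTopicCheck still runs the check first, since the declared order, not the registration order, decides.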

Note

The thoughts above are my personal views from a first reading of the source code. Corrections are very welcome!
