背景
基于rocketmq 4.2.0
先不关注顺序消息和事务消息,后面独立看。
发送消息入口
DefaultMQProducer#send(Message)
默认的是同步发送。最终调用的是DefaultMQProducerImpl#sendDefaultImpl
,直接看代码:
|
|
思考
final long invokeID = random.nextLong();
这个invokeID仅在日志输出时标示此次调用,却可能造成性能问题。共享的random虽然是线程安全的,但是每次调用都需要循环cas操作来替换每次的随机种子,高并发下,可能造成线程饥饿
这里建议JDK7及以上使用
ThreadLocalRandom
代替。(不过RocketMQ可能是因为要兼容jdk6才没有用,不过我仍然觉得即使jdk6没有现成的类,也应该自己设计一个类似的类,追求性能的路上没有终点~)
如何选择一个消息队列
最终由MQFaultStrategy#selectOneMessageQueue
处理,源码如下:
|
|
总结选择规则
- 默认情况,不开启发送的延迟容错策略时,RoundRobin形式的选择一个不属于上一次发送的broker队列
- 开启延迟容错策略时:
- 优先从上一次的发送的broker中RoundRobin形式选择一个可用队列
- 其次按照可用性排名(是否可用>延迟时间>开始时间)从前半数中RoundRobin选
- 最次,啥都不管,直接RoundRobin选
思考
个人感觉这块代码不够整洁。先看以下代码片段:
|
|
同时看下tpInfo.getSendWhichQueue().getAndIncrement()具体的代码:
|
|
这里有2个问题:
Math.abs(index++)
这操作绝大多数情况下没有必要的,因为getAndIncrement()
保证了其为正数。只有当在循环体内,且index取值为[Integer.MAX_VALUE-tpInfo.getMessageQueueList().size(),Integer.MAX_VALUE]
才可能需要,同时后面的if (pos < 0)
也是如此。- pos没有回设到Threadlocal中,导致会有那么几次选择的是同一个消息队列,不过这个倒是影响不大。RocketMQ可能是为了性能考虑。
个人觉得这段直接使用getAndIncrement会更简洁明确一点:
|
|
可以看到,精简之后,出现了对同一个对象连续调用了其实例方法,感觉有点混乱。其实这里本质就是从上一个发送成功的broker选择队列,为何要独立到MQFaultStrategy
中呢?可以仍然由TopicPublishInfo直接出一个方法selectMessageQueueWithBroker
:
|
|
进一步思考,选择一个消息队列的时候,由一个延迟容错的策略类(MQFaultStrategy
)代理,然后基本所有的选择逻辑又都在topic路由类(TopicPublishInfo
)中。总感觉有点奇怪。
个人想法:
MQFaultStrategy
应该设计为由MessageQueueSelector
和SendMessageHook
组合实现,这样就能和DefaultMQProducerImpl#send(Message msg, MessageQueueSelector selector, Object arg)
统一,流程一致。目前是
MQFaultStrategy
强耦合到了默认的消息发送流程中,一方面这个策略类难以被替换,另一方面,和其他重载的的消息发送方法流程不太一致
核心消息发送方法
调用的是DefaultMQProducerImpl#sendKernelImpl
,直接看源码:
|
|
思考
这里感觉最大的问题就是api和spi没有分离好。
扩展点(hook)不一致:有
CheckForbiddenHook
和SendMessageHook
这2个hook,目前看CheckForbiddenHook
完全可以由SendMessageHook#sendMessageBefore
实现可以定义一个发送消息的流程类SendMessageProcesser,定义消息发送的流程。然后定义一个一致的hook类,比如ProducerHook。然后出个注解定义hook类型(before或者after),在流程类中对ProducerHook组装
扩展点(hook)设置略显简陋:hook设置是通过
DefaultMQProducerImpl#registerXXXHook
方法add到一个ArrayList中的,这样一方面sdk使用者这法明确添加hook的执行顺序,一方面不能精细设置某个hook必须要在某个hook之前或之后调用可以出个注解定义顺序,然后使用类似TreeSet来排序
说明
里面的思考都是基于个人初步阅读源码这个前提的看法,欢迎各位大神斧正!