Rocket MQ

当前公司使用的消息中间件为Rocket MQ,但是说实话,我对Rocket MQ的认知也仅限于调用他们封装好的接口(比如说Sendmessage这种方法名,太常见了吧),我也看了他们封装了什么东西在里面,讲实话,我没看出他们封装的有多高级,跟网上的差不多吧一些策略之类的东西,我这种写业务的从来不会去深究,只管调用好就好,这大概是初级程序员经常干的事。但是人还是要上进的呀,写一篇笔记,这也算是系统的学习一下Rocket MQ了。

Rocket MQ

由什么构成?

NameServer

首先从命名上面看,肯定是做命名相关的活,服务治理之类的,有点像zookeeper,没错,其实Rocket MQ最开始就是使用zookeeper的,后来才自己做了实现。

  • 作用:

    • 维护Broker的地址列表,维护Broker的存活状态(Broker启动的时候会在NameServer进行注册)。
    • 维护Topic和Topic对应队列的地址列表,Broker每次发送心跳过来时都会将Topic的信息带过来。
    • 总结:维护Broker和Topic等信息,通过netty来进行长连接来保持Producer和Consumer跟Broker的通信,与此同时还提供心跳检测、数据更新查询等常规服务。
  • 主要实现类:

    • NamesrvStartUp:NameServer启动类
    • NamesrcController:NameServer控制类,主要负责NameServer的生命周期(启动、初始化、停止)
    • RouteInfoManager:存放Topic队列信息,Broker地址列表等一系列数据结构,并提供对应的数据变更接口。
    • DefaultRequestProcessosr:处理Broker发过来的所有消息,封装了对netty包的处理和部分对NameServer存储的数据查询和删除。
  • NameServer启动流程

img

Producer

消息生产者,主要是业务产生消息。

  • Producer由用户进行分布式部署,Producer会将消息通过负载均衡(轮询队列Roundbin,每个队列接收平均额消息量)发送到Broker集群,发送低延时,支持快速失败。
  • 支持发送延迟消息(可以指定时刻指定延迟间隔),指定为消息类型为延迟消息后,该消息发送到broker,需要等待延迟时间过后,才能背Consumer消费。
  • 可设置MessageQueueSelector来决定消息放到哪个Queue里。

Broker

消息中转角色,负责存储消息,转发消息。

  • Broker是具体提供业务的服务器,单个Broker节点与所有的NameServer节点保持长连接及心跳,并会定时将Topic信息注册到NameServer,底层的通信、连接都是基于netty所实现的。
  • Broker负责消息存储,以Topic为维度支持轻量级的队列,单机可以支撑上万队列规模(除了写demo,估计没人会使用单机模式)支持消息推拉模型

Consumer

消息消费者,负责消费消息,后台业务异步消费。

  • Push方式,通过长轮询实现,由Consumer维护Treemap,保存所有接收到的消息的列表,并实现类似于时间窗口的算法来做流控;PushConsumer会判断获取但还未处理消息个数消息总大小Offset的跨度,任何一个值超过设定的大小就隔一段时间再拉取消息,从而达到流量控制的目的。此外ProcessQueue还可以辅助实现顺序消费的逻辑。push方式推出Consume时,必须显示调用shutdown()告知broker记录offset等信息。
  • Pull方式,需要自己维护从Broker的队列上读取消息的offset,并存储到本地,从而保障当Consumer挂掉重启之后能狗重新接着源地址消费消息。
  • 从原理上讲,pull方式应该是不支持broadcast消息方式的,除非Broker通过session获取连接到其中的所有Consumer,并为未消费的consumer保留消息等待消费。
  • Consumer只负责消费消息,当消费消息失败的时候,会将消息写回Broker,并设置其Topic为SCHEDULE_TOPIC_XXXX(同样会做落盘处理)。

关键特性

尽管Rocket MQ生来就是为了分布式系统所设计的消息中间件,消息的顺序问题,消息的重复消费问题。

消息有序

Rocket MQ是如何保证消息的顺序性呢?

在介绍生产的者的时候我这边就介绍了一下,Rocket MQ会通过轮询所有队列的方式来决定消息将被发送到哪个队列(负载均衡策略)。Rocket MQ会通过MessageQueueSelector()所实现的算法来选择同一个队列,默认提供了两种实现:随机/Hash ,若你不想使用他提供的算法,你可以根据实际的业务来决定。

业务如何实现消费有序

若要实现严格的顺序消费(即全面考虑,网络延迟、数据丢失等方面),可以实现的方式便是应答 ACK+顺序在前先发送,也就是:

保证*生产者—MQServer—消费者 *是一对一的关系

这样虽然保证了消息的顺序消费,与此同时,也会出现其他的问题:吞吐量不够更多的异常处理(一但消费端出现问题,将会阻塞整个流程)

消息重复

Rocket MQ不保证消息不重复,若你的业务需要严格的保证不重复消息,需要在业务端去重。

消费端处理消息的业务逻辑保持幂等性。

保证每条消息都有唯一编号且保证消息处理成功,其次去重表的日志显的尤为重要。

  • 本文作者: KitAndrew Lee
  • 本文链接: 2019/09/03/Rocket MQ/
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 3.0 CN 许可协议。转载请注明出处!