MetaQ(RocketMQ)学习

1. 消息中间件解决的问题

消息中间件的核心作用就是三点:解耦,异步和并行,其中解耦是目的,异步和并行是表现形式。生产者和消费者彼此没有直接依赖,同步化解成了异步,大大减少了等待时间。例如:手机验证码的发送,当点击发送验证码的时候,只需要Client发送一条消息:{topic:sendVerifyCode,to:155xxxxxxxx,..}到消息中间件,消息中间件返回Client ok;然后消息中间件会将这条消息转发给订阅了这一个topic的Consumer,由Consumer去执行发送验证码的任务。由此看出消息中间主要做了三件事:接收消息+存储消息+转发消息。

2. JMS 与 MetaQ

  1. 消息传递方式
    JMS:1 基于队列的点对点消费模型,2 基于发布/订阅的消费模型
    MetaQ: 只有发布/订阅的消费方式

  2. 消息类型
    JMS:TextMessage、MapMessage、BytesMessage,StreamMessage、ObjectMessage。
    MetaQ:Message。

  3. 消息持久性
    JMS:持久订阅,指示 JMS provider 持久保存消息,以保证消息不会因为 JMS provider 的失败而丢失。非持久订阅, 不要求 JMS provider 持久保存消息。
    MetaQ 的消息都是持久性的

  4. API
    JMS:定义了消息中间件的生产端 api 和消费端 api,这些 api 都是约定的接口。
    MetaQ:没有遵从JMS的定义,自己搞了一套。

3. MetaQ基本概念

生产者

Producer:负责产生消息并发送消息到MetaQ服务器

消费者

Consumer:负责从MetaQ拉取消息并完成消费

Topic

消息的主题,由用户定义。类似于知乎的话题,Producer发送消息的时候需要指定发送到某一个topic下面,Consumer从某一个topic下面消费消息。

分区

同一个topic下面又分为多个分区,这些分区散落在各个服务器上,消息是发送到某一个topic下面的某一个分区,Consumer 也是从Topic下面的某一给分区拉取消息

Message

消息,负载发送的消息的信息。在生产者,服务端和 消费者之间传输

Broker

MetaQ的服务端。

Offset

消息在 Broker 上的每个分区都是组织成一个文件列表,消费者拉取数据需要知道数据在文件中的偏移量,这个偏移量就是所谓 offset。Offset 是绝对偏移量,服务器会将 offset 转化为具体文件的相对偏移量

Tag

每次发送一条消息的时候,给消息加一个Tag,方便Consumer过滤消息。

生产者Group/消费者Group

消费者可以是多个消费者共同消费一个 topic 下的消息,每个消费者消费部分消息。这些消费者就组成一个分组,拥有同一个分组名称,通常也称为消费者集群

集群消费/广播消费

  1. 集群消费,一条消息只会被同一个group里一个消费者消费。 不同group之间相互不影响。
  2. 广播消费,一条消息会被同一个group里每一个消费端消费。

4. 消息写入,存储与读出

MetaQ的存储结构是一种物理队列+逻辑队列的结构。如下图所示:
metaq
Producer生产消息,根据消息的topic选择topic对应某一个分区,然后发送到这个分区对应的Broker;Consumer根据订阅的topic选择去topic的某一个分区拉取消息。

4.1 生产者如何选择发送分区(生产者负载均衡)

每个broker都可以配置多个topic,每个topic可以有多少个分区。topic,broker,分区三者是多对多的关系。但是在生产者看来,一个topic在所有broker上的所有分区组成一个分区列表来使用。

在创建producer的时候,客户端会从zookeeper上获取publish的topic对应的broker和分区列表,生产者在发送消息的时候必须选择一台broker上的一个分区来发送消息。

生产者在通过zk获取分区列表之后,会按照brokerId和分区的顺序排列组织成一个有序的分区列表,发送的时候按照从头到尾循环往复的方式选择一个分区来发送消息。考虑到我们的broker服务器软硬件配置基本一致,默认的轮询策略已然足够。

在broker因为重启或者故障等因素无法服务的时候,producer通过zookeeper会感知到这个变化,将失效的分区从列表中移除做到fail over。因为从故障到感知变化有一个延迟,可能在那一瞬间会有部分的消息发送失败(在这一瞬间如果发送的是顺序消息怎么办?)。

4.2 存储消息

MetaQ将消息存储在本地文件中,每个文件最大大小为1G,如果写入新的消息时,超过当前文件大小,则会自动新建一个文件。文件名称为起始字节大小。以起始字节大小命名并排序这些文件是有诸多好处的,当消费者要抓取某个起始偏移量开始位置的数据,会变的很简单,只要根据传上来的offset二分查找文件列表,定位到具体文件,然后将绝对offset减去文件的起始节点转化为相对offset,即可开始传输数据。假设,每个文件大小为1KB,图中Consumer1 订阅了TopicA,采用pull的方式来拉取消息,刚好Consumer1又被匹配到了TopicA_2分区,Consumer1需要获取{offset=1200,size=200}处的消息。需要经历如下的步骤:

  1. 当改pull请求发送到Broker1的时候,Broker1遍历TopicA_2分区(分区就是一些按照文件起始字节大小命名的索引文件,每一个索引文件又包含了多个索引项)找到offset对应的索引项{offset=1200,size=100B,tagHashcode=xxx}。

  2. 然后Broker1根据offset值二分查找TopicA_2的commitlog,获取到offset=1200的消息所在的真实文件(0000001024.meta)

  3. 根据真实文件的文件名000001024 获取offset=1300的消息所在文件的起始位置=276(1300-1024)

  4. 接下来,Broker1从TopicA_2分区的commitlog文件组中0000001024.meta文件的276B个字节开始,读取100B,然后返回给Consumer1。

对于最终用户展现的消息队列只存储Offset,这样使得队列轻量化,单个队列数据量非常少。

这样做的好处如下
(1). 队列轻量化,单个队列数据量非常少。

(2). 对磁盘的访问串行化,避免磁盘竟争,不会因为队列增加导致IOWAIT增高。

每个方案都有缺点,它的缺点如下:

(1). 写虽然完全是顺序写,但是读却变成了完全的随机读。

(2). 读一条消息,会先读Consume Queue,再读Commit Log,增加了开销。

(3). 要保证Commit Log与Consume Queue完全的一致,增加了编程的复杂度。

以上缺点如何克服
(1). 随机读,尽可能让读命中PAGECACHE,减少IO读操作,所以内存越大越好。如果系统中堆积的消息过多,读数据要访问磁盘会不会由于随机读导致系统性能急剧下降,答案是否定的。

  1. 访问PAGECACHE时,即使只访问1k的消息,系统也会提前预读出更多数据,在下次读时,就可能命中内存。

  2. 随机访问Commit Log磁盘数据,系统IO调度算法设置为NOOP(不是ANTICIPATORY吗)方式,会在一定程度上将完全的随机读变成顺序跳跃方式

(2). 由于Consume Queue存储数据量极少,而且是顺序读,在PAGECACHE预读作用下,Consume Queue的读性能几乎与内存一致,即使堆积情况下。所以可认为Consume Queue完全不会阻碍读性能。

(3). Commit Log中存储了所有的元信息,包含消息体,类似于Mysql、Oracle的redolog,所以只要有Commit Log在,Consume Queue即使数据丢失,仍然可以恢复出来。

在读取消息的时候,如何加快读取消息的速度?
传统的read调用会经历内核态–>用户态—>内核态—>网卡缓冲区这样一个复杂的过程。MetaQ使用了mmap的方式,将硬盘文件映射到用内存中,也就是将page cache中的页直接映射到用户进程地址空间中,从而进程可以直接访问自身地址空间的虚拟地址来访问page cache中的页,这样会并不会涉及page cache到用户缓冲区之间的拷贝。对于小文件比较管用

4.3 消费者如何选择拉取的分区(消费者负载均衡)

消费者的负载均衡跟topic的分区数目紧密相关,要考察几个场景。

首先,单个分组内的消费者数目如果比总的分区数目多的话,则多出来的消费者不参与消费。

其次,如果分组内的消费者数目比分区数目小,则有部分消费者要额外承担消息的消费任务。

Meta的客户端会自动帮处理消费者的负载均衡,它会将消费者列表和分区列表分别排序,然后按照上述规则做合理的挂载。合理地设置分区数目至关重要。如果分区数目太小,则有部分消费者可能闲置,如果分区数目太大,则对服务器的性能有影响。
在某个消费者故障或者重启等情况下,其他消费者会感知到这一变化(通过 zookeeper watch消费者列表),然后重新进行负载均衡,保证所有的分区都有消费者进行消费。

5. 如何保证消息不丢

5.1 生产者可靠性保证

消息生产者发送消息后返回SendResult,如果isSuccess返回为true,则表示消息已经确认发送到服务器并被服务器接收存储。整个发送过程是一个同步的过程。保证消息送达服务器并返回结果。

5.2 服务器可靠性保证

消息生产者发送的消息,meta服务器收到后在做必要的校验和检查之后的第一件事就是写入磁盘,写入成功之后返回应答给生产者。因此,可以确认每条发送结果为成功的消息服务器都是写入磁盘的。写入磁盘,不意味着数据落到磁盘设备上,毕竟我们还隔着一层os,os对写有缓冲。Meta有以下刷盘策略:

异步刷盘

  1. 每1000条消息(可配置),即强制调用一次force来写入磁盘设备。
  2. 每隔10秒(可配置),强制调用一次force来写入磁盘设备。

同步刷盘
如果存储配置上的groupCommitEnable选项为true,则会在写入消息后,立即强制刷盘。

5.3 消费者可靠性保证

消费者是一条接着一条地消费消息,只有在成功消费一条消息后才会接着消费下一条。如果在消费某条消息失败(如异常),则会尝试重试消费这条消 息(默认最大5次),超过最大次数后仍然无法消费,则将消息存储在消费者的本地磁盘,由后台线程继续做重试。而主线程继续往后走,消费后续的消息。因此, 只有在MessageListener确认成功消费一条消息后,meta的消费者才会继续消费另一条消息。由此来保证消息的可靠消费。消费者的另一个可靠性的关键点是offset的存储,也就是拉取数据的偏移量。默认存储在zoopkeeper上,zookeeper通过集群来保证数据的安全性。Offset会定期保存,并且在每次重新负载均衡前都会强制保存一次,因此可能会存在极端情况下的消息的重复消费。

6. 消息过滤(服务端/客户端)

消息过滤主要使用Message 的Tag字段做的。

  1. 在服务端,每一条消息对应的Tag被转换成一个8byte的hashcode, 在Broker 端对比Queue中每一个存储单元的的hashcode和 订阅的Tag的hashcode进行对比,不符合,则跳过,继续比对下一个,符合则传输给Consumer。在队列中进行hashcode对比
  2. Consumer 收到过滤后的消息后,再次将传递过来的Message中的Tag字符串和订阅的Tag字符串进行对比,不是hashcode。这样做可以避免Hash冲突

7. 消息重复性

MetaQ不能保证消息不重复,原因如下:

  • 发送消息阶段,会存在分布式环境下典型的超时问题,即发送阶段不能保证消息不重复。
  • 订阅消息阶段,由于涉及集群订阅,多个订阅方需要使用负载均衡方式订阅,在因负载均衡出现的短暂不一致的情况下可能会重复。
  • 订阅者意外宕机,消费进度未及时存储也会产生息重复。

如何解决?

  • Consumer收到消息后,通过Tair,DB去重。
  • 使用Pull的方式拉取消息,但是Pull的时候,怎么协调分配队列需要应用控制。

8. 消息顺序性

顺序消息:消费消息的顺序要同发送消息的顺序一致,在 MetaQ 中,主要指的是局部顺序,即一类消息为满足顺 序性,必须 Producer 单线程顺序发送,且发送到同一个队列,这样 Consumer 就可以按照 Producer 发送 的顺序去消费消息。

普通顺序消息:顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker 重启,由于队列 总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。 如果业务能容忍在集群异常情况(如某个 Broker 宕机或者重启)下,消息短暂的乱序,使用普通顺序方 式比较合适。

严格顺序消息:顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式 Failover 特性,即 Broker 集群中只 要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。 如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。

9. Pull/Push模型对比

Pull: Comsumer主动请求Broker获取消息,请求的时候需要指定消息的offset(第一次读取为0,根据每次读取的返回值可以获取到下一次需要读取的offset)。这样只要事先知道offset,只要消息还没有被清除就能读取消息出来在(即使已经读取过了)

Push:Consumer事先注册监听,由Broker主动推送消息到Consumer。Push能加载出未读取的消息的原因是MetaQ的服务端维护了一个offset。

坚持原创技术分享,您的支持将鼓励我继续创作!

热评文章

Fork me on GitHub