MQ消息队列

消息队列3大目标

异步

在生产者-消费者速度不匹配的情况下,使用异步可以减少等待,提高效率。

解耦

多个生产者可以通过消息队列管道集合成1条链路;也可以将1个生产者的消息负载均衡给多个消费者(只发送1条消息给MQ,MQ广播多份)。例如,增加了一个数据分析业务,这时候不需要修改业务代码,只需要配置MQ发送相应消息到大数据系统Server即可。
同时,生产者只需要关心将消息发送给MQ,无需关心后续处理(消费者挂了怎么办);MQ会负责和消费者通信。

削峰(生产者-消费者速度不同步)

由于队列本身是一条管道,拥有一定容量,因此可以削峰填谷,解决一些瞬时高并发流量。

消息队列的关键问题

C 系统一致性

A系统通过MQ将消息发送给B、C完成后续业务,B成功而C失败,这时如何保证一致性?

A 系统可用性

MQ宕机,依赖MQ管道的服务就不可用。MQ应该有高可用性和稳定性,不应该成为系统薄弱环节。
因此需要MQ集群,这时候又需要新的中间层NameSrv来管理维护MQ集群。

系统复杂度

  • 如何保证消费不丢失?
  • 如何避免重复消费?
  • 如何保证消息顺序?

幂等性

多次消费结果相当于只消费一次。

可以用业务id作为消息key,对key校验有没有消费过。
如果重复消费,确保多次消费和1次消费的结果相同。

  • 发送消息重复:发送后,网络断开,没收到ACK,导致重复发送
  • 消费消息重复:Consumer收到消息并处理完成,但是由于网络问题,Consumer应答没有发送到Broker;Broker遵从至少消费一次原则,重新发送。
  • Rebalance消息重复:Consumer Group的Consumer数量发生变化,触发Rebalance,此时Consumer可能会收到曾经被消费过的消息。

解决方案

Message Queue产品

产品 优势 劣势 场景
Kafaka 吞吐量大、性能高、集群高可用 丢数据、功能单一 MapReduce大数据采集、日志分析
RabbitMQ 消息可靠、功能全面 erlang语言不容易定制,吞吐量较低 小规模服务调用
Pulsar Bookeeper,消息可靠性高 使用较少、生态有差距 大规模服务调用
RocketMQ 高吞吐、高性能、高可用。Java语言容易定制。 Java服务加载慢 功能全面,尤其适合金融、电商、互联网场景

消息队列工作方式

RocketMQ和Kafka都使用Topic,每个Topic的内容会分发到多个管道(Partition或MessageQueue)。而Kafka在Topic过多的情况下,吞吐量会严重下降;RocketMQ解决了这个问题。

RocketMQ集群

在RocketMQ集群中,多台NameSrv是平等的,而Broker会组成多个主-从结构。
Slave只负责备份,只有Master(brokerId=0)才会发送消息。
然而主从结构的Slave,由于brokerId不为0,不会自动切换为Master,需要人工介入。

Dledger高可用集群

Dleger是一种Raft算法,实现了Leader选举。
Dledger会从Followers中自动选举Leader,从而保证高可用。

三种发送方式

单向发送

Producer只发送消息、不处理ACK;MQ也不发送ACK。消息可靠性没有保障。

// 返回值为null,不处理ACK。
public void sendOneWay(Message msg) throws ...Exception {
msg.setTopic(withNamespace(msg.getTopic()));
this.defaultMQProducerImpl.sendOneWay(msg);
}

同步发送

Producer等待MQ ACK,才继续操作。同步发送可能会发生阻塞。

public SendResult sendResult(
Collection<Message> msgs) throws ...Exception {
return this.defaultMQProducerImpl.send(batch(msgs));
}

异步发送

Producer不等待MQ ACK(异步ACK,也能保证不丢失消息),直接发送消息。
但是异步发送也有代价,我们不能发送完立刻producer.shutdown(),而需要设置一段延迟,使producer能够捕捉Exception并重发消息。

// send方法本身没有返回值,不会阻塞;但是能够处理Exception
public void send(Message msg,
SendCallBack sendCallBack) throws ...Exception {
msg.setTopic(withNamespace(msg.getTopic()));
try {
if (this.getAutoBatch() && !(msg instanceof MessageBatch)) {
sendByAccumulator(msg, null, sendCallBack);
} else {
sendDirect(msg, null, sendCallBack);
}
} catch (Throwable e) {
sendCallBack.onException(e);
}
}

producer.send(msg, new SendCallBack() {
@Override
public void onSuccess(SendResult sendResult) {
...
}

@Override
public void onException(Throwable e) {
...
}
});

两种消费方式

Consumer拉取

Consumer维护一个轮询拉取,Broker收到拉取请求后发送消息。

Broker推送

一般只用推模式,因为Consumer需要轮询(即使Broker不一定有消息),会消耗部分资源。

消息类型

顺序消息

局部有序,实际上是序号相同的消息发送到同一个队列管道,然后消费者从一个管道中拿消息,从而保证有序性。

广播消息

正常情况下,多个Consumer是负载均衡模式,一条消息只会发到其中一个Consumer消费;而在广播模式下,所有的Consumer都会收到消息。
在代码层面,正常情况下服务端统一维护消费者位点;而在广播模式下客户端本地.rocket_offsets维护消费者位点

消息重试

顺序消息

顺序消息要拿到ACK才会发送下一条消息,否则会重发消息

无序消息

为了保障无需消息的消费,MQ设置了一个消息重试间隔时间。如果没有回复,间隔10s-30s-1m-2m…来重发消息,最多重试16次(默认)。
如果达到重试上限还未消费,该消息称为死信消息。死信消息会进入死信队列

死信队列

死信队列不归属于Topic、Consumer,而是归属于Group Id。
死信队列的消息不会被再次重复消费,有效期为3天,过期删除。
可以手工在监控平台里处理死信,获取messageId后自己处理。

重复消费

网络闪断(成功执行,MQ没收到ACK)、生产者宕机(成功发送到MQ,生产者没收到ACK)会引发重复消费。