消息队列

消息队列

1.为什么使用消息队列

(1)解耦

传统模式:系统间耦合性太强.

如上图所示,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦!

中间件模式:将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。

(2)异步

传统模式:一些非必要的业务逻辑以同步的方式运行,太耗费时间。

中间件模式:将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度。

(3)削峰

传统模式:并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常

中间件模式:系统A慢慢的按照数据库能处理的并发量,从消息队列中慢慢拉取消息。在生产中,这个短暂的高峰期积压是允许的。

2.使用消息队列的缺点

(1)系统可用性降低:

  你想啊,本来其他系统只要运行好好的,那你的系统就是正常的。现在你非要加个消息队列进去,那消息队列挂了,你的系统不是呵呵了。因此,系统可用性降低

(2)系统复杂性增加:

  要多考虑很多方面的问题,比如一致性问题、如何保证消息不被重复消费,如何保证保证消息可靠传输。因此,需要考虑的东西更多,系统复杂性增大。

3.如何保证消息队列是高可用的

  我们在使用消息队列的过程中,应该做到消息不能多消费,也不能少消费。

可靠性传输 :每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据

RabbitMQ消息队列:

###(1)生产者丢数据

  从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。

#####A:transaction机制

  发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。

缺点:就是吞吐量下降了。

#####B:confirm机制(生产上confirm模式的居多)

  channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这样生产者知道消息已经到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息,进行重试操作。

处理Ack和Nack的代码如下所示:

ConfirmListener() {
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

@Override

public void handleNack(long deliveryTag, boolean multiple) throws IOException {

System.out.println("nack: deliveryTag = "+deliveryTag+" multiple: "+multiple);

}

@Override

public void handleAck(long deliveryTag, boolean multiple) throws IOException {

System.out.println("ack: deliveryTag = "+deliveryTag+" multiple: "+multiple);

}

});

(2)消息队列丢数据

  处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

那么如何持久化呢?

  1、将queue的持久化标识durable设置为true,则代表是一个持久的队列

  2、发送消息的时候将deliveryMode=2
这样设置以后,rabbitMQ就算挂了,重启后也能恢复数据

(3)消费者丢数据

  消费者丢数据一般采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。

解决方案:采用手动确认消息即可。

4.消息确认模式

消费者从队列中获取消息,服务端如何知道消息已经被消费呢?

模式1:自动确认

只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。

模式2:手动确认

消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。

5.解决消息丢失

取消自动回复机制,改成手动回复机制

  RabbitMQ提供了消息确认机制:message acknowledgments,一个消费者处理完成后,将会回传一个ack给生产者,以表示处理成功,这样生产者才可以将消息删除.

  没有回传ack,那么发送者便会重发消息到队列,如果这时候有其他的消费者服务该队列,便会从队列中取出消息并处理。这就保证了消息的不丢失。

自动回复机制:不管是否处理成功,还是失败,都会回复ack。

  自动回复的理解,生产者发送后,消费者会默认自动回复ack(一般自动回复是没有问题的),收到之后就立即删除。 不再重发。

6.直连接和首部类型的比较

(1). 绑定规则不同

  直连接是一个简单的String;而首部是键值对Entry,而且键值对的value可以是任意类型Object

(2). 绑定个数不同

  直连接一次只能绑定一个字符串,如果想绑定多个字符串就需要绑定多次或者循环调用queueBind()方法来绑定多次;而首部类型直接可以往Map中添加多个实体Entry即可

(3).映射规则不同

  直连接只需要比较路由键是否相等即可,而首部类型除了比较value还要比较key,因为首部类型是Entry类型,需要同时比较key和value,而且首部类型还可以通过x-match来控制匹配的条件

all:需要匹配所有Entry,相当于SQL中的 and操作,

any:只需要匹配上一个Entry即可,相当于SQL中的or操作

(4).直连接适用于计较简单的路由,而首部类型相比直连接匹配规则更强大

7.对于交换机模式自身理解

  当消费者第一次绑定交换机时,首先得启动消费者服务器,再启动生产者服务器。若没有启动,则生成者服务器就不会把消息发送到对应的消费者绑定的队列。
第一次绑定完成后,生产者服务和消费者服务器启动顺序无要求。(亲测有效)

8.如何保证消息不被重复消费?

分析:这个问题其实换一种问法就是,如何保证消息队列的幂等性?

(1)为什么会造成重复消费?

 其实无论是那种消息队列,造成重复消费原因其实都是类似的。

正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同,

例如RabbitMQ是发送一个ACK确认消息,

RocketMQ是返回一个CONSUME_SUCCESS成功标志,

kafka实际上有个offset的概念,每一个消息有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。

(2)那造成重复消费的原因?

网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道已经消费过该消息了,再次将该消息分发给其他的消费者。

(3)如何解决?

  (1)比如,你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

  (2)再比如,你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。

  (3)如果上面两种情况还不行,准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

9.RabbitMQ交换器

RabbitMQ服务器会根据路由键将消息从交换器路由到队列中,如何处理投递到多个队列的情况?这里不同类型的交换器起到了重要的作用。分别是fanout,direct,topic,Headers每一种类型实现了不同的路由算法。

(1) Fanout Exchange

  不处理路由键。简单将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。

Fanout交换机转发消息是最快的。 

(2) Direct Exchange

  处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “test”,则只有被标记为“test”的消息才被转发,不会转发test.aaa,也不会转发dog.123,只会转发test。

(3) Topic Exchange

  将路由键和某模式进行匹配。队列需要绑定一个模式上。

符号“#”匹配一个或多个词,
符号“*”匹配不多不少一个词。

  因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。

(4) Headers Exchange

  首部交换机和扇形交换机都不需要路由键routingKey.

  交换机通过Headers头部来将消息映射到队列的,有点像HTTP的Headers,Hash结构中要求携带一个键“x-match”,这个键的Value可以是any或者all,这代表消息携带的Hash是需要全部匹配(all),还是仅匹配一个键(any)就可以了。

相比直连交换机,首部交换机的优势是匹配的规则不被限定为字符串(string)而是Object类型。

####(1)any:

  只要在发布消息时携带一对键值对headers满足队列定义的多个参数arguments的其中一个就能匹配上,注意这里是键值对的完全匹配,只匹配到键了,值却不一样是不行的;

####(2)all:

  在发布消息时携带的所有Entry必须和绑定在队列上的所有Entry完全匹配.

文章标题:消息队列

发布时间:2019-11-13, 09:58:16

最后更新:2019-11-13, 09:50:18