0%

rabbitmq笔记

》如果没有交换机,那么routekey就必须是队列名,否则通道不知道要把消息传到哪个队列中去

》RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。

假如信息a已经被发送给了消费者1,那么一定会等到信息a的ack返回后,才会把信息b发送给消费者2(即下一个消费者),不管消费者1空闲与否或者其他消费者空闲与否,这里应该叫“轮流,顺上来”的概念
即默认情况下,当一个信息被发送给一个消费者,那么下一个信息一定会被发送给下一个消费者,而不是其他消费者。
当然,只有1个消费者时,由于它自己就是下一个消费者,那就一直是它获得消息
怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。

  1. 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。

  2. 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。

为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
还有一点需要注意,使用非公平分发,必须关闭自动应答,改为手动应答。

1
2
3
4
5
6
7
8
9
10
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);

//开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

同时改为手动确认:
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);
这样的话,消费快的消费者能得到更多的消息

》如果交换机设置为fanout类型,那么routekey是不会起作用的,交换机一定会把它收到的所有消息都发给所有与它绑定的队列,只有当类型为topic或direct时才会起作用

》消息无法由exchange路由到合适的队列的处理方法(这种情况一定是direct或topic,fanout不存在这种问题)

》1.设置mandatory参数为true

上一篇文章中我们知道,生产者将消息发送到RabbitMQ的交换器中通过RoutingKey与BindingKey的匹配将之路由到具体的队列中以供消费者消费。那么当我们通过匹配规则找不到队列的时候,消息将何去何从呢?Rabbit给我们提供了两种方式。mandatory与备份交换器。
mandatory参数是channel.BasicPublish方法中的参数。其主要功能是消息传递过程中不可达目的地时将消息返回给生产者。当mandatory 参数设为true 时,交换器无法根据自身的类型和路由键找到一个符合条件的队列,那么RabbitMQ 会调用BasicReturn 命令将消息返回给生产者。当mandatory 参数设置为false 时。则消息直接被丢弃。其运转流程与实现代码如下(以C# RabbitMQ.Client 3.6.9为例):
注意,这和死信队列没关系,死信是已经在队列中的内容过期后的处理措施,这个mandatory是交换器把这个消息路由不到任何消息队列的处理方法。
1
2
3
4
5
6
7
8
9
10
String message = "hello world";
channel.basicPublish("eee", "", true, properties, message.getBytes());
System.out.println("[x] Sent '" + message + "'");

channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("replyCode="+replyCode + " replyText="+replyText+" exchange="+exchange + " routeKey="+routingKey+" body="+new String(body)); //#1
}
});
注意,由于是异步的,如果主线程先结束了,#1处的代码可能还没执行就整体结束了。

》设置备份exchange

当消息不能路由到队列时,通过mandatory设置参数,我们可以将消息返回给生产者处理。但这样会有一个问题,就是生产者需要开一个回调的函数来处理不能路由到的消息,这无疑会增加生产者的处理逻辑。备份交换器(Altemate Exchange)则提供了另一种方式来处理不能路由的消息。备份交换器可以将未被路由的消息存储在RabbitMQ中,在需要的时候去处理这些消息。其主要实现代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
Map<String, Object> arguments = new HashMap<String, Object>(16);
arguments.put("alternate-exchange", "backup");
//普通交换器和普通队列
channel.exchangeDeclare("normal", "direct", true, false, arguments);
channel.queueDeclare("q1", false, false, false, null);
channel.queueBind("q1", "normal", "r1");

//备份交换器和备份队列
channel.exchangeDeclare("backup", "fanout", true, false, null);
channel.queueDeclare("qb", false, false, false, null);
channel.queueBind("qb", "backup", "");

channel.basicPublish("normal", "", null, "abcdef".getBytes());

备份交换器其实和普通的交换器没有太大的区别,为了方便使用,建议设置为fanout类型,若设置为direct 或者topic的类型。需要注意的是,消息被重新发送到备份交换器时的路由键和从生产者发出的路由键是一样的。考虑这样一种情况,如果备份交换器的类型是direct,并且有一个与其绑定的队列,假设绑定的路由键是key1,当某条携带路由键为key2 的消息被转发到这个备份交换器的时候,备份交换器没有匹配到合适的队列,则消息丢失。如果消息携带的路由键为key1,则可以存储到队列中。
对于备份交换器,有以下几种特殊情况:

  • 如果设置的备份交换器不存在,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有绑定任何队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器没有任何匹配的队列,客户端和RabbitMQ 服务端都不会有异常出现,此时消息会丢失。
  • 如果备份交换器和mandatory参数一起使用,那么mandatory参数无效。

》设置消息的TTL:

目前有两种方法可以设置消息的TTL。第一种方法是通过队列属性设置,队列中所有消息都有相同的过期时间。第二种方法是对消息本身进行单独设置,每条消息的TTL可以不同。如果两种方法一起使用,则消息的TTL 以两者之间较小的那个数值为准。消息在队列中的生存时间一旦超过设置的TTL值时,就会变成"死信" (Dead Message) ,消费者将无法再收到该消息。(有关死信队列请往下看)
1
2
3
4
5
6
7
8
9
10
11
12
方法一:给队列参数上加上x-message-ttl属性,这样所有进入该队列的消息都会有统一的过期时间
Map<String, Object> arguments = new HashMap<String, Object>(16);
arguments.put("x-message-ttl", 10000);
arguments.put("x-max-priority", 10);
channel.exchangeDeclare("eee", "fanout", true, false, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
方法二:给消息上加上属性:
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.expiration("20000") //表示20000ms
.build();
String message = "hello world";
channel.basicPublish("eee", "", properties, message.getBytes());

注意:对于第一种设置队列TTL属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费者之前判定的。Why?在第一种方法里,队列中己过期的消息肯定在队列头部, RabbitMQ 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将被消费时再判定是否过期,如果过期再进行删除即可。

》设置队列的TTL

注意,这里和上述通过队列设置消息的TTL不同。上面删除的是消息,而这里删除的是队列。通过channel.queueDeclare 方法中的x-expires参数可以控制队列被自动删除前处于未使用状态的时间。这个未使用的意思是队列上没有任何的消费者,队列也没有被重新声明,并且在过期时间段内也未调用过channel.basicGet命令。
设置队列里的TTL可以应用于类似RPC方式的回复队列,在RPC中,许多队列会被创建出来,但是却是未被使用的(有关RabbitMQ实现RPC请往下看)。RabbitMQ会确保在过期时间到达后将队列删除,但是不保障删除的动作有多及时。在RabbitMQ 重启后, 持久化的队列的过期时间会被重新计算。用于表示过期时间的x-expires参数以毫秒为单位, 井且服从和x-message-ttl一样的约束条件,不同的是它不能设置为0(会报错)。
1
2
3
4
Map<String, Object> arguments = new HashMap<String, Object>(16);
arguments.put("x-expires", 2000);
channel.exchangeDeclare("eee", "fanout", true, false, null);
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);

》死信队列

DLX(Dead-Letter-Exchange)死信交换器,当消息在一个队列中变成死信之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX ,绑定DLX的队列就称之为死信队列。

消息变成死信主要有以下几种情况:

  • 消息被拒绝(BasicReject/BasicNack) ,井且设置requeue 参数为false;
  • 消息过期;
  • 队列达到最大长度。
    DLX也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ 就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息、以进行相应的处理。
    通过在channel.queueDeclare 方法中设置x-dead-letter-exchange参数来为这个队列添加DLX。其示例代码如下:

》对于队列参数的设置,map的内容必须是在声明队列之前就有了,不能设置了arguments后再往map中放东西

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Map<String, Object> arguments = new HashMap<String, Object>(16);
//死信队列配置 ----------------
String dlxExchangeName = "dlx.exchange";
String dlxQueueName = "dlx.queue";
String dlxRoutingKey = "#";

// 为队列设置队列交换器
arguments.put("x-dead-letter-exchange", dlxExchangeName); //只能设置死信交换器,不能直接把死信队列与其他队列绑定
// 设置队列中的消息 3s 钟后过期
arguments.put("x-message-ttl", 3000);
channel.exchangeDeclare("eee", "fanout", true, false, null);
channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);
//给arguments中设置内容不能放在这句后面不然参数不起作用
channel.queueBind(QUEUE_NAME, "eee", "”);

// 创建死信交换器和队列
channel.exchangeDeclare(dlxExchangeName, "topic", true, false, null);
channel.queueDeclare(dlxQueueName, true, false, false, null);
channel.queueBind(dlxQueueName, dlxExchangeName, "#");

》必须有一个交换器与之绑定。死信队列一定要持久化,但设置死信队列的队列不一定要持久化**

死信队列和死信交换器的声明和普通队列,交换器没什么区别(除了一定要持久化),表明它是死信交换器的位置是把它的名字设置为一个其他队列的“x-dead-letter-exchange” 参数

》可以用过期队列+死信队列 来模拟延迟队列**

生产者将消息发送到过期时间为n的队列中,这个队列并未有消费者来消费消息,当过期时间到达时,消息会通过死信交换器被转发到死信队列中。而消费者从死信队列中消费消息。这个时候就达到了生产者发布了消息在讲过了n时间后消费者消费了消息,起到了延迟消费的作用。
 延迟队列在我们的项目中可以应用于很多场景,如:下单后两个消息取消订单,七天自动收货,七天自动好评,密码冻结后24小时解冻,以及在分布式系统中消息补偿机制(1s后补偿,10s后补偿,5m后补偿......)。

》优先级队列:

就像我们生活中的“特殊”人士一样,我们的业务上也存在一些“特殊”消息,可能需要优先进行处理,在生活上我们可能会对这部分特殊人士开辟一套VIP通道,而Rabbit同样也有这样的VIP通道(前提是在3.5的版本以后),即优先级队列,队列中的消息会有优先级。优先级高的消息具备优先被消费的特权。针对这些VIP消息,我们只需做两件事:
我们只需做两件事情:
1.将队列声明为优先级队列,即在创建队列的时候添加参数 x-max-priority 以指定最大的优先级,值为0-255(整数)。
2.为优先级消息添加优先级。
其示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
channel.exchangeDeclare("exchange.priority", "direct", true);//定义交换器
Map<String, Object> arguments = new HashMap<String, Object>(16);
// 为队列设置队列交换器
arguments.put("x-dead-letter-exchange", dlxExchangeName);
// 设置队列中的消息 ms 钟后过期
arguments.put("x-message-ttl", 10000);
arguments.put("x-max-priority", 10);args.Add("x-max-priority", 10);//定义优先级队列的最大优先级为10
channel.queueDeclare("queue.priority", true, false, false, args);//定义优先级队列
channel.queueBind("queue.priority", "exchange.priority", "priorityKey");//队列交换器绑定
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.priority(8)
.build();
var message = “testMsg8";
//发布消息
channel.BasicPublish("exchange.priority", "priorityKey", properties, message);

注意:没有指定优先级的消息会将优先级以0对待。 对于超过优先级队列所定最大优先级的消息,优先级以最大优先级对待。对于相同优先级的消息,后进的排在前面。如果在消费者的消费速度大于生产者的速度且Broker 中没有消息堆积的情况下, 对发送的消息设置优先级也就没有什么实际意义。因为生产者刚发送完一条消息就被消费者消费了,那么就相当于Broker 中至多只有一条消息,对于单条消息来说优先级是没有什么意义的。
关于优先级队列,好像违背了队列这种数据结构先进先出的原则,其具体是怎么实现的在这里就不过多讨论。

何时创建队列
从前面的文章我们知道,RabbitMQ可以选择在生产者创建队列,也可以在消费者端创建队列,也可以提前创建好队列,而生产者消费者直接使用即可。
RabbitMQ的消息存储在队列中,交换器的使用并不真正耗费服务器的性能,而队列会。如在实际业务应用中,需要对所创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值,以便在固定硬件资源的情况下能够进行合理有效的分配。
按照RabbitMQ官方建议,生产者和消费者都应该尝试创建(这里指声明操作)队列。这虽然是一个很好的建议,但是在我看来这个时间上没有最好的方案,只有最适合的方案。我们往往需要结合业务、资源等方面在各种方案里面选择一个最适合我们的方案。
如果业务本身在架构设计之初己经充分地预估了队列的使用情况,完全可以在业务程序上线之前在服务器上创建好(比如通过页面管理、RabbitMQ命令或者更好的是从配置中心下发),这样业务程序也可以免去声明的过程,直接使用即可。预先创建好资源还有一个好处是,可以确保交换器和队列之间正确地绑定匹配。很多时候,由于人为因素、代码缺陷等,发送消息的交换器并没有绑定任何队列,那么消息将会丢失:或者交换器绑定了某个队列,但是发送消息时的路由键无法与现存的队列匹配,那么消息也会丢失。当然可以配合mandatory参数或者备份交换器(关于mandatory参数的使用详细可参考我的上一篇文章) 来提高程序的健壮性。与此同时,预估好队列的使用情况非常重要,如果在后期运行过程中超过预定的阈值,可以根据实际情况对当前集群进行扩容或者将相应的队列迁移到其他集群。迁移的过程也可以对业务程序完全透明。此种方法也更有利于开发和运维分工,便于相应资源的管理。如果集群资源充足,而即将使用的队列所占用的资源又在可控的范围之内,为了增加业务程序的灵活性,也完全可以在业务程序中声明队列。至于是使用预先分配创建资源的静态方式还是动态的创建方式,需要从业务逻辑本身、公司运维体系和公司硬件资源等方面考虑。
也就是说,如果预先创建好了队列或者交换器,就不用再queueDeclare或exchangeDeclare了,直接用即可。

》持久化策略

持久化及策略#
作为一个内存中间件,在保证了速度的情况下,不可避免存在如内存数据库同样的问题,即丢失问题。持久化可以提高RabbitMQ 的可靠性,以防在异常情况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。

  1. 交换器的持久化
    交换器的持久化是通过在声明队列是将durable 参数置为true 实现的(该参数默认为false)。如果交换器不设置持久化,那么在RabbitMQ 服务重启之后,相关的交换器元数据会丢失,不过消息不会丢失,只是不能将消息发送到这个交换器中了。对一个长期使用的交换器来说,建议将其置为持久化的。
  2. 队列的持久化
    队列的持久化是通过在声明队列时将durable 参数置为true 实现的(该参数默认为false),如果队列不设置持久化,那么在RabbitMQ 服务重启之后,相关队列的元数据会丢失,此时数据也会丢失。正所谓”皮之不存,毛将焉附”,队列都没有了,消息又能存在哪里呢?
  3. 消息的持久化
    队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。要确保消息不会丢失,需要将其设置为持久化。通过将消息的投递模式(BasicProperties中的DeliveryMode属性)设置为2即可实现消息的持久化。
    因此,消息如果要想在Rabbit重启、关闭、宕机时能够恢复,需要做到以下三点:
  • 把消息的投递模式设置为2
  • 发送到持久化的交换器
  • 到达持久化的队列
    注意:RabbitMQ 确保持久化消息能从服务器重启中恢复的方式是将它们写入磁盘上的一个持久化日志文件中。当发布一条持久化消息到持久化交换器时,Rabbit会在日志提交到日志文件后才发送响应(开启生产者确认机制)。之后,如果消息到了非持久化队列,它会自动从日志文件中删除,并且无法在服务器重启后恢复。因此单单只设置队列持久化,重启之后消息会丢失;单单只设置消息的持久化,重启之后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化是毫无意义的。当从持久化队列中消费了消息后(并且确认后),RabbitMQ会在持久化日志中把这条消息标记为等待垃圾收集。而在消费持久化消息之前,若RabbitMQ服务器重启,会自动重建交换器、队列以及绑定,重播持久化日志文件中的消息到合适的队列或者交换器上(取决于宕机时,消息处在路由的哪个环节)。
    为了保障消息不会丢失,也许我们可以简单粗暴的将所有的消息标记为持久化,但这样我们会付出性能的代价。写入磁盘的速度比写入内存的速度慢得不只一点点。对于可靠性不是那么高的消息可以不采用持久化处理以提高整体的吞吐量。在选择是否要将消息持久化时,需要在可靠性和吐吞量之间做一个权衡。
    将交换器、队列、消息都设置了持久化之后就能百分之百保证数据不丢失了吗?
  • 从消费者来说,如果在订阅消费队列时将noAck参数设置为true ,那么当消费者接收到相关消息之后,还没来得及处理就宕机了,这样也算数据丢失。
  • 在持久化的消息正确存入RabbitMQ 之后,还需要有一段时间(虽然很短,但是不可忽视〉才能存入磁盘之中。RabbitMQ 并不会为每条消息都进行同步存盘的处理,可能仅仅保存到操作系统缓存之中而不是物理磁盘之中。如果在这段时间内RabbitMQ 服务节点发生了岩机、重启等异常情况,消息保存还没来得及落盘,那么这些消息将会丢失。
    关于第一个问题,可以通过消费者确认机制来解决。而第二个问题可以通过生产者确认机制来解决,也可以使用镜像队列机制(镜像队列机制,将在运维篇总结)。生产者确认消费者确认请往下看。

》生产者确认机制

上文我们知道,在使用RabbitMQ的时候,可以通过消息持久化操作来解决因为服务器的异常崩溃而导致的消息丢失,除此之外,我们还会遇到一个问题,当消息的生产者将消息发送出去之后,消息到底有没有正确地到达服务器呢?如果不进行特殊配置,默认情况下发送消息的操作是不会返回任何信息给生产者的,也就是默认情况下生产者是不知道消息有没有正确地到达服务器。如果在消息到达服务器之前己经丢失,持久化操作也解决不了这个问题,因为消息根本没有到达服务器,何谈持久化?
RabbitMQ针对这个问题,提供了两种解决方式:
  • 通过事务机制实现:
  • 通过发送方确认(publisher confirm)机制实现。

1.RabbitMQ 事务机制#

RabbitMQ 客户端中与事务机制相关的方法有三个:channel.txSelect(用于将当前信道设置为事务模式);channel.txCommit(用于提交事务),channel.txRollback(用于回滚事务)。在通过channel.txSelect方法开启事务之后,我们便可以发布消息给RabbitMQ了,如果事务提交成功,则消息一定到达了RabbitMQ 中,如果在事务提交执行之前由于RabbitMQ异常崩溃或者其他原因抛出异常,这个时候我们便可以将其捕获,进而通过执行channel.txRollback方法来实现事务回滚。示例代码如下所示:(如果在事务提交后异常,则就算执行rollback也不能回滚了)
1
2
3
4
5
6
7
8
channel.txSelect();
try {
channel.basicPublish("eee", "r1", true, null, "dasd12*(!@*#!".getBytes());
channel.txCommit();
}catch (Exception e){
System.out.println("rollback");
channel.txRollback();
}
事务确实能够解决消息发送方和RabbitMQ之间消息确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功,否则便可在捕获异常之后进行事务回滚,与此同时可以进行消息重发。但是使用事务同样会带来一些问题。
  • 会阻塞,发布者必须等待broker处理每个消息。
  • 事务是重量级的,每次提交都需要fsync(),需要耗费大量的时间
  • 事务非常耗性能,会降低RabbitMQ的消息吞吐量。

2.发送方确认机制#

前面介绍了RabbitMQ可能会遇到的一个问题,即消息发送方(生产者〉并不知道消息是否真正地到达了RabbitMQ。随后了解到在AMQP协议层面提供了事务机制来解决这个问题,但是采用事务机制实现会严重降低RabbitMQ的消息吞吐量,这里就引入了一种轻量级的方式一发送方确认(publisher confirm)机制。生产者将信道设置成confirm确认)模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID( 从1开始),一旦消息被投递到所有匹配的队列之后(或者是没有合适的队列而被丢弃。总之是消息确确实实到了rabbitmq上),RabbitMQ就会发送一个确认(BasicAck) 给生产者(包含消息的唯一ID),这就使得生产者知晓消息已经正确到达了目的地了。(注意,一旦消息到了rabbitmq(即到了队列或者被丢弃)就返回确认,而不是等到被消费)如果消息和队列是可持久化的,那么确认消息会在消息写入磁盘之后发出。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
》同步方法:
channel.confirmSelect(); //一定要写明这句,表示将信道设置为确认模式,否则会报错:Confirms not selected
String message = "qazwsx";
channel.basicPublish("eee", "", false, null, message.getBytes());
boolean result = channel.waitForConfirms();
if(result){
System.out.println("arrived");
}else {
System.out.println("no way");
}
》异步方法:
channel.confirmSelect(); ////一定要写明这句,表示将信道设置为确认模式,否则confrimListener的ack/nack方法不会被调用
String message = "qazwsx";
channel.basicPublish("eee", "r1", false, null, message.getBytes());
channel.addConfirmListener(new ConfirmListener() {
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("arrived");
}
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("no way");
}
});
关于生产者确认机制同样会有一些问题,broker(也就是rabbitmq系统,exchange和queue等的统称)不能保证消息会被confirm,只知道将会进行confirm。这样如果broker与生产者之间的连接断开,导致生产者不能收到确认消息,可能会重复进行发布。总之,生产者确认模式给客户端提供了一种较为轻量级的方式,能够跟踪哪些消息被broker处理,哪些可能因为broker宕掉或者网络失败的情况而重新发布。
注意:事务机制和publisher confirm机制两者是互斥的,不能共存。如果企图将已开启事务模式的信道再设置为publisher confirm模式, RabbitMQ会报错,或者如果企图将已开启publisher confirm模式的信道设置为事务模式, RabbitMQ也会报错。在性能上来看,而到底应该选择事务机制还是Confirm机制,则需要结合我们的业务场景。

》消费者确认机制

由于生产者和消费者不直接通信,生产者只负责把消息发送到队列,消费者只负责从队列获取消息(不管是push还是pull)。
消息被”消费”后,是需要从队列中删除的.那怎么确认消息被”成功消费”了呢?
是消费者从队列获取到消息后,broker 就从队列中删除该消息?
那如果消费者收到消息后,还没来得及”消费”它,或者说还没来得及进行业务逻辑处理时,消费者所在的信道或者连接因某种原因断开了,那这条消息岂不是就被无情的抛弃了…
我们更期望的是,消费者从队列获取到消息后,broker 暂时不删除该条消息,等到消费者”成功消费”掉该消息后,再删除它。
所以需要一个机制来确认生产者发送的消息被消费者”成功消费”。
RabbitMQ 提供了一种叫做”消费者确认”的机制.
如上,消费者在消费消息的同时,Rabbit会同步给予消费者一个DeliveryTag,这个DeliveryTag就像我们数据库中的主键,消费者在消费完毕后拿着这个DeliveryTag去Rabbit确认或拒绝这个消息。
消费者确认
消费者确认分两种:自动确认和手动确认.
在自动确认模式中,消息在发送到消费者后即被认为”成功消费”.这种模式可以降低吞吐量(只要消费者可以跟上),以降低交付和消费者处理的安全性.这种模式通常被称为“即发即忘”.与手动确认模型不同,如果消费者的TCP连接或通道在真正的”成功消费”之前关闭,则服务器发送的消息将丢失.因此,自动消息确认应被视为不安全,并不适用于所有工作负载. channel.basicConsume(QUEUE_NAME, true, consumer); //第二个参数是autoAck,为true则证明开启自动确认。
使用自动确认模式时需要考虑的另一件事是消费者过载.手动确认模式通常与有界信道预取(BasicQos方法)一起使用,该预取限制了信道上未完成(“进行中”)的消息的数量.但是,自动确认没有这种限制.因此,消费者可能会被消息的发送速度所淹没,可能会导致消息积压并耗尽堆或使操作系统终止其进程.某些客户端库将应用TCP反压(停止从套接字读取,直到未处理的交付积压超过某个限制).因此,仅建议能够以稳定的速度有效处理消息的消费者使用自动确认模式。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
QueueingConsumer consumer = new QueueingConsumer(channel);

//自动确认:
channel.basicConsume(QUEUE_NAME, true, consumer);
channel.basicQos(1);
//手动确认:拉模式:只有当broker收到消费确认后,才从队列中删除这一条消息。
channel.basicConsume(QUEUE_NAME, false, consumer);
channel.basicQos(1);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //发出消费者确认
basicAck(deliveryTag, multiple):
第一个参数deliveryTag
就是接受的消息的deliveryTag,可以通过delivery.getEnvelope().getDeliveryTag()获得
第二个参数 multiple
如果为true,确认之前接受到的消息;如果为false,只确认当前消息。
如果为true就表示连续取得多条消息才发会确认,和计算机网络的中tcp协议接受分组的累积确认十分相似,能够提高效率。

void basicAck(long deliveryTag, boolean multiple);
void basicReject(long deliveryTag, boolean requeue);
void basicNack(long deliveryTag, boolean multiple, boolean requeue);
* deliveryTag:可以看作消息的编号,它是一个64位的长整型值,最大值是9223372036854775807
* requeue:如果requeue 参数设置为true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue 参数设置为false,则RabbitMQ立即会把消息从队列中移除,而不会把它发送给新的消费者。
* BasicReject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令。
* multiple:在BasicAck中,multiple 参数设置为true 则表示确认deliveryTag编号之前所有已被当前消费者确认的消息。在BasicNack中,multiple 参数设置为true 则表示拒绝deliveryTag 编号之前所有未被当前消费者确认的消息。
说明:将channel.BasicReject 或者channel.BasicNack中的requeue设置为false ,可以启用"死信队列"的功能。
上述requeue,都会将消息重新存入队列发送给下一个消费者(也有可能是其它消费者)。关于requeue还有下面一种用法。可以选择是否补发给当前的consumer。
//补发消息 true退回到queue中 /false只补发给当前的consumer
channel.basicRecover(true);
注意:RabbitMQ仅仅通过Consumer的连接中断来确认该Message并没有被正确处理。也就是说,RabbitMQ给了Consumer足够长的时间来做数据处理。如果忘记了ack,那么后果很严重。当Consumer退出时,Message会重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,这个“内存泄漏”是致命的。

》消息传输保障

消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障分为三个层级。

  • At most once: 最多一次。消息可能会丢失,但绝不会重复传输。
  • At least once: 最少一次。消息绝不会丢失,但可能会重复传输。
  • Exactly once: 恰好一次。每条消息肯定会被传输一次且仅传输一次。
    RabbitMQ 支持其中的”最多一次”和”最少一次”。其中”最少一次”投递实现需要考虑以下这个几个方面的内容:
  1. 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到RabbitMQ 中。
  2. 消息生产者需要配合使用mandatory参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
  3. 消息和队列都需要进行持久化处理,以确保RabbitMQ服务器在遇到异常情况时不会造成消息丢失。
  4. 消费者在消费消息的同时需要将autoAck设置为false,然后通过手动确认的方式去确认己经正确消费的消息,以避免在消费端引起不必要的消息丢失。
    “最多一次”的方式就无须考虑以上那些方面,生产者随意发送,消费者随意消费,不过这样很难确保消息不会重复消费。
    “恰好一次”是RabbitMQ目前无法保障的(目前我也不知道哪个中间件能够保证)。消费者在消费完一条消息之后向RabbitMQ 发送确认BasicAck命令,此时由于网络断开或者其他原因造成RabbitMQ并没有收到这个确认命令,那么RabbitMQ不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。再考虑一种情况,生产者在使用publisher confirm机制的时候,发送完一条消息等待RabbitMQ 返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ中就有两条同样的消息,在消费的时候,消费者就会重复消费。而解决重复消费可以通过消费者幂等等方式来解决。