RabbitMQ 消息队列 什么是MQ MQ(Messge queue),从字面意思来看,本质上是一个队列,FIFO先进先出原则,只不过队列中存放的内容是消息而已,还是一种可以跨进程的通信机制,用于上下游传递消息。在互联网的构架中,MQ是一种非常常见的上下游“逻辑解耦+物理解耦”的消息通信服务。使用了MQ后,消息发送上游只需要依赖MQ,不用依赖其他服务
为什么要用MQ呢 流量削峰 举个例子,如果订单系统最多能够处理一万次订单,这个处理能力应付正常时段的下单绰绰有余,正常时间段我们下单一秒就能返回结果,但是高峰期,如果有两万次下单操作,系统是处理不了的,只能限制订单超过一万后不允许用户下单,使用消息队列做缓冲,我们可以取消这个限制,把一秒内下单三成一段时间来处理,这时有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好
应用解耦 以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。当庄边成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复,在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,下单用户感受不到物流系统的故障,提升系统可用性
异步处理 有些服务间调用时异步的,例如A调用B,B需要花很长时间去执行,但是A需要知道B什么时候可以执行完,以前一般有两种方式,A过一段时间去调用B的API查询,或者A提供一个 callback API ,B执行完之后调用API通知A服务。这两种犯事都不是很优雅,使用消息总线可以很方便的解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完后,会发送一条消息给MQ,MQ会将此消息装发给A服务,这样A服务既不用循环调用B查询API,也不会提供callBack API。同样B服务也不用做这些操作,A服务还能及时得到异步处理成功的消息
RabbitMQ 概念 RabbitMQ 是一个消息中间件:他接受并转发消息,你可以把它作为一个快递站点,当你要发送一个包裹时,你把你的包裹放到快递站,快递员最终把你的快递送到收件人手上,按照这种逻辑RabbitMQ是一个快递站,一个快递员帮你传递邮件,RabbitMQ与快递站的区别在于,他不处理快件而是接收,存储和转发消息数据
四大核心概念 生产者:产生数据发送消息的程序是生产者
交换机:交换机是RabbitMQ非常重要的一个不见,一方面它接收来自生产者的消息,另一方便他将消息推送到队列中,交换机必须确切的知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推送到多个队列,亦或是把消息丢弃,这个得有交换机的类型来决定
队列:队列是RabbitMQ内部使用的一种数据结构,,尽管小溪流经RabbitMQ和应用程序,但它们只能存在消息队列中,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区,许多生产者可以将消息发送到一个队列,许多消费者可以尝试从一个队列接收数据,这就是我们使用队列的方式
消息者:消费与接收具有相似的含义。消费大多数时候是一个等待接受消息的程序,请注意生产者消费者和中间件很多时候并不在同一个机器上。同一个应用程序既可以是生产者也可以是消费者
RabbitMQ几种工作模式 1、Work queues
2、Publish/Subscribe
3、Routing
4、Topics
5、Header
6、RPC
RabbitMQ中各个名词的介绍
Broker:接收和分发消息的应用,RabbitMQ Server就是Message Broker
Virtual Host:出于多租户和安全因素设计的,把AMQP的基本组件还分到一个虚拟分组中,类似于网络中namespace的概念,当多个用户使用同一个RabbitMQ Server提供服务时,可以划分出多个vhost,每个用户在自己的vhost创建 exchang/queque
Connection: publisher/consumer和Broker之间的TCP连接
Channel:如果每一次访问RabbitMQ都建立一个Connection,在消息量巨大的时候简历TCP Connection的开销将是巨大的,效率也低下,Channel是在Connection内部建立的逻辑连接,如果应用程序支持多线程,通常每个线程创建单独的channel进行通讯,AMQP method包含了channel id 帮助客户端和message broker 识别 channel,所以channel之间是完全隔离的。Channel作为轻量级的Connection极大减少了操作系统简历TCP Connection的开销
Exchange:message到达broker的第一站,根据分发规则,匹配查询表的routing key ,分发到queque中去,常见的类型有 direct(point-to-point) ,topic(publish-subscribe) and fanout(multicast)
Queue:消息最终被送到这里等待consumer取走
Binding:exchange和queue之间的虚拟连接,binding中可以包含routing key,Binding信息被保存到exchange中的查询表中,用于message的分发依据
下载安装 安装
由于RabbitMQ是由erlang所编写的所以安装RabbitMQ之前要安装erlang环境,安装Linux的Socat工具,然后再安装RabbitMQ
erlang版本:21.3-1.el7
还需要安装socat
rabbitMQ版本 3.8.8.1-el7
安装完成后即可开启服务
1 2 3 4 /sbin/service rabbitmq-server start //开启RabbitMQ服务 /sbin/service rabbitmq-server status //查看RabbitMQ服务的状态、 /sbin/service rabbitmq-server stop //关闭RabbitMQ服务 chkconfig rabbitmq-server on //开机启动RabbitMQ服务
而后安装RabbitMQ管理插件 1 rabbitmq-plugins enable rabbitmq_management
此时会出现报错
这时需要我们去修改环境文件
添加配置
左边为本机的IP地址 右边为本机的主机名
开放防火墙
1 2 3 4 5 6 7 firewall-cmd --zone=public --add-port=15672/tcp --permanent firewall-cmd --zone=public --add-port=5672/tcp --permanent firewall-cmd --reload
然后我们
前半段为我们Linux的虚拟机IP地址,后面为其端口号
RabbitMq为我们提供了默认的用户名和密码
Username:guest
Password:guest
RabbitMq Management的基本命令 然后我们需要创建用户
1 2 3 4 添加用户: rabbitmqctl add_user {username} {password} 删除用户:rabbitmqctl delete_user {username} 修改密码:rabbitmqctl change_password {username} {newpassword} 设置用户角色:rabbitmqctl set_user_tags {username} {tag}
tag参数表示用户角色取值为:management , monitoring ,****policymaker administrator
要想实现管理权权限也要设置权限
1 rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"
management
用户可以通过AMQP做的任何事外加:
列出自己可以通过AMQP登入的virtual hosts
查看自己的virtual hosts中的queues, exchanges 和 bindings
查看和关闭自己的channels 和 connections
查看有关自己的virtual hosts的“全局”的统计信息,包含其他用户在这些virtual hosts中的活动。
policymaker
management可以做的任何事外加:
查看、创建和删除自己的virtual hosts所属的policies和parameters
monitoring
management可以做的任何事外加:
列出所有virtual hosts,包括他们不能登录的virtual hosts
查看其他用户的connections和channels
查看节点级别的数据如clustering和memory使用情况
查看真正的关于所有virtual hosts的全局的统计信息
administrator
policymaker和monitoring可以做的任何事外加:
创建和删除virtual hosts
查看、创建和删除users
查看创建和删除permissions
关闭其他用户的connections
这就是我们所登录的页面
简单模式(simple)
消息生产者将消息放入队列
消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)
首先先导入坐标 1 2 3 4 5 6 7 8 9 10 <dependency > <groupId > com.rabbitmq</groupId > <artifactId > amqp-client</artifactId > <version > 5.8.0</version > </dependency > <dependency > <groupId > commons-io</groupId > <artifactId > commons-io</artifactId > <version > 2.6</version > </dependency >
消息生产者的构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 package com.atguigu.rabbitmq.one;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;import java.util.Queue;import java.util.concurrent.TimeoutException;public class Producer { public static final String QUEUE_NAME="hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory=new ConnectionFactory ();\ factory.setHost("192.168.26.132" ); factory.setUsername("admin" ); factory.setPassword("xxxxxx" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); String message="hello world" ; channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("消息发送完毕" ); } }
消息消费者构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 package com.atguigu.rabbitmq.one;import com.rabbitmq.client.*;import java.io.IOException;import java.util.concurrent.TimeoutException;public class consumer { public static final String QUEUE_NAME="hello" ; public static void main (String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("192.168.26.132" ); factory.setUsername("admin" ); factory.setPassword("123456" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback=(comsumerTag,message) ->{ System.out.println(new String (message.getBody())); }; CancelCallback cancelCallback=consumerTag->{ System.out.println("消息消费被中断" ); }; channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
将二者开启后
工作队列(Work Queque) 工作队列(又称任务队列)的主要思想就是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行,我们把任务封装成消息将其发送到队列。在后台运行的工作进程将弹出任务并执行作业。当多个工作线程时,这些工作线程将一起处理这些任务
这些线程的处理消息的处理方式为轮训处理,按顺序去处理消息,防止一个消息被重复消费多次,工作线程之间的关系为竞争
抽取工具类 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 package com.atguigu.rabbitmq.utils;import com.rabbitmq.client.Channel;import com.rabbitmq.client.Connection;import com.rabbitmq.client.ConnectionFactory;public class RabbitMqUtils { public static Channel gerChannel () throws Exception{ ConnectionFactory factory = new ConnectionFactory (); factory.setHost("虚拟机IP地址" ); factory.setUsername("admin" ); factory.setPassword("XXXXXX" ); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); return channel; } }
工作线程的构建 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 package com.atguigu.rabbitmq.two;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Worker01 { public static final String QUEUE_NAME="hello" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); DeliverCallback deliverCallback=(consumerTag,message)->{ System.out.println("接收到的消息:" +new String (message.getBody())); }; CancelCallback cancelCallback=consumerTag->{ System.out.println(consumerTag+"消息被取消了" ); }; System.out.println("C2开始接收消息" ); channel.basicConsume(QUEUE_NAME,true ,deliverCallback,cancelCallback); } }
倘若我们要开启多个工作线程可以在
选择对应的服务
这样即可
工作队列的构建 根据控制台输出内容进行消息的发送
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.atguigu.rabbitmq.two;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;public class Task01 { public static final String QUEUE_NAME="hello" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.queueDeclare(QUEUE_NAME,false ,false ,false ,null ); Scanner scanner=new Scanner (System.in); while (scanner.hasNext()){ String message = scanner.next(); channel.basicPublish("" ,QUEUE_NAME,null ,message.getBytes()); System.out.println("发送消息完成" ); } } }
生产者开始发送消息到消息队列
工作线程从连接中的信道获取消息
C1
C2
我们可以看到生产者发送的消息顺序为 AA BB CC DD ,反观工作线程Worker1,Woker2
worker1率先收到消息AA,而后worker2收到消息BB,再后是woker1收到消息CC ,woker2收到消息DD 由此可见,每个工作队列的交替接收消息的方式为轮训
消息应答 概念 消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了一部分突然它挂掉了,会发生什么情况。RabbitMQ一旦向消费者传递了一条消息便立刻将该消息编辑为删除,在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的信息,以及后续发送该消费者的消息,因为他无法接收到
为了保证消息在发送过程中不丢失,rabbitMQ引入了消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息后,告诉RabbitMQ他已经处理了,RabbitMQ可以吧该消息删除了
自动应答 消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在就收到之前,消费者那边出现连接或者Channel关闭那么消息就会丢失,当然另一方便这种模式消费者那边可以传递过载消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息积压,最终内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用再消费者可以高效并以某种速率能够处理这些信息的情况下使用
消息应答的方法 1 2 3 4 5 Channel.basicAck(用于肯定确认) RabbitMQ已经知道该信息并且成功处理消息,可以丢弃了 Channel.basicNack(用于否定确认) Channel.basicReject(用于否定) 与 Channel.basicNack相比少了一个参数 不处理该消息了,直接拒绝,可以将其丢弃
Multiple的解释 手动应答的好处就是可以批量应答并且减少网络拥堵
multiple的true和false代表不同意思
true代表批量应答channel上未应答的消息
比如说channel上有传送tag的消息 5,6,7,8 当前tag是8,那么此时5-8的这些还未应答的消息都会被应答
false代表不会批量应答channel上未应答的消息,只会应答deliverTag的消息
消息重新入队 如果消费者由于某些原因失去连接(其通道已经关闭,连接已关闭或TCP连接丢失),导致未发送ACK确认,RabbitMQ将了解消息未完全处理,并将对其重新排队,如果此时其他消费者可以处理,他很快将其重新分发给另一个消费者,这样即使某个消费者偶尔死亡,也可以确保不丢失任何消息
消息应答的消息生产者 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 package com.atguigu.rabbitmq.three;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;public class task03 { public static final String TASK_QUEUE_NAME="ack_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.queueDeclare(TASK_QUEUE_NAME,false ,false ,false ,null ); Scanner scanner=new Scanner (System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("" ,TASK_QUEUE_NAME,null ,message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" +message); } } }
消息应答的消息消费者 消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 package com.atguigu.rabbitmq.three;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.atguigu.rabbitmq.utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Worker01 { public static final String ACK_QUEUE_NAME="ack_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); System.out.println("C1等待接收消息处理的时间较短" ); DeliverCallback deliverCallback=(consumerTag, delivery) ->{ String message = new String (delivery.getBody(), "UTF-8" ); System.out.println("接收到的消息是:" +message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false ); SleepUtils.sleep(1 ); }; CancelCallback cancelCallback=consumerTag -> { System.out.println(consumerTag+"消费者取消消费" ); }; Boolean autoAck=false ; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 package com.atguigu.rabbitmq.three;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.atguigu.rabbitmq.utils.SleepUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class Worker02 { public static final String ACK_QUEUE_NAME="ack_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); System.out.println("C2等待接收消息处理的时间较长" ); DeliverCallback deliverCallback=(consumerTag, delivery) ->{ String message = new String (delivery.getBody(), "UTF-8" ); System.out.println("接收到的消息是:" +message); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false ); SleepUtils.sleep(30 ); }; CancelCallback cancelCallback=consumerTag -> { System.out.println(consumerTag+"消费者取消消费" ); }; Boolean autoAck=false ; channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback); } }
生产者发出消息
消费者1
消费者2
我们可以看出,我们消息生产者,发送了14条消息,即便是消息消费者消费速度快慢也和所接收的信息量多少不成正相关,并不是消费速度越快,消费的就越多而是从严格按照轮训的方式 假设将消息1分给worker1那么消息2就会分给worker2,worker2在处理消息2的时候,worker可以处理消息3 然后处理消费5 ……
Rabbitmq持久化 概念 刚刚我们已经看了如何处理任务不对事的情况,但是如何保障当Rabbitmq服务停掉以后消息生产者发送过来得消息不丢失。默认情况下RabbitMq退出或者由于某种原因崩溃时,它忽视队列和消息,除非告知它不要这样做。确保消息不会丢失需要两件事,我们需要将对了和消息都标记为持久化
队列持久化 我们打开RabbitMq管理页面,打开Queque发现,在Features是空的并非由 D 我们接下来对该队列进行持久化处理
首先我们要先删除该队列(不删除的话会报错),点进ack_queue
开启持久化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.atguigu.rabbitmq.three;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;public class task03 { public static final String TASK_QUEUE_NAME="ack_queue" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); boolean durable= true ; channel.queueDeclare(TASK_QUEUE_NAME,durable,false ,false ,null ); Scanner scanner=new Scanner (System.in); while (scanner.hasNext()) { String message = scanner.next(); channel.basicPublish("" ,TASK_QUEUE_NAME,null ,message.getBytes("UTF-8" )); System.out.println("生产者发出消息:" +message); } } }
执行后我们再管理页面发现
消息持久化 要想让消息实现持久化需要在生产者中修改代码
将消息标记为持久化并不能完全保证不会丢失消息,尽管告诉他RabbitMq将消息保存到磁盘,但是这里依旧存在当消息刚准备存储在磁盘的时候,但是还没存储完成,消息还在缓存的一个间隔点,此时并没有真正的写入磁盘,持久性保证并不强,但是对于简单任务的队列而言绰绰有余,如果需要更持久化的策略,请见 后文的发布确认
不公平分发 最开始学习时,我们学习到RabbitMQ分发消息采用的是轮训分发,但是在某种策略并不是很好,比如说有两个消费者在处理任务,其中一个消费者1处理任务的速度非常快,而另一个消费者2处理速度却很慢,这时候我们采用的还是轮训分发的话,处理的快的消费者很长一部分事件处于空闲状态,但是处理慢的消费者一直在处理,这种分配方式在此分配方法下就很不合理,但是RabbitMQ并不知道这种情况依然很公平的进行分发
为了避免这种情况,我们可以设置参数Channel.basicQos(1)
开启后的消息生产者
开启后的消息消费者1
消息消费者2
我们可以发现消息消费者1接受处理了4个消息,而消息消费者2接收处理了1个消息,已经不是公平分发的轮训操作了而是不公平的分发
预取值 RabbitMQ的信道上肯定不止只有一个消息,因此这里就存在一个未确认的消息缓冲区,因此希望开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。这个时候就可以通过使用 channel.basicQos() 方法设置“预取计数”值来完成的。该值定义通道上允许的未确认消息的最大数量。。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认
RabbitMQ 发布确认 原理 生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列时可持久化的,那么确认消息会在将消息写入磁盘后发出,broker回传给生产者确认消息中的deliverry-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发下一条消息,当消息最终得到确认后,生产者应用便可以通过回调的方法来处理该确认消息,如果Rabbit MQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用升序同样可以在回调发发中处理该nack消息
1 2 3 channel.confirmSelect(); channel.waitForConfirms();
发布确认的策略 单个确认发布 这是一种简单的确认方式,它是一种同步确认的发布方式,也就是发布一个消息之后只有它被去人发布,后续的消息才能继续发布,waitForConfirmsOrDie(long)
批量确认发布 和单个确认没有很大差别,个人去定义一批为多少,来确认一次发布,然后确认即可
异步确认发布 异步确认发布时逻辑比较复杂的,并且效率很高,它利用回调函数来达到消息可靠性传递,这个中间件也是通过函数回调来保证是否投递成功
异步确认发布,就是,消息生产者只管往MQ里扔,发布成功没成功,通过broker的监听器异步返回 回调函数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 public static void publicMessageAsync () throws Exception{ Channel channel = RabbitMqUtils.gerChannel(); String QueueName = UUID.randomUUID().toString(); channel.confirmSelect(); boolean durable = true ; channel.queueDeclare(QueueName, durable, false , false , null ); ConfirmCallback ackCallback=(deliveryTag, multiple)->{ System.out.println("确认的消息:" +deliveryTag); }; ConfirmCallback nackCallback=(deliveryTag,multiple)->{ System.out.println("未确认的消息:" +deliveryTag); }; channel.addConfirmListener(ackCallback,nackCallback); long begin = System.currentTimeMillis(); for (int i = 0 ; i < MESSAGE_COUNT; i++) { String message="消息" +i; channel.basicPublish("" ,QueueName,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes()); } long end = System.currentTimeMillis(); System.out.println("共耗时:" + (end - begin) + "毫秒" ); } }
处理异步未确认消息 最好的解决方法就是把未确认的消息放到一个基于内存的能够发布线程访问的队列,比如用ConcurrentLinkedQueue队列,在confirm callback与发布线程之间进行消息传递
交换机 对于简单模式和工作模式而言他们的 运行流程时这样的,每个消息只能消费一次,并且交换机使用的都是默认的交换机,在本流程中并不体现
但是Broker中不仅有queue还有 exchange交换机,它应该是这样的
消息生产者生产消息到连接中 由连接发送到不同的交换机中,交换机绑定(routingkey)着各自的队列,消费者消费消息的时候需要去连接中去取信道,在信道中获得消息,着就是RabbitMQ的其中一个工作模式 Publish/Subscribe
概念 RabbitMQ消息传送模型的核心思想就是:生产者生产的消息从不会直接发送到队列当中,实际上通常生产者甚至都不知道这些消息发送到了哪些队列之中,
相反的生产者只能将信息发送到交换机中,交换机的工作也非常简单,一方面它来接收来自生产者的消息,另一方面,将他们推入队列。交换机必须确切的知道如何处理收到的消息,是应该把这些消息放到特定队列还是说把他们放到许多队列中或者是丢弃他们,这就由交换机的类型来决定
Exchanges的类型 有以下类型:
直接(direct)、主题(topic)、标题(headers)、扇出(fanout,发布订阅) ,无名exchange
无名exchange我们在前面的方法中一直使用,当调用
1 channel.basicPublish("" ,QueueName,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
中,第一个空串(””)即为无名exchange
消息能够从路由发送到队列中其实是由routingKey(bindingKey)绑定key指定的,如果exchange为无名exchange的话那么routingKey即为所要对送到的队列名称
临时队列 我们之前所使用的非持久化的队列就是临时队列,在我们的rabbitMQ管理工具中我们可以看到hello队列即为临时队列
每当我们连接RabbitMQ时,我们都需要一个全新的空队列,为此我们可以创建你一个具有随机名称的队列 ,一旦我们断开了消费者连接,队列就会直接自动删除
创建临时队列的方法:
1 String queueName= channel.queueDeclare().getQueue();
我们可以看到这是所生成的临时队列
当我们断开连接后
绑定(bindings) 什么是绑定? binding其实就是exchange和queue之间的桥梁,他告诉我们 exchange和哪个队列进行了绑定关系,如下图,X与Q1和Q2产生了绑定关系
routingkey的作用 当 队列和交换机绑定了routingkey之后,消费者在将消息发送到交换机中时,传入了routingkey,交换机会拿着routingkey去将消息推送到找对应的队列
Fanout 介绍 Fanout类型很简单,它是将收到的所有消息广播他知道的所有队列中,系统中的默认就有fanout
消费者1和消费者2代码相似
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 package com.atguigu.rabbitmq.five;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogs01 { public static final String EXCHANGENAME="Logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.exchangeDeclare(EXCHANGENAME,"fanout" ); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGENAME,"" ); System.out.println("等待接收消息" ); DeliverCallback deliverCallback=(consumerTag,message)->{ System.out.println("01打印结果为:" +new String (message.getBody(),"UTF-8" )); }; CancelCallback cancelCallback=consumerTag->{}; channel.basicConsume(queueName,true ,deliverCallback,cancelCallback); } }
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.atguigu.rabbitmq.five;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;import static com.atguigu.rabbitmq.five.ReceiveLogs01.EXCHANGENAME;public class EmitLog { public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); Scanner sc=new Scanner (System.in); while (sc.hasNext()){ String message = sc.next(); channel.basicPublish(EXCHANGENAME,"" ,null ,message.getBytes()); System.out.println("生产者发出消息" +message); } } }
交换机可将生产者的消息广播到所有队列中,再由消费者进行消费
Direct 根据routingkey的不同,不同的消息会被转发到不同的队列中
根据此交换机的绑定我们写出代码验证direct
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package com.atguigu.rabbitmq.six;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;import static com.atguigu.rabbitmq.six.ReceiveLogsDirect01.EXCHANGE_NAME;public class DirectLogs { public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); Scanner sc=new Scanner (System.in); while (sc.hasNext()){ String message = sc.next(); channel.basicPublish(EXCHANGE_NAME,"error" ,null ,message.getBytes()); System.out.println("生产者发出消息" +message); } } }
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.atguigu.rabbitmq.six;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME="direct_Logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console" ,false ,false ,false ,null ); channel.queueBind("console" ,EXCHANGE_NAME,"info" ); channel.queueBind("console" ,EXCHANGE_NAME,"warning" ); System.out.println("等待接收消息" ); DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println("01打印结果为:" +new String (message.getBody(),"UTF-8" )); }; CancelCallback cancelCallback= consumerTag->{}; channel.basicConsume("console" ,deliverCallback,cancelCallback); } }
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.atguigu.rabbitmq.six;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME="direct_Logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("disk" ,false ,false ,false ,null ); channel.queueBind("disk" ,EXCHANGE_NAME,"error" ); System.out.println("等待接收消息" ); DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println("02打印结果为:" +new String (message.getBody(),"UTF-8" )); }; CancelCallback cancelCallback= consumerTag->{}; channel.basicConsume("disk" ,deliverCallback,cancelCallback); } }
生产者的routingkey是error所以消费者2消息会转发到对垒disk中
由消费者2 消费消息
当routingkey换为info时
Topic Topic的要求 发送到类型是topic交换机的消息的routingkey不能随便写,必须满足一定的要求,它必须是一个单词列表,以点号隔开。这些单词可以是任意单词。比如说”stock.usd.nyse”,”nyse,vmw”等等这种类型,当然这些单词列表最多不能超过255个字节
在此规则中,有两个替换符需要大家注意:
*(星号)可以代替一个单词
#(井号)可以代替零个或多个单词
Topic匹配案例 类似于模糊查询例如:
Q1 绑定的时中间带orange的三个单词的字符串
Q2绑定的是
最后一个单词是rabbit的三个单词字符串(. .rabbit)
第一个但此时lazy的多个单词字符串(lazy.#)
如下图这种情况routingkey会匹配到两个队列上,当队列多重绑定到exchange时,符合其中的一条规则,该队列就会收到exchange转发过来的信息
生产者 的 routingkey 为 132.123.hard
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.atguigu.rabbitmq.six;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.Channel;import java.util.Scanner;import static com.atguigu.rabbitmq.six.ReceiveLogsDirect01.EXCHANGE_NAME;public class DirectLogs { public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); Scanner sc=new Scanner (System.in); while (sc.hasNext()){ String message = sc.next(); channel.basicPublish(EXCHANGE_NAME,"132.123.hard" ,null ,message.getBytes()); System.out.println("生产者发出消息" +message); } } }
消费者1的 消息队列routingkey匹配规则为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 package com.atguigu.rabbitmq.six;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect01 { public static final String EXCHANGE_NAME="Topic_Logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("asd" ,false ,false ,true ,null ); channel.queueBind("asd" ,EXCHANGE_NAME,"*.*.easy" ); System.out.println("等待接收消息" ); DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println("01打印结果为:" +new String (message.getBody(),"UTF-8" )); }; CancelCallback cancelCallback= consumerTag->{}; channel.basicConsume("asd" ,deliverCallback,cancelCallback); } }
消费者2的消息队列routingkey匹配规则为
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 package com.atguigu.rabbitmq.six;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class ReceiveLogsDirect02 { public static final String EXCHANGE_NAME="Topic_Logs" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("qwe" ,false ,false ,true ,null ); channel.queueBind("qwe" ,EXCHANGE_NAME,"rabbitmq.#" ); channel.queueBind("qwe" ,EXCHANGE_NAME,"*.*.hard" ); System.out.println("等待接收消息" ); DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println("02打印结果为:" +new String (message.getBody(),"UTF-8" )); }; CancelCallback cancelCallback= consumerTag->{}; channel.basicConsume("qwe" ,deliverCallback,cancelCallback); } }
死信队列 概念 死信队列,顾名思义就是无法被消费的信息,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息并进行消费,但是某些时候由于特殊原因,导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信自然就有了有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入到死信队列中;用户在商城下单成功后并取点击支付后,在指定期间未支付时的自动失效
来源
消息TTL过期
消息队列达到最大长度(队列满了,无法再添加数据到mq中)
消息被拒绝(basic.reject或basic.nack)并且requeue=false
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 package com.atguigu.rabbitmq.seven;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;import java.util.HashMap;import java.util.Map;import static com.atguigu.rabbitmq.seven.consumer02.DEADEXCHANGE;import static com.atguigu.rabbitmq.seven.consumer02.DEADQUEUE;public class consumer01 { public static final String NORMALQUEUE="normal_queue" ; public static final String NORMALEXCHANGE="normal_exchange" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.exchangeDeclare(NORMALEXCHANGE, BuiltinExchangeType.DIRECT); Map<String,Object> arguments=new HashMap <>(); arguments.put("x-dead-letter-exchange" ,DEADEXCHANGE); arguments.put("x-dead-letter-routing-key" ,"lisi" ); channel.queueDeclare(NORMALQUEUE,false ,false ,false ,arguments); channel.queueBind(NORMALQUEUE,NORMALEXCHANGE,"zhangsan" ); System.out.println("等待接收消息" ); DeliverCallback deliverCallback=(consumerTag,message)->{ System.out.println("Consumer1接收的消息为:" +new String (message.getBody())); }; CancelCallback cancelCallback=consumerTag -> {}; channel.basicConsume(NORMALQUEUE,deliverCallback,cancelCallback); } }
消费者2(死信消费者)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 package com.atguigu.rabbitmq.seven;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.BuiltinExchangeType;import com.rabbitmq.client.CancelCallback;import com.rabbitmq.client.Channel;import com.rabbitmq.client.DeliverCallback;public class consumer02 { public static final String DEADQUEUE="dead_queue" ; public static final String DEADEXCHANGE="dead_exchange" ; public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); channel.exchangeDeclare(DEADEXCHANGE, BuiltinExchangeType.DIRECT); channel.queueDeclare(DEADQUEUE,false ,false ,false ,null ); channel.queueBind(DEADQUEUE,DEADEXCHANGE,"lisi" ); System.out.println("等待接收消息" ); DeliverCallback deliverCallback=(consumerTag, message)->{ System.out.println("Consumer2收的消息为:" +new String (message.getBody())); }; CancelCallback cancelCallback= consumerTag -> {}; channel.basicConsume(DEADQUEUE,deliverCallback,cancelCallback); } }
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.atguigu.rabbitmq.seven;import com.atguigu.rabbitmq.utils.RabbitMqUtils;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import java.util.Scanner;import static com.atguigu.rabbitmq.seven.consumer01.NORMALEXCHANGE;public class Producer { public static void main (String[] args) throws Exception { Channel channel = RabbitMqUtils.gerChannel(); AMQP.BasicProperties basicProperties= new AMQP .BasicProperties() .builder().expiration("10000" ).build(); for (int i=0 ;i<10 ;i++){ String message= "info" +i; channel.basicPublish(NORMALEXCHANGE,"zhangsan" ,basicProperties,message.getBytes()); } } }
关于普通队列的一些基本配置
arguments
x-message-ttl 发送到队列的消息在丢弃之前可以存活多长时间(毫秒)。
x-expires 队列在被自动删除(毫秒)之前可以使用多长时间。
x-max-length 队列在开始从头部删除之前可以包含多少就绪消息。
x-max-length-bytes 队列在开始从头部删除之前可以包含的就绪消息的总体大小。
x-dead-letter-exchange 设置队列溢出行为。这决定了在达到队列的最大长度时消息会发生什么。有效值为drop-head或reject-publish。交换的可选名称,如果消息被拒绝或过期,将重新发布这些名称。
x-dead-letter-routing-key 可选的替换路由密钥,用于在消息以字母为单位时使用。如果未设置,将使用消息的原始路由密钥。
x-max-priority 队列支持的最大优先级数;如果未设置,队列将不支持消息优先级。
x-queue-mode 将队列设置为延迟模式,在磁盘上保留尽可能多的消息以减少内存使用;如果未设置,队列将保留内存缓存以尽快传递消息。
x-queue-master-locator 将队列设置为主位置模式,确定在节点集群上声明时队列主机所在的规则。
延迟队列 概念 延迟队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素队列
所用场景
订单在十分钟之内未支付则自动取消
新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
用户注册成功后,如果三天内没有登录则进行短信提醒
用户发起退款,如果三天内没有得到处理,则会通知相关运营人员
预定会议后,需要在预定时间的前十分钟通知各个参与人员参加会议
这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看似使用了定时任务,一直轮训数据,每秒见哈一次,取出需要被处理的数据,然后处理就可以了。如果任务量较少则可以,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求如果对于时间不是严格限制,而是宽松上的之后,那么每天晚上跑个定时任务检查一下所有未支付的账单,这确实是一个可行的方案,但是对于数据量大,时效性比较强的场景,例如:“订单十分钟内未支付则关闭”,短期内支付的订单数量可能有很多,活动期间甚至会达到百万甚至千万级别,对于这么庞大的数据量仍然使用鲁迅的方式是不可娶的,很有可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大的压力,无法满足业务要求,而且性能低下
SpringBoot整合RabbitMQ 依赖导入
1 2 3 4 <dependency > <groupId > org.springframework.boot</groupId > <artifactId > spring-boot-starter-amqp</artifactId > </dependency >
添加配置
1 2 3 4 5 6 7 spring: rabbitmq: host: 192.168 .XXX.XXX username: XXX password: XXX port: 5672
队列TTL 代码架构图 创建两个队列QA和QB,二者队列TTL分别设置为10S和40S,然后创建一个交换机X和死信交换机Y,他们的类型都是direct,创建一个死信队列QD他们的绑定关系如下
我们要实现以上架构图的代码
首先添加配置类
配置类中 我们定义了队列A和队列B,并且将他们的死信队列指出,而且设置了死信队列(QD)的routingkey和死信交换机以及他的存活时长,设置了其队列和交换机之间的Routingkey,实际上所谓的延时队列,更像是利用MQ,将队列中的消息分别超时TTL,并最终放到死信队列中,我们可以把死信队列看成所谓的消息延时后所到达的队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 package com.example.demo.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class TTLQueueConfig { public static final String X_EXCHANGE="X" ; public static final String Y_DEAD_LETTER_EXCHANGE="Y" ; public static final String QUEUE_A="QA" ; public static final String QUEUE_B="QB" ; public static final String DEAD_LETTER_QUEUE_D="QD" ; @Bean public DirectExchange xExchange () { return new DirectExchange (X_EXCHANGE); } @Bean public DirectExchange yExchange () { return new DirectExchange (Y_DEAD_LETTER_EXCHANGE); } @Bean public Queue queueA () { Map<String,Object> arguments=new HashMap <>(3 ); arguments.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"YD" ); arguments.put("x-message-ttl" ,10 *1000 ); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } @Bean public Queue queueB () { Map<String,Object> arguments=new HashMap <>(3 ); arguments.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"YD" ); arguments.put("x-message-ttl" ,40 *1000 ); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } @Bean public Queue queueD () { return QueueBuilder.durable(DEAD_LETTER_QUEUE_D).build(); } @Bean public Binding queueABindingX (@Qualifier("queueA") Queue queueA, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueA).to(xExchange).with("XA" ); } @Bean public Binding queueBBindingX ( @Qualifier("queueB") Queue queueB, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueB).to(xExchange).with("XB" ); } @Bean public Binding queueDBindingY (@Qualifier("queueD") Queue queueD, @Qualifier("yExchange") DirectExchange yExchange) { return BindingBuilder.bind(queueD).to(yExchange).with("YD" ); } }
其次是业务接口
业务接口中,利用rabbitTemplate发送了两条消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.Date;@RestController @RequestMapping("/ttl") @Slf4j public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{message}") public void sendMsg (@PathVariable String message) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}" ,new Date ().toString(),message); rabbitTemplate.convertAndSend("X" ,"XA" ,"消息来自TTL为10S的队列" +message); rabbitTemplate.convertAndSend("X" ,"XB" ,"消息来自TTL为40S的队列" +message); } }
最终是我们重要的消息消费者,也就是我们对延时队列的监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 package com.example.demo;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.util.Date;@Component @Slf4j public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD (Message message, Channel channel) throws Exception{ String msg=new String (message.getBody()); log.info("当前时间:{},收到死信队列的消息:{}" ,new Date ().toString(),msg); } }
当访问时
10秒后
40秒后
这就是所谓的延时队列
当下有一需求,需要我们去添加一个队列,并且可以灵活的调整ttl,但我们不能一直创建队列,只创建一个灵活延时队列。所以在创建队列时,不能直接定义TTL,
配置类
1 2 3 4 5 6 7 8 9 10 11 12 @Bean public Queue queueC () { Map<String,Object> arguments=new HashMap <>(); arguments.put("x-dead-letter-exchange" ,Y_DEAD_LETTER_EXCHANGE); arguments.put("x-dead-letter-routing-key" ,"YD" ); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } @Bean public Binding QueueCBindingX (@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) { return BindingBuilder.bind(queueC).to(xExchange).with("XC" ); }
controller类提供用户接口
1 2 3 4 5 6 7 8 9 10 11 @GetMapping("/sendExpirationMsg/{message}/{ttlTime}") public void sendExpirationMsg (@PathVariable String message,@PathVariable String ttlTime) { log.info("当前时间{},发送了一个时长{}毫秒TTL信息给队列:{}" ,new Date ().toString(),ttlTime,message); int ttl=Integer.valueOf(ttlTime)*1000 ; String time = String.valueOf(ttl); rabbitTemplate.convertAndSend("X" , "XC" , message, msg -> { msg.getMessageProperties().setExpiration(time); return msg; }); }
基于插件的延时队列 安装 直接将插件复制到
1 cp 插件 /usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins
安装插件
1 rabbitmq-plugins enable rabbitmq_delayed_message_exchagne
我们安装完插件后,可以在RabbitMQ可视化管理页面中发现
工作流程
代码实现
配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 package com.example.demo.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.CustomExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class delayedQueueConfig { public static final String DELAYED_QUEUE_NAME= "delayed.queue" ; public static final String DELAYED_EXCHANGE_NAME= "delayed.exchange" ; public static final String DELAYED_ROUTING_KEY="delayed.routingkey" ; @Bean public CustomExchange delayedExchange () { Map<String,Object> arguments=new HashMap <>(); arguments.put("x-delayed-type" ,"direct" ); return new CustomExchange (DELAYED_EXCHANGE_NAME,"x-delayed-message" ,true ,false ,arguments); } @Bean public Queue delayedQueue () { return new Queue (DELAYED_QUEUE_NAME); } @Bean public Binding delayedQueueBindExchange ( @Qualifier("delayedExchange") CustomExchange delayedExchange, @Qualifier("delayedQueue") Queue queue ) { return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs(); } }
controller层
1 2 3 4 5 6 7 8 @GetMapping("/sendDelayedMsg/{message}/{ttlTime}") public void sendDelayedMsg (@PathVariable String message,@PathVariable Integer ttlTime) { log.info("当前时间{},发送了一个时长{}毫秒TTL信息给队列:{}" ,new Date ().toString(),ttlTime,message); rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME,DELAYED_ROUTING_KEY,message,msg->{ msg.getMessageProperties().setDelay(ttlTime); return msg; }); }
消费者
1 2 3 4 5 @RabbitListener(queues = DELAYED_QUEUE_NAME) public void receiveDelayed (Message message,Channel channel) throws Exception{ String msg= new String (message.getBody()); log.info("当前时间:{},收到死信队列的消息:{}" ,new Date ().toString(),msg); }
发布确认(高级) 在生产环境中由于一些不明的原因,导致RabbitMQ重启,在RabbitMQ重启期间生产者消息投递失败,导致消息丢失,需要手动处理和恢复,于是,我们开始思考,如何才能进行RabbitMQ的消息的可靠投递呢?特别是在这样比较低端的情况,RabbitMQ集群不可用的时候,无法投递消息该如何处理呢?
首先我们应该知道确认回调和消息回退是消息发布确认的两个重要的机制
确认回调 RabbitMQ的ConfirmCallback是一种发布确认机制,用于让生产者知道消息是否成功投递到交换机。如果成功,会收到一个ack(确认)回调,如果失败,会收到一个nack(否认)回调。生产者可以根据回调的结果做出相应的处理,比如重发或者存储失败的消息,可以在发送消息时向其中加入CorrelationData,CorrelationData中可以加入唯一ID用于表示消息身份,还可以添加一个Future对象,用于异步等待发布确认的结果。
在生产者发送消息时(我们在CorrelationData传入了ID)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.example.demo.controller;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.PathVariable;import org.springframework.web.bind.annotation.RequestMapping;import org.springframework.web.bind.annotation.RestController;import java.util.UUID;import static com.example.demo.config.ConfirmConfig.*;@RestController @Slf4j @RequestMapping("/confirm") public class producerController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMessage/{message}") public void SendMessage (@PathVariable String message) { CorrelationData correlationData = new CorrelationData (); correlationData.setId(UUID.randomUUID().toString()); rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, CONFIRM_ROUTING_KEY, message, correlationData); log.info("发送的消息内容为:{}" , message); } }
然后,在ConfirmCallback回调函数中,可以通过CorrelationData对象获取消息的ID和发布确认的结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 package com.example.demo.config;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.connection.CorrelationData;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component @Slf4j public class MycallBack implements RabbitTemplate .ConfirmCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init () { rabbitTemplate.setConfirmCallback(this ); } @Override public void confirm (CorrelationData correlationData, boolean ack, String cause) { String id=correlationData!=null ?correlationData.getId():"" ; if (ack==true ){ log.info("交换机已经收到ID为:{}的消息" ,id); } else { log.info("交换机还未收到ID为:{}的消息,由于:{}" ,id,cause); } } }
消息回退 消息回退是指当消息发送到交换机后,如果交换机无法将消息路由到队列,那么就会将消息返回给生产者。
要使用消息回退,你需要做以下几个步骤:
开启回退模式:publisher-returns=“true”
设置ReturnCallback接口,实现returnedMessage方法
设置Exchange处理消息的模式:rabbitTemplate.setMandatory(true)
这样,当消息无法路由到队列时,就可以在ReturnedMessage方法中处理回退的消息
备份交换机 备份交换机是RabbitMQ中交换机的“备胎”,当一个交换机收到一条无法路由的消息时,就会把它转发给备份交换机,由备份交换机来处理。
要使用备份交换机,你需要在声明交换机时指定一个alternate-exchange参数,值为备份交换机的名称。
通常,备份交换机的类型为Fanout,这样可以把所有消息都投递到与其绑定的队列中。
你可以在备份交换机下绑定一个队列,用来存储那些无法路由的消息。
备份交换机都是Fanout广播类型的交换机,正常的普通交换机如何将投递的消息发送给备份交换机呢?
在普通交换机的配置中,我们加入备份交换机的参数
1 ExchangeBuilder.withArgument("alternate-exchange" ,BACKUP_EXCHANGE_NAME)
配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 package com.example.demo.config;import org.springframework.amqp.core.*;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configuration public class ConfirmConfig { public static final String CONFIRM_EXCHANGE_NAME="confirm_exchange" ; public static final String CONFIRM_QUEUE_NAME="confirm_queue" ; public static final String CONFIRM_ROUTING_KEY="key1" ; public static final String BACKUP_EXCHANGE_NAME="backup_exchange" ; public static final String BACKUP_QUEUE_NAME="backup.queue" ; public static final String WARNING_QUEUE_NAME="warning.queue" ; @Bean public DirectExchange confirmExchange () { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME) .withArgument("alternate-exchange" ,BACKUP_EXCHANGE_NAME) .build(); } @Bean public Queue confirmQueue () { return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build(); } @Bean public FanoutExchange backupExchange () { return new FanoutExchange (BACKUP_EXCHANGE_NAME); } @Bean public Queue backupQueue () { return QueueBuilder.durable(BACKUP_QUEUE_NAME).build(); } @Bean public Queue warningQueue () { return QueueBuilder.durable(WARNING_QUEUE_NAME).build(); } @Bean public Binding BindingBackup ( @Qualifier("backupExchange") FanoutExchange backExchange, @Qualifier("backupQueue") Queue backupQueue ) { return BindingBuilder.bind(backupQueue).to(backExchange); } @Bean public Binding BindingWarning ( @Qualifier("backupExchange") FanoutExchange backupExchange, @Qualifier("warningQueue") Queue warningQueue) { return BindingBuilder.bind(warningQueue).to(backupExchange); } @Bean public Binding BindingConfirm ( @Qualifier("confirmExchange") DirectExchange directExchange, @Qualifier("confirmQueue") Queue confirmQueue) { return BindingBuilder.bind(confirmQueue).to(directExchange).with(CONFIRM_ROUTING_KEY); } }
消费者类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 package com.example.demo.consumer;import com.rabbitmq.client.Channel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import static com.example.demo.config.ConfirmConfig.WARNING_QUEUE_NAME;@Component @Slf4j public class WarningConsumer { @RabbitListener(queues = WARNING_QUEUE_NAME) public void recevicedBackup (Message message, Channel channel) { String msg = new String (message.getBody()); log.error("发现不可路由消息:{}" ,msg); } }
Controller层: 在此,生产者发送消息时,routingKey发生错误,无法 发送到指定的队列中,因而要转发到备用交换机中
1 2 3 4 5 @GetMapping("/sendMessage/{message}") public void SendMessage (@PathVariable String message) { rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,CONFIRM_ROUTING_KEY+"1" ,message); log.info("发送的消息内容为:{}" , message); }
其他 幂等性 概念 用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生副作用,举个简单作用,用户购买商品后支付,支付扣款成功,但返回结果时,网络异常,这时钱已经扣除,用户再次点击按钮此时就会进行第二次扣款,返回结果成功,用户查询余额发现多扣钱了,流水也变成了两条,在以前的单体应用系统中,我们只需把数据操作放入事务中即可,发生错误立即回滚,但是再响应客户端的时候也有可能出现网络终端或者异常等等
消息重复消费 消费者在消费MQ中的消息时,MQ已经把消息发给消费者,消费者给MQ返回ACK时网络中断,故MQ未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已经消费了这条消息,该消费者消费了重复的消息
解决思路 MQ消费者的幂等性的解决一般使用全局ID或者写个唯一标识比如事件戳或者UUID后者订单消费者MQ中的消息也可以利用MQ的该ID进行判断,或者可按照自己规则生成一个全局唯一ID,每次消费时用该ID先判断该消息是否已经消费过了
优先级队列 使用场景:
在我们系统中有一个订单催付的场景,我们的客户在天猫下的订单,淘宝会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像苹果,小米这样大商家一年起码能给我们创造很大的利润,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用redis,来存放的定时轮询,大家都知道redis.只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用RabbitMQ进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级。
假如,我们一共有100万的订单消息,需要进行催单消费,而这100万的订单信息又是不同的催付时间。需要RabbitMQ进行消费的时候就需要用到RabbitMQ的优先级队列,通过对不同的订单进行设置优先级,使得优先级高的消息先被优先处理。
RabbitMQ的优先级大小最小至最大的数值是0255也就是说,数字越大,会优先被消费。不过一般设置的数值会在010之间【因为如果设置0-255,会考验服务器的硬件性能问题;如果你的服务器硬件性能好的话,可以随便设置,不太好,并且消息量又大的话,还是建议使用0-10的这个优先级范围】,这个有点像是线程的优先级设置。
队里、交换机 以及绑定配置,在队列中定义优先级最大数
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 package com.example.demo.config;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import java.util.HashMap;import java.util.Map;@Configuration public class priorityConfig { public static final String PRIORITY_QUEUE_NAME="priority_queue" ; public static final String PRIORITY_EXCHANGE_NAME="priority_exchange" ; public static final String PRIORITY_ROUTINGKEY="priority" ; @Bean public DirectExchange queuePriority () { return new DirectExchange (PRIORITY_EXCHANGE_NAME); } @Bean public Queue createPriorityQueue () { Map<String,Object> arguments=new HashMap <>(); arguments.put("x-max-priority" ,10 ); return new Queue (PRIORITY_QUEUE_NAME,true ,false ,false ,arguments); } @Bean public Binding bingPriority (@Qualifier("createPriorityQueue") Queue createPriorityQueue, @Qualifier("queuePriority") DirectExchange queuePriority) { return BindingBuilder.bind(createPriorityQueue).to(queuePriority).with(PRIORITY_ROUTINGKEY); } }
消息生产者发送消息(controller层) 在消息生产者定义消息的优先级
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 @GetMapping("/priority/sendMessage/{message}") public void priorityMeesageeSend (@PathVariable String message) { Map<String,Object> arguments=new HashMap <>(); for (int i=0 ;i<10 ;i++){ if (i==5 ){ rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE_NAME,PRIORITY_ROUTINGKEY,"我特殊我是优先级" ,msg->{ MessageProperties messageProperties = msg.getMessageProperties(); messageProperties.setPriority(5 ); return msg; }); } else { rabbitTemplate.convertAndSend(PRIORITY_EXCHANGE_NAME,PRIORITY_ROUTINGKEY,message,msg->{ MessageProperties messageProperties=msg.getMessageProperties(); messageProperties.setPriority(1 ); return msg; }); } } }
消息消费者 设置为手动应答,每隔1秒消费一条消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 package com.example.demo.consumer;import com.rabbitmq.client.AMQP;import com.rabbitmq.client.Channel;import com.rabbitmq.client.impl.AMQChannel;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.stereotype.Component;import java.io.IOException;import static com.example.demo.config.priorityConfig.PRIORITY_QUEUE_NAME;@Component @Slf4j public class Consumer { @RabbitListener(queues = PRIORITY_QUEUE_NAME,ackMode = "MANUAL") public void consumer (Message message,Channel channel) throws IOException { try { Thread.sleep(1000 ); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(message.getMessageProperties().getDeliveryTag(),false ); log.info("消费者收到消息:{}" ,new String (message.getBody())); } }
惰性队列 RabbitMQ从3.6.0版本开始引入了惰性队列的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 默认情况下,当生产者将消息发送到RabbitMQ的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当RabbitMQ需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然RabbitMQ的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。
可以在声明队列时加入
1 2 3 Map<String,Object> arguments=new HashMap <>(); arguments.put("x-queue-mode" ,"lazy" ); return new Queue (PRIORITY_QUEUE_NAME,true ,false ,false ,arguments);
常见问题 RabbitMQ是如何实现不同用户之间的数据隔离的呢?
虚拟主机(Virtual Hosts) : RabbitMQ 允许创建多个虚拟主机,每个虚拟主机都是一个独立的消息代理,拥有自己的交换机、队列、绑定和权限设置。不同用户可以被分配到不同的虚拟主机,从而实现数据隔离。
权限控制 : RabbitMQ 允许对每个虚拟主机进行细粒度的权限控制。管理员可以配置不同用户对交换机、队列、绑定的访问权限,包括读取、写入、管理等操作权限,确保只有授权的用户能够访问特定的资源。