1、MQ简介
MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据机构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。(来源:百度百科)
1.1、实现
消息队列常常保存在链表结构中,拥有权限的进程可以向消息队列中写入或读取消息。
当前使用较多的消息队列有:RebbitMQ
、RocketMQ
、ActiveMQ
、kafka
、ZeroMQ
、MetaMq
等。而部分数据库也具有消息队列功能的,例如:Redis
、Mysql
、以及phxsql
。
1.2、特点
MQ是一种消费者和生产者形式。生产者往消息队列中不断的写入新的消息数据,消费者从消息队列中获取消息数据。MQ和JMS类似,但不同的是JMS是Sun Java 消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
AMQP:
即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息 队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中 间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。
JMS:
Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中 间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 Java消息服务 是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。常见的消息队列,大部分 都实现了JMS API,如 ActiveMQ , Redis 以及 RabbitMQ 等。
1.3、优缺点
优点
- 应用耦合
- 异步处理
- 流量削锋
- 传统模式:并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
- 中间件模式:按照数据库能处理的并发量,慢慢的一批一批的从消息队列中拉去消息数据。在生产中,这个短暂的高峰期积压是允许的。
缺点
系统可用性降低,系统复杂性增加
1.3、使用场景
消息队列,是分布式系统中的重要组成部件,其通用的使用场景可以简单的描述为:当不需要立即获得结果,但是并发量有需要进行控制的使用,差不多就是需要消息队列的时候。
例如场景:注册账号,注册完毕后发送短信到用户填写的手机号上(非必须)
原始:
- 前端发起注册请求
- 后端处理完毕注册请求,并保存注册的账号数据信息
- 像用户注册填写的手机号发送注册成功消息。(非必须)
- 返回注册结果到前端
这样有一个问题,就是前端注册要等待后端所有的处理完毕,才能收到注册成功的消息,但是对于前端用户来说,他需要的只是注册账号,发送短信什么的就不需要了,而后端还需要在发送短信后才返回结果给前台,这样的话就降低了用户的使用好感度,短信对用户来说是非必须的。如果在发送短信的时候出现了问题,那么前端用户就讲收不到正确的结果。既然发送短信是非必须的,那么就可以开一个多线程进行操作,但是这只是打个比方,实际开发中,多线程是一个很麻烦的问题。处理不当就会出现大问题,所以不推荐使用多线程,当然,如果技术很牛逼,那就另说了。
使用消息队列
- 前端发起注册请求
- 后端处理完毕注册请求,并保存注册的账号数据信息
- 把短信写入到消息队列中,消费者从消息队列中获取数据,然后发送到指定手机号(异步处理)
- 返回注册结果到前端
消息队列是异步的,所以在保存完注册的账号数据后可以直接返回结果。
2、RabbitMQ
2.1、为什么使用RabbitMQ
AMQP是一种协议,协议肯定不能进行实际的处理,RabbitMQ就是一个实现了AMQP协议的工具,通过RabbitMQ进行消息队列的使用。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如: Python 、 Ruby 、 .NET 、 Java 、 JMS 、 C 、 PHP 、 ActionScript 、 XMPP 、 STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
为什么使用RabbitMQ:
- 基于AMQP协议
- 高并发(是一个容量的概念,服务器可以接受的大任务数量)
- 高性能(是一个速度的概念,单位时间内服务器可以处理的任务数)
- 高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)
- 强大的社区支持,以及很多公司都在使用
- 支持插件
- 支持多语言
2.2、安装
2.2.1、安装Erlang
RabbitMQ依赖于Erlang,所以需要先安装Erlang。
官网:https://www.erlang.org/
2.2.1.1、Windows下安装
略。。。
2.2.1.2、Linux下安装
系统:Centos7
官网安装说明:https://www.rabbitmq.com/install-rpm.html
1、修改yum源,新建rabbitmq-erlang.repo
文件,指定安装erlang的版本
1
| vim /etc/yum.repos.d/rabbitmq-erlang.repo
|
添加下面的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| [rabbitmq_erlang] name=rabbitmq_erlang baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch repo_gpgcheck=1 gpgcheck=0 enabled=1 gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey sslverify=1 sslcacert=/etc/pki/tls/certs/ca-bundle.crt metadata_expire=300
[rabbitmq_erlang-source] name=rabbitmq_erlang-source baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS repo_gpgcheck=1 gpgcheck=0 enabled=1 gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey sslverify=1 sslcacert=/etc/pki/tls/certs/ca-bundle.crt metadata_expire=300
|
2、清除yum缓存,重新创建缓存
1 2
| yum clean all yum makecache
|
3、安装
4、检测是否安装成功
2.2.2、安装RabbitMQ
官网安装说明:https://www.rabbitmq.com/install-rpm.html
Erlang和RabbitMQ版本对应:https://www.rabbitmq.com/which-erlang.html
Github上的RabbitMQ的rpm下载地址:https://github.com/rabbitmq/rabbitmq-server/releases
1、将下载的rpm上传到Centos
2、安装RabbitMQ
1
| yum -y install rabbitmq-server-3.7.12-1.el7.noarch.rpm
|
3、安装UI查询
1
| rabbitmq-plugins enable rabbitmq_management
|
4、启动RebbitMQ服务
1
| systemctl start rabbitmq-server.service
|
5、检测服务
Centos7版本的检测服务命令
1
| systemctl status rabbitmq-server.service
|
6、修改访问权限
不修改访问权限的话,只能本机访问,我的是虚拟机,我需要在物理机访问,就需要进行设置
1
| vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.12/ebin/rabbit.app
|
7、重启服务
1
| systemctl restart rabbitmq-server.service
|
8、物理机访问
默认端口:15672
2.3、RabbitMQ的名词
Producing:Producing意思不仅仅是发送消息。发送消息的程序叫做producer生产者。
Queue: Queue是一个消息盒子的名称。它存活在 RabbitMQ 里。虽然消息流经 RabbitMQ 和你的应用程 序,但是他们只能在 Queue 里才能被保存。Queue 没有任何边界的限制,你想存多少消息都可以,它 本质上是一个无限的缓存。许多生产者都可以向一个 Queue 里发送消息,许多消费者都可以从一个 Queue 里接收消息。
Consuming:Consuming 的意思和接收类似。等待接收消息的程序叫做消费者。
3、RabbitMQ的7种队列模式使用
POM依赖
1 2 3 4 5 6
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.9.0</version> </dependency>
|
3.1、简单模式队列
1个生产者,1个消费者
处理模型:
- p:生产者
- C:消费者
- 中间件:Queue,消息缓存区
生产者-发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package cn.yanghuisen.simple.send;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
|
消费者-接收消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package cn.yanghuisen.simple.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
缺点:如果任务量很大,消息得不到及时的消费会造成队列积压。问题很严重,比如内存溢出,消息丢失等。
解决:配置多个消费者。
总结:简单队列模式,消息处理不及时,吞吐量较低。
3.2、工作模式队列(Work Queues)
3.2.1、消息轮询分发(Round-robin)
轮询分发:依次交替,一个消费者消费一个,依次往复
处理模型
工作模式队列-轮训-消息发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package cn.yanghuisen.work.rr.send;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0;i<20;i++){ String message = "Hello World!--"+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } } }
|
工作模式队列-轮训-消息接收者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| package cn.yanghuisen.work.rr.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv01 {
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
工作模式队列-轮训-消息接收者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package cn.yanghuisen.work.rr.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv02 {
private final static String QUEUE_NAME = "work_rr";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
缺点:任务量很大,虽然解决了及时消费,单位时间内消息处理速度加快,提高了吞吐量,可 是不同消费者处理消息的时间不同,导致部分消费者的资源被浪费。
解决:采用消息公平分发
总结:及时消费,处理速度加快,提高吞吐量,部分资源浪费
3.2.2、消息公平分发(fair dispatch)
消费速度快的多处理点
处理模型
工作模式队列-公平-消息发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| package cn.yanghuisen.work.fair.send;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, false, false, false, null); for (int i = 0;i<20;i++){ String message = "Hello World!--"+i; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } } }
|
工作模式队列-公平-消息接收者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| package cn.yanghuisen.work.fair.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv01 {
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1; channel.basicQos(prefetchCount);
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| package cn.yanghuisen.work.fair.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv02 {
private final static String QUEUE_NAME = "work_fair";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
int prefetchCount = 1; channel.basicQos(prefetchCount); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); };
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
|
问题:让固定的消费者消费,类似于微信订阅的公众号,只有订阅了指定的公众号才能收到指定公众号推送的内容
解决:采用发布/订阅模式
总结:消息处理速度不同,收到的消息也不同,消费速度快的处理的数量比较多,最大化的使用计算机资源。
3.3、发布/订阅模式队列(Publish/Subscribe)
类似于微信公众号,只有订阅了指定公众号的微信才能收到对应的消息
处理模型:
发布/订阅模式-消息发送者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package cn.yanghuisen.fanout.send;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send { private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "Hello World!"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package cn.yanghuisen.fanout.recv;
import com.rabbitmq.client.*;
public class Recv01 {
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package cn.yanghuisen.fanout.recv;
import com.rabbitmq.client.*;
public class Recv02 {
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
问题:指定消费者消费?类似于微信公众号的付费文章
解决:采用direct路由模式。
总结:一条消息,可以让多个消费者消费
3.4、路由模式队列(Routing)
指定消费者消费,类似于微信公众号的付费文章
模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package cn.yanghuisen.direct.send;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Send { private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
String vipEssay = "VIP文章"; String xmEssay = "限免文章"; String essay = "普通文章";
channel.basicPublish(EXCHANGE_NAME, "vipEssay", null, vipEssay.getBytes("UTF-8")); channel.basicPublish(EXCHANGE_NAME, "xmEssay", null, xmEssay.getBytes("UTF-8")); channel.basicPublish(EXCHANGE_NAME, "essay", null, essay.getBytes("UTF-8")); System.out.println(" [x] Sent '" + vipEssay + "'"); System.out.println(" [x] Sent '" + xmEssay + "'"); System.out.println(" [x] Sent '" + essay + "'"); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package cn.yanghuisen.direct.recv;
import com.rabbitmq.client.*;
public class Recv01 {
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"essay"); channel.queueBind(queueName,EXCHANGE_NAME,"xmEssay"); channel.queueBind(queueName,EXCHANGE_NAME,"vipEssay"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package cn.yanghuisen.direct.recv;
import com.rabbitmq.client.*;
public class Recv02 {
private static final String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"essay"); channel.queueBind(queueName,EXCHANGE_NAME,"xmEssay"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
问题:生产者产生的消息如果厂家需求过多需要设置很多路由?
解决:采用topic主题模式
总结:生产者发送了多天设置了路由规则的消息,消费者可以根据具体的路由规则消息对应的队列中的消息。
3.5、主题模式队列(Topics)
使用指定规则匹配路由
模型
以模型图为例
- routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,
- routingKey=”lazy.orange.fox”的消息会路由到Q1,Q2,
- routingKey=”lazy.brown.fox”的消息会路由到Q2,
- routingKey=”lazy.pink.rabbit”的消息会路由到Q2;
- routingKey=”quick.brown.fox”; routingKey=”orange”;routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何 bindingKey。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| package cn.yanghuisen.topic.send;
import com.rabbitmq.client.BuiltinExchangeType; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class Send { private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String vipEssay = "VIP文章"; String xmEssay = "限免文章"; String essay = "普通文章";
channel.basicPublish(EXCHANGE_NAME, "a.b.c", null, vipEssay.getBytes("UTF-8")); channel.basicPublish(EXCHANGE_NAME, "d.b.e.f.c", null, xmEssay.getBytes("UTF-8")); channel.basicPublish(EXCHANGE_NAME, "e.f", null, essay.getBytes("UTF-8")); System.out.println(" [x] Sent '" + vipEssay + "'"); System.out.println(" [x] Sent '" + xmEssay + "'"); System.out.println(" [x] Sent '" + essay + "'"); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package cn.yanghuisen.topic.recv;
import com.rabbitmq.client.*;
public class Recv01 {
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"#.f.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package cn.yanghuisen.topic.recv;
import com.rabbitmq.client.*;
public class Recv02 {
private static final String EXCHANGE_NAME = "exchange_topic";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName,EXCHANGE_NAME,"*.b.*"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
|
问题:RabbitMQ本身是基于异步的消息处理,是否可以同步实现?
解决:采用RPC模式
总结:根据指定规则匹配不同的路由
3.6、RPC远程过程调用模式队列(RPC)
模型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
| package cn.yanghuisen.rpc.server;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class RPCServer {
private static final String RPC_QUEUE_NAME = "rpc_queue";
private static int fib(int n) { if (n == 0) return 0; if (n == 1) return 1; return fib(n - 1) + fib(n - 2); }
public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
try { final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null); channel.queuePurge(RPC_QUEUE_NAME);
int prefetchCount = 1; channel.basicQos(prefetchCount);
System.out.println(" [x] Awaiting RPC requests");
Object monitor = new Object(); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build();
String response = ""; try { String message = new String(delivery.getBody(), "UTF-8"); int n = Integer.parseInt(message);
System.out.println(" [.] fib(" + message + ")"); response += fib(n); } catch (RuntimeException e) { System.out.println(" [.] " + e.toString()); } finally { channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); synchronized (monitor) { monitor.notify(); } } };
boolean autoAck = false; channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { }); while (true) { synchronized (monitor) { try { monitor.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| package cn.yanghuisen.rpc.client;
import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeoutException;
public class RPCClient implements AutoCloseable {
private Connection connection; private Channel channel; private String requestQueueName = "rpc_queue";
public RPCClient() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
connection = factory.newConnection(); channel = connection.createChannel(); }
public static void main(String[] args) { try (RPCClient fibonacciRpc = new RPCClient()) { for (int i = 0; i < 10; i++) { String i_str = Integer.toString(i); System.out.println(" [x] Requesting fib(" + i_str + ")"); String response = fibonacciRpc.call(i_str); System.out.println(" [.] Got '" + response + "'"); } } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); } }
public String call(String message) throws IOException, InterruptedException { final String corrId = UUID.randomUUID().toString();
String replyQueueName = channel.queueDeclare().getQueue();
AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build();
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { response.offer(new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { });
String result = response.take(); channel.basicCancel(ctag); return result; }
public void close() throws IOException { connection.close(); } }
|
3.7、确认模式队列(confirm)
如何确定消息队列收到了生产者发送的消息?如果在发送消息前程序崩了怎么办?
3.7.1、事务机制控制
txSelect():开启事务
txCommit():提交事务
txRollback():回滚事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| package cn.yanghuisen.tx.send;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "tx";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
Connection connection = null; Channel channel = null;
try{ connection = factory.newConnection(); channel = connection.createChannel(); channel.txSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); int i = 1/0; System.out.println(" [x] Sent '" + message + "'"); channel.txCommit(); }catch (Exception e){ e.printStackTrace(); channel.txRollback(); channel.close(); connection.close();
} } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package cn.yanghuisen.tx.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "tx";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
缺点:降低了RabbitMQ的消息吞吐量
解决:使用confirm模式
总结:使用事务,可以在发送请求但是没有提交事务前回滚事务,撤回发送的消息。
3.7.2、确认模式(confirm)
生产者设置为确认模式,发送消息时所有的消息都会被指派一个唯一的ID,一旦消息被投递套指定的队列之后,就会返回一个确认结果给生产者(包含消息的唯一ID),这样生产者就知道了消息已经正确到达了目的地。如果消息和队列时可以持久化的,那么确认消息会将消息写入磁盘后发出。
confirm模式大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回 确认的同时继续发送下一条消息,当消息终得到确认之后,生产者应用便可以通过回调方法来处理该 确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序 同样可以在回调方法中处理该nack消息。
实现Confirm确认机制有三种方式
1、普通Confirm模式
每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| package cn.yanghuisen.confirm.sync.send;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
Connection connection = null; Channel channel = null;
try{ connection = factory.newConnection(); channel = connection.createChannel(); channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
if (channel.waitForConfirms()){ System.out.println("消息发送成功"); }else { System.out.println("消息发送失败"); }
System.out.println(" [x] Sent '" + message + "'"); }catch (Exception e){ e.printStackTrace();
} } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package cn.yanghuisen.confirm.sync.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
2、批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端 confirm。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| package cn.yanghuisen.confirm.sync.send;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send {
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
Connection connection = null; Channel channel = null;
try{ connection = factory.newConnection(); channel = connection.createChannel(); channel.confirmSelect();
channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
channel.waitForConfirmsOrDie();
System.out.println(" [x] Sent '" + message + "'"); }catch (Exception e){ e.printStackTrace();
} } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package cn.yanghuisen.confirm.sync.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "confirm_sync";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
3、异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| package cn.yanghuisen.confirm.async.send;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.ConfirmListener; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.Collections; import java.util.SortedSet; import java.util.TreeSet;
public class Send {
private final static String QUEUE_NAME = "confirm_async";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop");
Connection connection = null; Channel channel = null;
try{ final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>()); connection = factory.newConnection(); channel = connection.createChannel(); channel.confirmSelect(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.addConfirmListener(new ConfirmListener() {
@Override public void handleAck(long l, boolean b) throws IOException { if (b){ System.out.println("handleAck--success-->multiple" + l); confirmSet.headSet(l+1).clear(); }else { System.out.println("handleAck--success-->single" + l); confirmSet.remove(l); } }
@Override public void handleNack(long l, boolean b) throws IOException { if (b){ System.out.println("handleNack--failed-->multiple-->" + l); confirmSet.headSet(l + 1L).clear(); }else { System.out.println("handleNack--failed-->single" + l); confirmSet.remove(l); } } });
while (true){ String message = "Hello World!"; Long seqNo = channel.getNextPublishSeqNo(); channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); confirmSet.add(seqNo); } }catch (Exception e){ e.printStackTrace(); channel.close(); connection.close();
} } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package cn.yanghuisen.confirm.async.recv;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "confirm_async";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.10.100"); factory.setPort(5672); factory.setUsername("shop"); factory.setPassword("shop"); factory.setVirtualHost("/shop"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
4、Spring集成RabbitMQ
官网:https://spring.io/projects/spring-amqp
4.1、创建聚合项目
父pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <packaging>pom</packaging> <modules> <module>provider</module> <module>consumer</module> </modules> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.0.RELEASE</version> <relativePath/> </parent> <groupId>cn.yanghuisen</groupId> <artifactId>spring-rabbitmq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>spring-rabbitmq</name> <description>Demo project for Spring Boot</description>
<properties> <java.version>1.8</java.version> </properties>
<dependencies>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies>
<build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build>
</project>
|
4.2、生产者
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>spring-rabbitmq</artifactId> <groupId>cn.yanghuisen</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>provider</artifactId>
<name>provider</name> <url>http://www.example.com</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties>
<dependencies>
</dependencies>
</project>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| package cn.yanghuisen;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class App { public static void main( String[] args ) { SpringApplication.run(App.class); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package cn.yanghuisen;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig {
@Bean public Queue queue(){ return new Queue("topic"); }
@Bean public TopicExchange topicExchange(){ return new TopicExchange("topicExchange"); }
@Bean public Binding binding(){ return BindingBuilder.bind(queue()).to(topicExchange()).with("*.msg.#"); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package cn.yanghuisen;
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component public class Send { @Resource private RabbitTemplate rabbitTemplate;
public void send(){ String message = "Hello World";
rabbitTemplate.convertAndSend("topicExchange","topic.msg",message); System.out.println("发送消息:"+message); } }
|
4.3、消费者
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| <?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>spring-rabbitmq</artifactId> <groupId>cn.yanghuisen</groupId> <version>0.0.1-SNAPSHOT</version> </parent> <modelVersion>4.0.0</modelVersion>
<artifactId>consumer</artifactId>
<name>consumer</name> <url>http://www.example.com</url>
<properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties>
<dependencies>
</dependencies>
<build>
</build> </project>
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| package cn.yanghuisen;
import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class App { public static void main( String[] args ) { SpringApplication.run(App.class); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| package cn.yanghuisen;
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @RabbitListener(queues = "topic") public class Consumer {
@RabbitHandler public void recv(String message){ System.out.println("接受消息:"+message); } }
|
4.4、测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| package cn.yanghuisen;
import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest;
import javax.annotation.Resource;
@SpringBootTest public class TestSend {
@Resource private Send send;
@Test public void testSend(){ send.send(); }
}
|