1、MQ简介

MQ(Message Queue)消息队列,是基础数据结构中“先进先出”的一种数据机构。指把要传输的数据(消息)放在队列中,用队列机制来实现消息传递——生产者产生消息并把消息放入队列,然后由消费者去处理。消费者可以到指定队列拉取消息,或者订阅相应的队列,由MQ服务端给其推送消息。(来源:百度百科)

1.1、实现

消息队列常常保存在链表结构中,拥有权限的进程可以向消息队列中写入或读取消息。

当前使用较多的消息队列有:RebbitMQRocketMQActiveMQkafkaZeroMQMetaMq等。而部分数据库也具有消息队列功能的,例如:RedisMysql、以及phxsql

1.2、特点

MQ是一种消费者和生产者形式。生产者往消息队列中不断的写入新的消息数据,消费者从消息队列中获取消息数据。MQ和JMS类似,但不同的是JMS是Sun Java 消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。

  • AMQP:

    即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息 队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中 间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

  • JMS:

    Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中 间件的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。 Java消息服务 是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。常见的消息队列,大部分 都实现了JMS API,如 ActiveMQ , Redis 以及 RabbitMQ 等。

1.3、优缺点

优点

  • 应用耦合
  • 异步处理
  • 流量削锋
    • 传统模式:并发量大的时候,所有的请求直接怼到数据库,造成数据库连接异常
    • 中间件模式:按照数据库能处理的并发量,慢慢的一批一批的从消息队列中拉去消息数据。在生产中,这个短暂的高峰期积压是允许的。

缺点

系统可用性降低,系统复杂性增加

1.3、使用场景

消息队列,是分布式系统中的重要组成部件,其通用的使用场景可以简单的描述为:当不需要立即获得结果,但是并发量有需要进行控制的使用,差不多就是需要消息队列的时候。

例如场景:注册账号,注册完毕后发送短信到用户填写的手机号上(非必须)

原始:

  1. 前端发起注册请求
  2. 后端处理完毕注册请求,并保存注册的账号数据信息
  3. 像用户注册填写的手机号发送注册成功消息。(非必须)
  4. 返回注册结果到前端

这样有一个问题,就是前端注册要等待后端所有的处理完毕,才能收到注册成功的消息,但是对于前端用户来说,他需要的只是注册账号,发送短信什么的就不需要了,而后端还需要在发送短信后才返回结果给前台,这样的话就降低了用户的使用好感度,短信对用户来说是非必须的。如果在发送短信的时候出现了问题,那么前端用户就讲收不到正确的结果。既然发送短信是非必须的,那么就可以开一个多线程进行操作,但是这只是打个比方,实际开发中,多线程是一个很麻烦的问题。处理不当就会出现大问题,所以不推荐使用多线程,当然,如果技术很牛逼,那就另说了。

使用消息队列

  1. 前端发起注册请求
  2. 后端处理完毕注册请求,并保存注册的账号数据信息
    • 把短信写入到消息队列中,消费者从消息队列中获取数据,然后发送到指定手机号(异步处理
  3. 返回注册结果到前端

消息队列是异步的,所以在保存完注册的账号数据后可以直接返回结果。


2、RabbitMQ

2.1、为什么使用RabbitMQ

AMQP是一种协议,协议肯定不能进行实际的处理,RabbitMQ就是一个实现了AMQP协议的工具,通过RabbitMQ进行消息队列的使用。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如: Python 、 Ruby 、 .NET 、 Java 、 JMS 、 C 、 PHP 、 ActionScript 、 XMPP 、 STOMP 等,支持 AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

为什么使用RabbitMQ:

  • 基于AMQP协议
  • 高并发(是一个容量的概念,服务器可以接受的大任务数量)
  • 高性能(是一个速度的概念,单位时间内服务器可以处理的任务数)
  • 高可用(是一个持久的概念,单位时间内服务器可以正常工作的时间比例)
  • 强大的社区支持,以及很多公司都在使用
  • 支持插件
  • 支持多语言

2.2、安装

2.2.1、安装Erlang

RabbitMQ依赖于Erlang,所以需要先安装Erlang。

官网:https://www.erlang.org/

2.2.1.1、Windows下安装

略。。。

2.2.1.2、Linux下安装

系统:Centos7

官网安装说明:https://www.rabbitmq.com/install-rpm.html

1、修改yum源,新建rabbitmq-erlang.repo文件,指定安装erlang的版本

1
vim /etc/yum.repos.d/rabbitmq-erlang.repo

添加下面的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
[rabbitmq_erlang]
name=rabbitmq_erlang
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/$basearch
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

[rabbitmq_erlang-source]
name=rabbitmq_erlang-source
baseurl=https://packagecloud.io/rabbitmq/erlang/el/7/SRPMS
repo_gpgcheck=1
gpgcheck=0
enabled=1
gpgkey=https://packagecloud.io/rabbitmq/erlang/gpgkey
sslverify=1
sslcacert=/etc/pki/tls/certs/ca-bundle.crt
metadata_expire=300

2、清除yum缓存,重新创建缓存

1
2
yum clean all
yum makecache

3、安装

1
yum -y install erlang

4、检测是否安装成功

1
erl

在这里插入图片描述

2.2.2、安装RabbitMQ

官网安装说明:https://www.rabbitmq.com/install-rpm.html

Erlang和RabbitMQ版本对应:https://www.rabbitmq.com/which-erlang.html

Github上的RabbitMQ的rpm下载地址:https://github.com/rabbitmq/rabbitmq-server/releases

在这里插入图片描述

1、将下载的rpm上传到Centos

在这里插入图片描述

2、安装RabbitMQ

1
yum -y install rabbitmq-server-3.7.12-1.el7.noarch.rpm

3、安装UI查询

1
rabbitmq-plugins enable rabbitmq_management

4、启动RebbitMQ服务

1
systemctl start rabbitmq-server.service

5、检测服务

Centos7版本的检测服务命令

1
systemctl status rabbitmq-server.service

在这里插入图片描述

6、修改访问权限

不修改访问权限的话,只能本机访问,我的是虚拟机,我需要在物理机访问,就需要进行设置

1
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.12/ebin/rabbit.app

在这里插入图片描述

7、重启服务

1
systemctl restart rabbitmq-server.service

8、物理机访问

默认端口:15672

在这里插入图片描述在这里插入图片描述

2.3、RabbitMQ的名词

Producing:Producing意思不仅仅是发送消息。发送消息的程序叫做producer生产者。

Queue: Queue是一个消息盒子的名称。它存活在 RabbitMQ 里。虽然消息流经 RabbitMQ 和你的应用程 序,但是他们只能在 Queue 里才能被保存。Queue 没有任何边界的限制,你想存多少消息都可以,它 本质上是一个无限的缓存。许多生产者都可以向一个 Queue 里发送消息,许多消费者都可以从一个 Queue 里接收消息。

Consuming:Consuming 的意思和接收类似。等待接收消息的程序叫做消费者。

3、RabbitMQ的7种队列模式使用

POM依赖

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>

3.1、简单模式队列

1个生产者,1个消费者

处理模型:

  • p:生产者
  • C:消费者
  • 中间件:Queue,消息缓存区

生产者-发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.yanghuisen.simple.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
* 简单模式-生产者-发送消息
*/
public class Send {

// 队列名称
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

// 通过工厂创建连接
try (Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel()) {
/*
声明队列
1、队列名称
2、是否持久化
3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
这种队列适用于只限于一个客户端发送读取消息的应用场景。
4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}

消费者-接收消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.yanghuisen.simple.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 简单模式-消费者-接收消息
*/
public class Recv {

// 队列名称
private final static String QUEUE_NAME = "hello";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

缺点:如果任务量很大,消息得不到及时的消费会造成队列积压。问题很严重,比如内存溢出,消息丢失等。

解决:配置多个消费者。

总结:简单队列模式,消息处理不及时,吞吐量较低。

3.2、工作模式队列(Work Queues)

3.2.1、消息轮询分发(Round-robin)

轮询分发:依次交替,一个消费者消费一个,依次往复

处理模型

工作模式队列-轮训-消息发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.yanghuisen.work.rr.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
* 工作模式队列-轮训-消息发送者
*/
public class Send {

// 队列名称
private final static String QUEUE_NAME = "work_rr";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

// 通过工厂创建连接
try (Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel()) {

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0;i<20;i++){
// 要发送的消息内容
String message = "Hello World!--"+i;
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}

工作模式队列-轮训-消息接收者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package cn.yanghuisen.work.rr.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 工作模式队列-轮训-消息接收者
*/
public class Recv01 {

private final static String QUEUE_NAME = "work_rr";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

工作模式队列-轮训-消息接收者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package cn.yanghuisen.work.rr.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 工作模式队列-轮训-消息接收者
*/
public class Recv02 {

private final static String QUEUE_NAME = "work_rr";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 模拟程序执行耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

缺点:任务量很大,虽然解决了及时消费,单位时间内消息处理速度加快,提高了吞吐量,可 是不同消费者处理消息的时间不同,导致部分消费者的资源被浪费。

解决:采用消息公平分发

总结:及时消费,处理速度加快,提高吞吐量,部分资源浪费

3.2.2、消息公平分发(fair dispatch)

消费速度快的多处理点

处理模型

工作模式队列-公平-消息发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package cn.yanghuisen.work.fair.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
* 工作模式队列-公平-消息发送者
*/
public class Send {

// 队列名称
private final static String QUEUE_NAME = "work_fair";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

// 通过工厂创建连接
try (Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel()) {

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0;i<20;i++){
// 要发送的消息内容
String message = "Hello World!--"+i;
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
}

工作模式队列-公平-消息接收者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package cn.yanghuisen.work.fair.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 工作模式队列-公平-消息接收者
*/
public class Recv01 {

private final static String QUEUE_NAME = "work_fair";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 限制只发不超过1条的消息给同一个消费者,当消息处理完毕后,有了反馈,才会进行第二次发送
int prefetchCount = 1;
channel.basicQos(prefetchCount);



DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 模拟程序运行耗时
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手动回执(反馈)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

};

// 设置false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package cn.yanghuisen.work.fair.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 工作模式队列-公平-消息接收者
*/
public class Recv02 {

private final static String QUEUE_NAME = "work_fair";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 限制只发不超过1条的消息给同一个消费者,当消息处理完毕后,有了反馈,才会进行第二次发送
int prefetchCount = 1;
channel.basicQos(prefetchCount);
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
// 模拟程序执行耗时
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 手动回执(反馈)
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
};

// 设置false
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}

问题:让固定的消费者消费,类似于微信订阅的公众号,只有订阅了指定的公众号才能收到指定公众号推送的内容

解决:采用发布/订阅模式

总结:消息处理速度不同,收到的消息也不同,消费速度快的处理的数量比较多,最大化的使用计算机资源。

3.3、发布/订阅模式队列(Publish/Subscribe)

类似于微信公众号,只有订阅了指定公众号的微信才能收到对应的消息

处理模型:

发布/订阅模式-消息发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.yanghuisen.fanout.send;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
* 发布/订阅模式-消息发送者
*/
public class Send {
//交换机名称
private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

// 通过工厂创建连接
try (Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel()) {
// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
String message = "Hello World!";
//将消息放入队列并发送
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.yanghuisen.fanout.recv;

import com.rabbitmq.client.*;

/**
* 发布/订阅模式-消息接收者
*/
public class Recv01 {

// 交换机名称
private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机于队列
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.yanghuisen.fanout.recv;

import com.rabbitmq.client.*;

/**
* 发布/订阅模式-消息接收者
*/
public class Recv02 {

// 交换机名称
private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
// 获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机于队列
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

问题:指定消费者消费?类似于微信公众号的付费文章

解决:采用direct路由模式。

总结:一条消息,可以让多个消费者消费

3.4、路由模式队列(Routing)

指定消费者消费,类似于微信公众号的付费文章

模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.yanghuisen.direct.send;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* 路由模式-消息发送者
*/
public class Send {
//交换机名称
private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

// 通过工厂创建连接
try (Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel()) {
// 申明路由交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

// 多个消息
String vipEssay = "VIP文章";
String xmEssay = "限免文章";
String essay = "普通文章";

//将消息放入队列并发送
channel.basicPublish(EXCHANGE_NAME, "vipEssay", null, vipEssay.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "xmEssay", null, xmEssay.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "essay", null, essay.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + vipEssay + "'");
System.out.println(" [x] Sent '" + xmEssay + "'");
System.out.println(" [x] Sent '" + essay + "'");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package cn.yanghuisen.direct.recv;

import com.rabbitmq.client.*;

/**
* 路由模式-消息接收者
*/
public class Recv01 {

// 交换机名称
private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机于队列
channel.queueBind(queueName,EXCHANGE_NAME,"essay");
channel.queueBind(queueName,EXCHANGE_NAME,"xmEssay");
channel.queueBind(queueName,EXCHANGE_NAME,"vipEssay");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.yanghuisen.direct.recv;

import com.rabbitmq.client.*;

/**
* 路由模式-消息接收者
*/
public class Recv02 {

// 交换机名称
private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
// 获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"essay");
channel.queueBind(queueName,EXCHANGE_NAME,"xmEssay");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

问题:生产者产生的消息如果厂家需求过多需要设置很多路由?

解决:采用topic主题模式

总结:生产者发送了多天设置了路由规则的消息,消费者可以根据具体的路由规则消息对应的队列中的消息。

3.5、主题模式队列(Topics)

使用指定规则匹配路由

模型

  • *:代替1个单词
  • #:代替0个或多个单词

以模型图为例

  • routingKey=”quick.orange.rabbit”的消息会同时路由到Q1与Q2,
  • routingKey=”lazy.orange.fox”的消息会路由到Q1,Q2,
  • routingKey=”lazy.brown.fox”的消息会路由到Q2,
  • routingKey=”lazy.pink.rabbit”的消息会路由到Q2;
  • routingKey=”quick.brown.fox”; routingKey=”orange”;routingKey=”quick.orange.male.rabbit”的消息将会被丢弃,因为它们没有匹配任何 bindingKey。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package cn.yanghuisen.topic.send;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
* 路由模式-消息发送者
*/
public class Send {
//交换机名称
private static final String EXCHANGE_NAME = "exchange_topic";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

// 通过工厂创建连接
try (Connection connection = factory.newConnection();
// 获取通道
Channel channel = connection.createChannel()) {
// 申明主题交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

// 多个消息
String vipEssay = "VIP文章";
String xmEssay = "限免文章";
String essay = "普通文章";

//将消息放入队列并发送
channel.basicPublish(EXCHANGE_NAME, "a.b.c", null, vipEssay.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "d.b.e.f.c", null, xmEssay.getBytes("UTF-8"));
channel.basicPublish(EXCHANGE_NAME, "e.f", null, essay.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + vipEssay + "'");
System.out.println(" [x] Sent '" + xmEssay + "'");
System.out.println(" [x] Sent '" + essay + "'");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.yanghuisen.topic.recv;

import com.rabbitmq.client.*;

/**
* 路由模式-消息接收者
*/
public class Recv01 {

// 交换机名称
private static final String EXCHANGE_NAME = "exchange_topic";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机于队列
channel.queueBind(queueName,EXCHANGE_NAME,"#.f.#");

System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package cn.yanghuisen.topic.recv;

import com.rabbitmq.client.*;

/**
* 路由模式-消息接收者
*/
public class Recv02 {

// 交换机名称
private static final String EXCHANGE_NAME = "exchange_topic";

public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 申明交换机
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
// 获取队列名称
String queueName = channel.queueDeclare().getQueue();
// 绑定交换机和队列
channel.queueBind(queueName,EXCHANGE_NAME,"*.b.*");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
}
}

问题:RabbitMQ本身是基于异步的消息处理,是否可以同步实现?

解决:采用RPC模式

总结:根据指定规则匹配不同的路由

3.6、RPC远程过程调用模式队列(RPC)

模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package cn.yanghuisen.rpc.server;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* RPC模式队列-服务端
*/
public class RPCServer {

// 队列名称
private static final String RPC_QUEUE_NAME = "rpc_queue";

/**
* 计算斐波那契数列
*
* @param n
* @return
*/
private static int fib(int n) {
if (n == 0) return 0;
if (n == 1) return 1;
return fib(n - 1) + fib(n - 2);
}

public static void main(String[] args) {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

try {
// 通过工厂创建连接
final Connection connection = factory.newConnection();
// 获取通道
final Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.queuePurge(RPC_QUEUE_NAME);

/*
限制RabbitMQ只发不超过1条的消息给同一个消费者。
当消息处理完毕后,有了反馈,才会进行第二次发送。
*/
int prefetchCount = 1;
channel.basicQos(prefetchCount);

System.out.println(" [x] Awaiting RPC requests");

Object monitor = new Object();
// 获取消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
// 获取replyTo队列和correlationId请求标识
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(delivery.getProperties().getCorrelationId())
.build();

String response = "";
try {
// 接收客户端消息
String message = new String(delivery.getBody(), "UTF-8");
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
// 服务端根据业务需求处理
response += fib(n);
} catch (RuntimeException e) {
System.out.println(" [.] " + e.toString());
} finally {
// 将处理结果发送至replyTo队列同时携带correlationId属性
channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps,
response.getBytes("UTF-8"));
// 手动回执消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
// RabbitMq消费者工作线程通知RPC服务器其他所有线程运行
synchronized (monitor) {
monitor.notify();
}
}
};
// 监听队列
/*
autoAck = true代表自动确认消息
autoAck = false代表手动确认消息
*/
boolean autoAck = false;
channel.basicConsume(RPC_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
});
// Wait and be prepared to consume the message from RPC client.
// 线程等待并准备接收来自RPC客户端的消息
while (true) {
synchronized (monitor) {
try {
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
package cn.yanghuisen.rpc.client;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

/**
* RPC模式队列-客户端
*/
public class RPCClient implements AutoCloseable {

private Connection connection;
private Channel channel;
// 队列名称
private String requestQueueName = "rpc_queue";

// 初始化连接
public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

connection = factory.newConnection();
channel = connection.createChannel();
}

public static void main(String[] args) {
try (RPCClient fibonacciRpc = new RPCClient()) {
for (int i = 0; i < 10; i++) {
String i_str = Integer.toString(i);
System.out.println(" [x] Requesting fib(" + i_str + ")");
// 请求服务端
String response = fibonacciRpc.call(i_str);
System.out.println(" [.] Got '" + response + "'");
}
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
}
}

// 请求服务端
public String call(String message) throws IOException, InterruptedException {
// correlationId请求标识ID
final String corrId = UUID.randomUUID().toString();

// 获取队列名称
String replyQueueName = channel.queueDeclare().getQueue();

// 设置replyTo队列和correlationId请求标识
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

// 发送消息至队列
channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

// 设置线程等待,每次只接收一个响应结果
final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

// 接受服务器返回结果
String ctag = channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> {
if (delivery.getProperties().getCorrelationId().equals(corrId)) {
// 将给定的元素在给定的时间内设置到线程队列中,如果设置成功返回true, 否则返回false
response.offer(new String(delivery.getBody(), "UTF-8"));
}
}, consumerTag -> {
});

// 从线程队列中获取值,如果线程队列中没有值,线程会一直阻塞,直到线程队列中有值,并且取得该值
String result = response.take();
// 从消息队列中丢弃该值
channel.basicCancel(ctag);
return result;
}

// 关闭连接
public void close() throws IOException {
connection.close();
}
}

3.7、确认模式队列(confirm)

如何确定消息队列收到了生产者发送的消息?如果在发送消息前程序崩了怎么办?

3.7.1、事务机制控制

  • txSelect():开启事务

  • txCommit():提交事务

  • txRollback():回滚事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package cn.yanghuisen.tx.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
* 事务-发送消息
*/
public class Send {

// 队列名称
private final static String QUEUE_NAME = "tx";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

Connection connection = null;
Channel channel = null;

// 通过工厂创建连接
try{
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启事务
channel.txSelect();
/*
声明队列
1、队列名称
2、是否持久化
3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
这种队列适用于只限于一个客户端发送读取消息的应用场景。
4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
int i = 1/0;
System.out.println(" [x] Sent '" + message + "'");
channel.txCommit();
}catch (Exception e){
e.printStackTrace();
// 回滚
channel.txRollback();
channel.close();
connection.close();


}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.yanghuisen.tx.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 事务-接收消息
*/
public class Recv {

// 队列名称
private final static String QUEUE_NAME = "tx";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

缺点:降低了RabbitMQ的消息吞吐量

解决:使用confirm模式

总结:使用事务,可以在发送请求但是没有提交事务前回滚事务,撤回发送的消息。

3.7.2、确认模式(confirm)

生产者设置为确认模式,发送消息时所有的消息都会被指派一个唯一的ID,一旦消息被投递套指定的队列之后,就会返回一个确认结果给生产者(包含消息的唯一ID),这样生产者就知道了消息已经正确到达了目的地。如果消息和队列时可以持久化的,那么确认消息会将消息写入磁盘后发出。

confirm模式大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回 确认的同时继续发送下一条消息,当消息终得到确认之后,生产者应用便可以通过回调方法来处理该 确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序 同样可以在回调方法中处理该nack消息。

实现Confirm确认机制有三种方式

1、普通Confirm模式

每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
package cn.yanghuisen.confirm.sync.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
* 确认模式-同步-单条-发送消息
*/
public class Send {

// 队列名称
private final static String QUEUE_NAME = "confirm_sync";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

Connection connection = null;
Channel channel = null;

// 通过工厂创建连接
try{
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
/*
声明队列
1、队列名称
2、是否持久化
3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
这种队列适用于只限于一个客户端发送读取消息的应用场景。
4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

// 确认消息是否发送成功
if (channel.waitForConfirms()){
System.out.println("消息发送成功");
}else {
System.out.println("消息发送失败");
}



System.out.println(" [x] Sent '" + message + "'");
}catch (Exception e){
e.printStackTrace();


}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.yanghuisen.confirm.sync.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 确认模式-同步-单条-接收消息
*/
public class Recv {

// 队列名称
private final static String QUEUE_NAME = "confirm_sync";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

2、批量confirm模式:每发送一批消息后,调用waitForConfirmsOrDie()方法,等待服务器端 confirm。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
package cn.yanghuisen.confirm.sync.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

/**
* 确认模式-同步-批量-发送消息
*/
public class Send {

// 队列名称
private final static String QUEUE_NAME = "confirm_sync";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

Connection connection = null;
Channel channel = null;

// 通过工厂创建连接
try{
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
/*
声明队列
1、队列名称
2、是否持久化
3、排他队列,如果一个度列被声明名排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除
1. 排他队列是基于连接可见的,同一连接的不同通道是可以同时访问同一个连接创建的排他队列的。
2. "首次",如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。
3. 即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。
这种队列适用于只限于一个客户端发送读取消息的应用场景。
4、自动删除,如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列
*/
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "Hello World!";
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));

// 确认消息是否发送成功-多条
// 如果有一条没被确认,就会抛IO异常
channel.waitForConfirmsOrDie();



// if (channel.waitForConfirms()){
// System.out.println("消息发送成功");
// }else {
// System.out.println("消息发送失败");
// }



System.out.println(" [x] Sent '" + message + "'");
}catch (Exception e){
e.printStackTrace();


}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.yanghuisen.confirm.sync.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 确认模式-同步-批量-接收消息
*/
public class Recv {

// 队列名称
private final static String QUEUE_NAME = "confirm_sync";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

3、异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
package cn.yanghuisen.confirm.async.send;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

/**
* 确认模式-异步-发送消息
*/
public class Send {

// 队列名称
private final static String QUEUE_NAME = "confirm_async";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");

Connection connection = null;
Channel channel = null;

// 通过工厂创建连接
try{
// 维护信息发送回执deliveryTag
final SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<>());
// 创建连接
connection = factory.newConnection();
// 获取通道
channel = connection.createChannel();
// 开启确认模式
channel.confirmSelect();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 添加channel 监听
channel.addConfirmListener(new ConfirmListener() {
/**
* 已确认
* @param l 唯一标识
* @param b 确认多条还是单条,true多条
* @throws IOException
*/
@Override
public void handleAck(long l, boolean b) throws IOException {
// 判断确认的是多条还是单条
if (b){
System.out.println("handleAck--success-->multiple" + l);
// 清除前 l 标识ID
confirmSet.headSet(l+1).clear();
}else {
System.out.println("handleAck--success-->single" + l);
confirmSet.remove(l);
}
}

/**
* 未确认
* @param l
* @param b
* @throws IOException
*/
@Override
public void handleNack(long l, boolean b) throws IOException {
if (b){
System.out.println("handleNack--failed-->multiple-->" + l);
// 清除前 deliveryTag 项标识id
confirmSet.headSet(l + 1L).clear();
}else {
System.out.println("handleNack--failed-->single" + l);
confirmSet.remove(l);
}
}
});


// 循环发送消息
while (true){
// 消息内容
String message = "Hello World!";
// 获取unconfirm的消息序号
Long seqNo = channel.getNextPublishSeqNo();
// 将消息放入队列并发送
channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
// 将消息序号添加到SortedSet
confirmSet.add(seqNo);
}
}catch (Exception e){
e.printStackTrace();
channel.close();
connection.close();

}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package cn.yanghuisen.confirm.async.recv;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;

/**
* 确认模式-异步-接收消息
*/
public class Recv {

// 队列名称
private final static String QUEUE_NAME = "confirm_async";

public static void main(String[] argv) throws Exception {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.10.100");
factory.setPort(5672);
factory.setUsername("shop");
factory.setPassword("shop");
factory.setVirtualHost("/shop");
// 创建连接
Connection connection = factory.newConnection();
// 获取信息
Channel channel = connection.createChannel();
// 申明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
// 接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
// 监听队列
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

4、Spring集成RabbitMQ

官网:https://spring.io/projects/spring-amqp

4.1、创建聚合项目

在这里插入图片描述

父pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<packaging>pom</packaging>
<modules>
<module>provider</module>
<module>consumer</module>
</modules>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.0.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>cn.yanghuisen</groupId>
<artifactId>spring-rabbitmq</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>spring-rabbitmq</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

4.2、生产者

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-rabbitmq</artifactId>
<groupId>cn.yanghuisen</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>provider</artifactId>

<name>provider</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>

</dependencies>

</project>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
package cn.yanghuisen;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* Hello world!
*
*/
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package cn.yanghuisen;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


/**
* @author admin
* @version 1.0
* @date 2020/5/29 22:28
* @Description TODO
*/
@Configuration
public class RabbitMQConfig {

/**
* 申明队列
* @return
*/
@Bean
public Queue queue(){
return new Queue("topic");
}


/**
* 申明交换机
* @return
*/
@Bean
public TopicExchange topicExchange(){
return new TopicExchange("topicExchange");
}

/**
* 将队列绑定到交换机上
* @return
*/
@Bean
public Binding binding(){
return BindingBuilder.bind(queue()).to(topicExchange()).with("*.msg.#");
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package cn.yanghuisen;

import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
* @author admin
* @version 1.0
* @date 2020/5/29 22:33
* @Description TODO
*/
@Component
public class Send {
@Resource
private RabbitTemplate rabbitTemplate;

public void send(){
String message = "Hello World";
/*
1、交换机参数
2、路由key
3、消息内容
*/
rabbitTemplate.convertAndSend("topicExchange","topic.msg",message);
System.out.println("发送消息:"+message);
}
}

4.3、消费者

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-rabbitmq</artifactId>
<groupId>cn.yanghuisen</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>consumer</artifactId>

<name>consumer</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>

</dependencies>

<build>

</build>
</project>

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
package cn.yanghuisen;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* Hello world!
*
*/
@SpringBootApplication
public class App
{
public static void main( String[] args )
{
SpringApplication.run(App.class);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
package cn.yanghuisen;

import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* @author admin
* @version 1.0
* @date 2020/5/29 22:36
* @Description TODO
*/
@Component
@RabbitListener(queues = "topic") // 监听队列
public class Consumer {


// 接收消息的处理方法
@RabbitHandler
public void recv(String message){
System.out.println("接受消息:"+message);
}
}

4.4、测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
package cn.yanghuisen;

import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

/**
* @author admin
* @version 1.0
* @date 2020/5/29 22:54
* @Description TODO
*/
@SpringBootTest
public class TestSend {

@Resource
private Send send;

@Test
public void testSend(){
send.send();
}

}