RabbitMQ

RabbitMQ

1. RabbitMQ引言

官方网站:https://www.rabbitmq.com/

什么是MQ

MQ(Message Queue):消息队列。通过典型的生产者和消费者模型,生产者不断的向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的,而且只关心消息的发送和接受。没有业务逻辑的侵入,轻松的实现系统间解耦。

别名:消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

image-20210529134454943

上图就是消息队列最原始的模型,其中两个关键词:消息、队列

  1. 消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式
  2. 队列:存放消息的容器。入队即发消息的过程,出队即收消息的过程。

RabbitMQ

基于AMQP协议,erlang语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。

RabbitMQ的主要特征是面向消息、队列、路由(包含点对点和发布/订阅)、可靠性、安全。

2.RabbitMQ安装

Docker安装

在Docker hub中找到docker 镜像。

1

1
docker run -d -p 15672:15672 -p 5672:5672 -v RabbitMQConfig:/etc/rabbitmq -v RabbitMQData:/var/lib/rabbitmq --name docker c9b2833379d6

输入:192.168.111.128(Linux系统IP地址):15672。

image-20210529145058481

用户名密码都是guest

image-20210529145225750

3、AMQP协议

image-20210529152358392

主要包含了三个主要的组件:

  • exchange(交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中

  • Queue(消息队列):存储消息。直到消息被安全的投递给了消费者。

  • binding :定义了 Queueexchange 之间的关系,提供了消息路由的规则。

可以把AMQP的架构理解为一个邮件服务:

  • 一个消息类似于一封邮件信息
  • 消息队列类似于一个邮箱(Mailbox)
  • 消费者类似一个邮件客户端,能够拉取和删除邮件。
  • 交换器类似一个MTA(邮件服务器)。检查邮件,基于邮件里的路由信息、路由表,来决定如何把邮件发送到一个或多个邮箱里。
  • Routing Key类似于邮件中的To:Cc:Bcc: 的地址。不包含服务端信息。
  • 每一个交换器实例,类似于各个MTA进程。用于处理不同子域名的邮件,或者特定类型的邮件。
  • Binding 类似于MTA中的路由表。

在AMQP里,生产者直接把消息发到服务端,服务端通过这些消息路由发送到邮箱中。消费者直接从邮箱里取消息。

在RabbitMQ中,有多种模型,有的模型直接发送给队列,有的模型需要通过交换机。

image-20210529235632153

4、直连模型

我们可以看到,直连是最简单的一种模型,生产者直接将消息放入队列中,消费者直接从队列读取消息。并且生产者、队列和消费者都是一一对应的。

image-20210530083904074

1、创建Virtual Host

在实现这种模型之前。我们需要通过RabbitMQ的web管理界面新建一些东西。

下图看出,队列都在Virtual Host中,我们需要去创建它。

image-20210529152358392

  1. 打开Virtual Host界面

image-20210530084822053

  1. 添加Virtual Host

image-20210530085034109

image-20210530085106920

  1. 添加User

image-20210530085242819

  1. 允许User访问我们的Virtual Host

image-20210530085426373

image-20210530085440943

2、构建项目并引入依赖

image-20210530084143676

1
2
3
4
5
6
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.2</version>
</dependency>

3、生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public class Provider {
//发送消息
public static void main(String[] args) throws IOException, TimeoutException {

//创建连接mq的连接工厂对象
ConnectionFactory factory = new ConnectionFactory();
//设置连接rabbitMQ的主机
factory.setHost("192.168.111.128");
//设置端口号
factory.setPort(5672);
//设置连接哪个虚拟主机
factory.setVirtualHost("/test1");
//设置访问虚拟主机的用户名和密码
factory.setUsername("jiang");
factory.setPassword("123");

//获取连接对象
Connection connection = factory.newConnection();

//获取连接中的通道
Channel channel = connection.createChannel();

//通道绑定对应的消息队列
/***
* 参数1:队列名称 如果队列不存在会自动创建
* 参数2:是否持久化
* 参数3:是否独占队列
* 参数4:是否在消费完成后自动删除队列
* 参数5:额外附加参数
*/
channel.queueDeclare("queue1",false,false,false,null);

/**
* 参数1:交换机 这里不需要
* 参数2:队列名称
* 参数3:传递消息额外设置
* 参数4:消息的具体内容
*/
channel.basicPublish("","queue1",null,"Hello RabbitMQ".getBytes());

channel.close();
connection.close();
}
}

查看web管理页面,可以看到自动创建了一个队列。并且队列中有一条消息。

image-20210530093418943

4、消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Customer {

public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.111.128");
factory.setPort(5672);
factory.setVirtualHost("/test1");
factory.setUsername("jiang");
factory.setPassword("123");

//创建连接对象
Connection connection = factory.newConnection();

//创建通道
Channel channel = connection.createChannel();


//通道绑定对象 参数必须与生产者的的参数一致
channel.queueDeclare("queue1",false,false,false,null);

/**
* 消费消息
* 参数1:队列名称
* 参数2:开启消息的自动确认机制
* 参数3:消费时的回调接口
*/
channel.basicConsume("queue1",true,new DefaultConsumer(channel){

@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("==================");
System.out.println(new String(body));
}
});
}
}

image-20210530102411929

此时web管理页面中队列中没有数据

image-20210530102448600

5、连接和关闭的工具类

有大量的冗余代码。因此新建一个连接和关闭的工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class ConnectionUtils {

//返回连接对象
public static Connection Connect(){
try {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.111.128");
factory.setPort(5672);
factory.setVirtualHost("/test1");
factory.setUsername("jiang");
factory.setPassword("123");

//创建连接对象
return factory.newConnection();
}catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

return null;
}

//关闭连接
public static void closeConn(Channel channel, Connection connection){
try{
if(channel != null){
channel.close();
}

if (connection != null){
connection.close();
}

} catch (TimeoutException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}

}
}

6、API的细节

1
2
3
//通道绑定队列
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
Map<String, Object> arguments) throws IOException;
  • queue:队列名称

  • durable:是否将队列持久化,如果是true ,重启RabbitMQ时会重新创建队列,但不会将队列中的消息持久化

  • exclusive:是否独占队列

  • autoDelete:是否自动删除队列。当队列中没有消息,并且没有消费者正在监听时会自动删除队列。

5、任务模型

work queue也称为task queues,任务模型。当消息处理比较耗时,可能生产者生产消息的速度远远大于消息的消费速度。因此,消息就会堆积越来越多。无法及时处理。

此时就可以使用任务模型:让多个消费者绑定一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

image-20210530105541874

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Customer1 {
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.Connect();
//创建通道
Channel channel = connection.createChannel();
//通道绑定对象 参数必须与生产者的的参数一致
channel.queueDeclare("queue1",false,false,false,null);
channel.basicConsume("queue1",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1" + new String(body));
}
});
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class Customer2 {
public static void main(String[] args) throws TimeoutException, IOException {
Connection connection = ConnectionUtils.Connect();
//创建通道
Channel channel = connection.createChannel();
//通道绑定对象 参数必须与生产者的的参数一致
channel.queueDeclare("queue1",false,false,false,null);
channel.basicConsume("queue1",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2" + new String(body));
}
});
}
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Provider {
//发送消息
public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = ConnectionUtils.Connect();

//获取连接中的通道
Channel channel = connection.createChannel();
channel.queueDeclare("queue1",false,false,false,null);

//生产二十条消息
for (int i = 0; i < 20; i++) {
channel.basicPublish("","queue1",null,("Hello RabbitMQ" + i).getBytes());
}


ConnectionUtils.closeConn(channel,connection);
}
}

结果:我们先启动两个消费者,然后再启动生产者生产消息。

image-20210530123134328

image-20210530123339957

总结:在默认的情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。每个消费者都会受到相同的消息。称为轮询。

自动确认

image-20210530124724347

RabbitMQ默认是使用轮询的方式来发送消息给消费者。因此当生产者生产一条消息之后,就直接发送给对应的消费者,不关心消费者的情况。

但此时的消费者可能出现了问题。比如:被阻塞、异常,或者处理时间过长等。

此时消费者1被阻塞

image-20210530130031461

消费者2已经消费完

image-20210530130107821

对于自动确认来说:当方法没有异常执行完毕后,会对MQ发出ACK;若方法出现异常,会对MQ发出nack,消息会重回队列。要分清哪些是可以恢复的异常,哪些是不可以恢复的异常。不可恢复的异常,在消费者代码中捕获异常,并记录日志表或放入死信队列。可恢复的异常,那么放入业务队列中重试。

手动确认

我们将autoack置为false,并且每一次只能消费一个消息。

同时消费完之后手动确认。

下面的代码中,消费者1消费的更快,消费者2消费慢。那么消费者1会消费更多的消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.Connect();
//创建通道
final Channel channel = connection.createChannel();
//通道绑定对象 参数必须与生产者的的参数一致
channel.queueDeclare("queue1", false, false, false, null);
channel.basicQos(1);
channel.basicConsume("queue1", false, new DefaultConsumer(channel) {
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 " + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws TimeoutException, IOException {
Connection connection = ConnectionUtils.Connect();
//创建通道
final Channel channel = connection.createChannel();
//通道绑定对象 参数必须与生产者的的参数一致
channel.queueDeclare("queue1",false,false,false,null);
channel.basicQos(1);
channel.basicConsume("queue1",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("消费者2 " + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}

image-20210530132954363

image-20210530132943602

6、广播

fanout也成为广播。

image-20210530133348191

在广播模型下:消息发送流程是这样的。

  • 可以有多个消费者
  • 每个消费者都有自己的队列
  • 每个队列都要绑定到交换机
  • 生产者发送的消息,只能发送到交换机,交换机来决定要发到哪个队列,生产者无法决定
  • 交换机把消息发送给绑定的所有队列
  • 队列的消费者都能拿到消息。实现一条消息被多个消费者消费。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws IOException {
//获取连接对象
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();

//将通道声明指定交换机
//广播模型
channel.exchangeDeclare("exchange1","fanout");

/***
* 参数1:exchange 交换机
* 参数2:routingKey 路由关键字 在下面两种模型使用
*/
//发送消息
channel.basicPublish("exchange1","",null,"fanout type message".getBytes());
ConnectionUtils.closeConn(channel,connect);
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws IOException {
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();
//通道绑定交换机
channel.exchangeDeclare("exchange1","fanout");

//临时队列
String queue = channel.queueDeclare().getQueue();

//绑定交换机和队列
channel.queueBind(queue,"exchange1","");

//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 " + new String(body));
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws IOException {
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();
//通道绑定交换机
channel.exchangeDeclare("exchange1","fanout");
//临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue,"exchange1","");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 " + new String(body));
}
});
}

image-20210530195028841

image-20210530195005079

7、Direct

在Fanout模型下,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这就用到了Direct类型的Exchange。

image-20210530200025805

在Direct模型下:

  • 队列与交换机的绑定,不能是任务绑定了,而是指定一个RoutingKey

  • 消息的发送放在向Exchange发送消息时,也必须指定消息的RoutingKey

  • Exchange不再把消息交给每一个队列,而是根据消息的RoutingKey进行判断,只有队列的RoutingKey与消息的Routingkey一致时,才会接受消息。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) throws IOException {
//获取连接对象
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();

//将通道声明指定交换机
//广播模型
channel.exchangeDeclare("exchange1","direct");

//发送消息
/***
* 参数1:交换机
* 参数2:路由关键字
*/
channel.basicPublish("exchange1","Customer1",null,"direct type message 1".getBytes());
channel.basicPublish("exchange1","Customer2",null,"direct type message 2".getBytes());
ConnectionUtils.closeConn(channel,connect);
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws IOException {
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();
//通道绑定交换机
channel.exchangeDeclare("exchange1","direct");

//临时队列
String queue = channel.queueDeclare().getQueue();

//绑定交换机和队列
channel.queueBind(queue,"exchange1","Customer1");

//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 " + new String(body));
}
});
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws IOException {
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();
//通道绑定交换机
channel.exchangeDeclare("exchange1","direct");
//临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue,"exchange1","Customer2");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 " + new String(body));
}
});
}

image-20210530201314100

image-20210530201307827

8、Topic

Topic类型的ExchangeDirect相比,都可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定在RoutingKey的时候使用通配符

这种模型RoutingKey一般都是由一个或多个单词组成。多个单词之间以.分割,例如:item.insert

image-20210530202438147

通配符

1
2
3
4
5
6
7
* 匹配不多不少恰好1个词
# 匹配一个或多个词

例如:
audit.# 只能audit.irs.corporate 或者 audit.irs等
audit.* 只能匹配 audit.irs

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws IOException {
//获取连接对象
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();

//将通道声明指定交换机
//广播模型
channel.exchangeDeclare("topics","topic");

//发送消息
/***
* 参数1:交换机
* 参数2:路由关键字
*/
channel.basicPublish("topics","user.save.test",null,"topic type message".getBytes());
ConnectionUtils.closeConn(channel,connect);
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public static void main(String[] args) throws IOException {
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();
//通道绑定交换机
channel.exchangeDeclare("topics","topic");

//临时队列
String queue = channel.queueDeclare().getQueue();

//绑定交换机和队列
channel.queueBind(queue,"topics","user.*");

//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1 " + new String(body));
}
});
}

image-20210530203809948

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public static void main(String[] args) throws IOException {
Connection connect = ConnectionUtils.Connect();
Channel channel = connect.createChannel();
//通道绑定交换机
channel.exchangeDeclare("topics","topic");
//临时队列
String queue = channel.queueDeclare().getQueue();
//绑定交换机和队列
channel.queueBind(queue,"topics","user.#");
//消费消息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2 " + new String(body));
}
});
}

9、SpringBoot整合RabbitMQ

1、 搭建初始环境

引入依赖:

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置文件

1
2
3
4
5
6
7
8
spring.application.name=rabbitMQ-springboot

# 配置RabbitMQ
spring.rabbitmq.host=192.168.111.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=jiang
spring.rabbitmq.password=123
spring.rabbitmq.virtual-host=/test1

2、直连模型

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest(classes = RabbitmqdemoApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
//参数1:队列,有消费者创建
//参数2:消息
rabbitTemplate.convertAndSend("hello","Hello World");
}

}

消费者

1
2
3
4
5
6
7
8
9
10
@Component
//创建队列,并设置队列参数
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class hello {
//回调函数
@RabbitHandler
public void receive(String message){
System.out.println("message : " + message);
}
}

2、 任务模型

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@SpringBootTest(classes = RabbitmqdemoApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void workTest(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","work" + i);
}
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
public class work {

@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message){
System.out.println("消费者1 " + message);
}

@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message){
System.out.println("消费者2 " + message);
}
}

image-20210531122059284

3、广播模型

生产者

1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest(classes = RabbitmqdemoApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testFanout(){
rabbitTemplate.convertAndSend("logs","","fanout模型数据");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Component
public class Fanout {


@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs",type = "fanout")
)
})
public void receive1(String message){
System.out.println("message1 " + message);
}


@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "logs",type = "fanout")
)
})
public void receive2(String message){
System.out.println("message2 " + message);
}
}

image-20210531123205281

4、路由模型

生产者

1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest(classes = RabbitmqdemoApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testRoute(){
rabbitTemplate.convertAndSend("directs","info","发送info路由信息");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
public class Route {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs",type = "direct"),
key = {"info","error"}
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(value = "directs",type = "direct"),
key = {"error"}
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}

image-20210531124440561

5、动态路由

生产者

1
2
3
4
5
6
7
8
9
10
11
12
@SpringBootTest(classes = RabbitmqdemoApplication.class)
@RunWith(SpringRunner.class)
public class TestRabbitMQ {
//注入rabbitTemplate
@Autowired
private RabbitTemplate rabbitTemplate;

@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","user.save.test","user.save.test路由消息");
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Component
public class Topic {


@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topics",type = "topic"),
key = {"user.*"}
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}

@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
exchange = @Exchange(name = "topics",type = "topic"),
key = {"user.#"}
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}

image-20210531125047616


RabbitMQ
https://johnjoyjzw.github.io/2021/03/01/RabbitMQ/
Author
John Joy
Posted on
March 1, 2021
Licensed under