RabbitMQ
1. RabbitMQ引言
官方网站:https://www.rabbitmq.com/
什么是MQ
MQ(Message Queue):消息队列。通过典型的生产者和消费者模型
,生产者不断的向消息队列中生产消息,消费者不断的从队列中获取消息。因为消息的生产和消费都是异步的
,而且只关心消息的发送和接受。没有业务逻辑的侵入,轻松的实现系统间解耦。
别名:消息中间件,通过利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。

上图就是消息队列最原始的模型,其中两个关键词:消息、队列
- 消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式
- 队列:存放消息的容器。入队即发消息的过程,出队即收消息的过程。
RabbitMQ
基于AMQP
协议,erlang
语言开发,是部署最广泛的开源消息中间件,是最受欢迎的开源消息中间件之一。
RabbitMQ的主要特征是面向消息、队列、路由(包含点对点和发布/订阅)、可靠性、安全。
2.RabbitMQ安装
Docker安装
在Docker hub中找到docker 镜像。
1
| docker run -d -p 15672:15672 -p 5672:5672 -v RabbitMQConfig:/etc/rabbitmq -v RabbitMQData:/var/lib/rabbitmq --name docker c9b2833379d6
|
输入:192.168.111.128(Linux系统IP地址):15672。

用户名密码都是guest

3、AMQP协议

主要包含了三个主要的组件:
exchange
(交换器):从Publisher程序中收取消息,并把这些消息根据一些规则路由到消息队列(Message Queue)中
Queue
(消息队列):存储消息。直到消息被安全的投递给了消费者。
binding
:定义了 Queue
和 exchange
之间的关系,提供了消息路由的规则。
可以把AMQP的架构理解为一个邮件服务:
- 一个消息类似于一封邮件信息
- 消息队列类似于一个邮箱(Mailbox)
- 消费者类似一个邮件客户端,能够拉取和删除邮件。
- 交换器类似一个MTA(邮件服务器)。检查邮件,基于邮件里的路由信息、路由表,来决定如何把邮件发送到一个或多个邮箱里。
- Routing Key类似于邮件中的
To:
,Cc:
, Bcc:
的地址。不包含服务端信息。
- 每一个交换器实例,类似于各个MTA进程。用于处理不同子域名的邮件,或者特定类型的邮件。
Binding
类似于MTA中的路由表。
在AMQP里,生产者直接把消息发到服务端,服务端通过这些消息路由发送到邮箱中。消费者直接从邮箱里取消息。
在RabbitMQ中,有多种模型,有的模型直接发送给队列,有的模型需要通过交换机。

4、直连模型
我们可以看到,直连是最简单的一种模型,生产者直接将消息放入队列中,消费者直接从队列读取消息。并且生产者、队列和消费者都是一一对应的。

1、创建Virtual Host
在实现这种模型之前。我们需要通过RabbitMQ的web管理界面新建一些东西。
下图看出,队列都在Virtual Host中,我们需要去创建它。

- 打开Virtual Host界面

- 添加Virtual Host


- 添加User

- 允许User访问我们的Virtual Host


2、构建项目并引入依赖

1 2 3 4 5 6
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.2</version> </dependency>
|
3、生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
| public class Provider { public static void main(String[] args) throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.111.128"); factory.setPort(5672); factory.setVirtualHost("/test1"); factory.setUsername("jiang"); factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue1",false,false,false,null);
channel.basicPublish("","queue1",null,"Hello RabbitMQ".getBytes());
channel.close(); connection.close(); } }
|
查看web管理页面,可以看到自动创建了一个队列。并且队列中有一条消息。

4、消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class Customer {
public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.111.128"); factory.setPort(5672); factory.setVirtualHost("/test1"); factory.setUsername("jiang"); factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue1",false,false,false,null);
channel.basicConsume("queue1",true,new DefaultConsumer(channel){
@Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("=================="); System.out.println(new String(body)); } }); } }
|

此时web管理页面中队列中没有数据

5、连接和关闭的工具类
有大量的冗余代码。因此新建一个连接和关闭的工具类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class ConnectionUtils {
public static Connection Connect(){ try { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.111.128"); factory.setPort(5672); factory.setVirtualHost("/test1"); factory.setUsername("jiang"); factory.setPassword("123");
return factory.newConnection(); }catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
return null; }
public static void closeConn(Channel channel, Connection connection){ try{ if(channel != null){ channel.close(); }
if (connection != null){ connection.close(); }
} catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); }
} }
|
6、API的细节
1 2 3
| Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException;
|
5、任务模型
work queue
也称为task queues
,任务模型。当消息处理比较耗时,可能生产者生产消息的速度远远大于消息的消费速度。因此,消息就会堆积越来越多。无法及时处理。
此时就可以使用任务模型:让多个消费者绑定一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class Customer1 { public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.Connect(); Channel channel = connection.createChannel(); channel.queueDeclare("queue1",false,false,false,null); channel.basicConsume("queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1" + new String(body)); } }); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class Customer2 { public static void main(String[] args) throws TimeoutException, IOException { Connection connection = ConnectionUtils.Connect(); Channel channel = connection.createChannel(); channel.queueDeclare("queue1",false,false,false,null); channel.basicConsume("queue1",true,new DefaultConsumer(channel){ @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2" + new String(body)); } }); } }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public class Provider { public static void main(String[] args) throws IOException, TimeoutException {
Connection connection = ConnectionUtils.Connect();
Channel channel = connection.createChannel(); channel.queueDeclare("queue1",false,false,false,null);
for (int i = 0; i < 20; i++) { channel.basicPublish("","queue1",null,("Hello RabbitMQ" + i).getBytes()); }
ConnectionUtils.closeConn(channel,connection); } }
|
结果:我们先启动两个消费者,然后再启动生产者生产消息。


总结:在默认的情况下,RabbitMQ将按顺序将每条消息发送给下一个使用者。每个消费者都会受到相同的消息。称为轮询。
自动确认

RabbitMQ默认是使用轮询的方式来发送消息给消费者。因此当生产者生产一条消息之后,就直接发送给对应的消费者,不关心消费者的情况。
但此时的消费者可能出现了问题。比如:被阻塞、异常,或者处理时间过长等。
此时消费者1被阻塞

消费者2已经消费完

对于自动确认来说:当方法没有异常执行完毕后,会对MQ发出ACK;若方法出现异常,会对MQ发出nack,消息会重回队列。要分清哪些是可以恢复的异常,哪些是不可以恢复的异常。不可恢复的异常,在消费者代码中捕获异常,并记录日志表或放入死信队列。可恢复的异常,那么放入业务队列中重试。
手动确认
我们将autoack置为false,并且每一次只能消费一个消息。
同时消费完之后手动确认。
下面的代码中,消费者1消费的更快,消费者2消费慢。那么消费者1会消费更多的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) throws IOException, TimeoutException { Connection connection = ConnectionUtils.Connect(); final Channel channel = connection.createChannel(); channel.queueDeclare("queue1", false, false, false, null); channel.basicQos(1); channel.basicConsume("queue1", false, new DefaultConsumer(channel) { @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 " + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) throws TimeoutException, IOException { Connection connection = ConnectionUtils.Connect(); final Channel channel = connection.createChannel(); channel.queueDeclare("queue1",false,false,false,null); channel.basicQos(1); channel.basicConsume("queue1",false,new DefaultConsumer(channel){ @Override public void handleDelivery(java.lang.String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消费者2 " + new String(body)); channel.basicAck(envelope.getDeliveryTag(),false); } }); }
|


6、广播
fanout
也成为广播。

在广播模型下:消息发送流程是这样的。
- 可以有多个消费者
- 每个消费者都有自己的队列
- 每个队列都要绑定到交换机
- 生产者发送的消息,只能发送到交换机,交换机来决定要发到哪个队列,生产者无法决定
- 交换机把消息发送给绑定的所有队列
- 队列的消费者都能拿到消息。实现一条消息被多个消费者消费。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel();
channel.exchangeDeclare("exchange1","fanout");
channel.basicPublish("exchange1","",null,"fanout type message".getBytes()); ConnectionUtils.closeConn(channel,connect); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel(); channel.exchangeDeclare("exchange1","fanout");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"exchange1","");
channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 " + new String(body)); } }); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel(); channel.exchangeDeclare("exchange1","fanout"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"exchange1",""); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2 " + new String(body)); } }); }
|


7、Direct
在Fanout模型下,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这就用到了Direct类型的Exchange。

在Direct模型下:
队列与交换机的绑定,不能是任务绑定了,而是指定一个RoutingKey
。
消息的发送放在向Exchange
发送消息时,也必须指定消息的RoutingKey
。
Exchange
不再把消息交给每一个队列,而是根据消息的RoutingKey
进行判断,只有队列的RoutingKey
与消息的Routingkey
一致时,才会接受消息。
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel();
channel.exchangeDeclare("exchange1","direct");
channel.basicPublish("exchange1","Customer1",null,"direct type message 1".getBytes()); channel.basicPublish("exchange1","Customer2",null,"direct type message 2".getBytes()); ConnectionUtils.closeConn(channel,connect); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel(); channel.exchangeDeclare("exchange1","direct");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"exchange1","Customer1");
channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 " + new String(body)); } }); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel(); channel.exchangeDeclare("exchange1","direct"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"exchange1","Customer2"); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2 " + new String(body)); } }); }
|


8、Topic
Topic
类型的Exchange
与Direct
相比,都可以根据RoutingKey
把消息路由到不同的队列。只不过Topic
类型Exchange
可以让队列在绑定在RoutingKey
的时候使用通配符。
这种模型RoutingKey
一般都是由一个或多个单词组成。多个单词之间以.
分割,例如:item.insert
。

通配符
1 2 3 4 5 6 7
| * 匹配不多不少恰好1个词 # 匹配一个或多个词
例如: audit.# 只能audit.irs.corporate 或者 audit.irs等 audit.* 只能匹配 audit.irs
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel();
channel.exchangeDeclare("topics","topic");
channel.basicPublish("topics","user.save.test",null,"topic type message".getBytes()); ConnectionUtils.closeConn(channel,connect); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel(); channel.exchangeDeclare("topics","topic");
String queue = channel.queueDeclare().getQueue();
channel.queueBind(queue,"topics","user.*");
channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1 " + new String(body)); } }); }
|

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| public static void main(String[] args) throws IOException { Connection connect = ConnectionUtils.Connect(); Channel channel = connect.createChannel(); channel.exchangeDeclare("topics","topic"); String queue = channel.queueDeclare().getQueue(); channel.queueBind(queue,"topics","user.#"); channel.basicConsume(queue,true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2 " + new String(body)); } }); }
|

9、SpringBoot整合RabbitMQ
1、 搭建初始环境
引入依赖:
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
配置文件
1 2 3 4 5 6 7 8
| spring.application.name=rabbitMQ-springboot
spring.rabbitmq.host=192.168.111.128 spring.rabbitmq.port=5672 spring.rabbitmq.username=jiang spring.rabbitmq.password=123 spring.rabbitmq.virtual-host=/test1
|
2、直连模型
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @SpringBootTest(classes = RabbitmqdemoApplication.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { @Autowired private RabbitTemplate rabbitTemplate; @Test public void test(){ rabbitTemplate.convertAndSend("hello","Hello World"); }
}
|
消费者
1 2 3 4 5 6 7 8 9 10
| @Component
@RabbitListener(queuesToDeclare = @Queue("hello")) public class hello { @RabbitHandler public void receive(String message){ System.out.println("message : " + message); } }
|
2、 任务模型
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @SpringBootTest(classes = RabbitmqdemoApplication.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { @Autowired private RabbitTemplate rabbitTemplate; @Test public void workTest(){ for (int i = 0; i < 10; i++) { rabbitTemplate.convertAndSend("work","work" + i); } }
}
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Component public class work {
@RabbitListener(queuesToDeclare = @Queue("work")) public void receive1(String message){ System.out.println("消费者1 " + message); }
@RabbitListener(queuesToDeclare = @Queue("work")) public void receive2(String message){ System.out.println("消费者2 " + message); } }
|

3、广播模型
生产者
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest(classes = RabbitmqdemoApplication.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testFanout(){ rabbitTemplate.convertAndSend("logs","","fanout模型数据"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Component public class Fanout {
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "logs",type = "fanout") ) }) public void receive1(String message){ System.out.println("message1 " + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "logs",type = "fanout") ) }) public void receive2(String message){ System.out.println("message2 " + message); } }
|

4、路由模型
生产者
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest(classes = RabbitmqdemoApplication.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testRoute(){ rabbitTemplate.convertAndSend("directs","info","发送info路由信息"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Component public class Route { @RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "directs",type = "direct"), key = {"info","error"} ) }) public void receive1(String message){ System.out.println("message1 = " + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(value = "directs",type = "direct"), key = {"error"} ) }) public void receive2(String message){ System.out.println("message2 = " + message); } }
|

5、动态路由
生产者
1 2 3 4 5 6 7 8 9 10 11 12
| @SpringBootTest(classes = RabbitmqdemoApplication.class) @RunWith(SpringRunner.class) public class TestRabbitMQ { @Autowired private RabbitTemplate rabbitTemplate;
@Test public void testTopic(){ rabbitTemplate.convertAndSend("topics","user.save.test","user.save.test路由消息"); } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Component public class Topic {
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(name = "topics",type = "topic"), key = {"user.*"} ) }) public void receive1(String message){ System.out.println("message1 = " + message); }
@RabbitListener(bindings = { @QueueBinding( value = @Queue, exchange = @Exchange(name = "topics",type = "topic"), key = {"user.#"} ) }) public void receive2(String message){ System.out.println("message2 = " + message); } }
|
