RabbitMQ

RabbitMQ

同步通信和异步通信

同步通信:实时性强,一个时刻只能对应一个人,类似于打电话

异步通信:同一时刻,可以实现一对多,类似于微信聊天

同步调用:

  • 可以立刻得到响应的结果,时效性强。

  • 拓展性差。性能下降。级联失败。

异步调用:

异步调用方式其实就是基于消息通知的方式,一般包含三个角色

  • 消息发送者:投递消息的人,就是原来的调用方

  • 消息代理:管理、暂存、转发消息,你可以把它理解成微信服务器

  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

优势:

  • 解除耦合,拓展性强。
  • 无需等待,性能好。
  • 故障隔离。
  • 消息缓存,流量削峰填谷。

劣势:

  • 无法立刻得到调用的结果,时效性差。
  • 不确定下游业务是否执行成功。
  • 业务安全依赖于broken的可靠性。

RabbitMQ的安装启动

docker上安装

1、下载RabbitMQ的镜像

1
docker pull rabbitmq

2、启动

1
2
3
4
5
6
7
8
9
10
sudo docker run \
-e RABBITMQ_DEFAULT_USER=kyclnx \
-e RABBITMQ_DEFAULT_PASS=123 \
-v rabbitmq-plugins:/plugins \
--name rabbitmq \
--hostname rabbitmq \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:latest

指令说明:

1
2
3
4
5
6
7
8
9
10
sudo docker run: 基本的Docker命令,用于启动一个新的容器实例
-e RABBITMQ_DEFAULT_USER=wuyanzu: 设置RabbitMQ服务的默认用户名为wuyanzu
-e RABBITMQ_DEFAULT_PASS=kZoeSW$$xS5i^Cum: 设置RabbitMQ服务的默认密码为bhoLdSvpd0UAOysh
-v rabbitmq-plugins:/plugins: 将一个名为rabbitmq-plugins的卷映射到容器的/plugins目录,用于存放RabbitMQ的插件。这里的rabbitmq-plugins是一个卷的名称,而不是宿主机的路径
--name rabbitmq: 指定容器的名称为rabbitmq
--hostname rabbitmq: 设置容器的主机名为rabbitmq
-p 15672:15672: 将宿主机的端口15672映射到容器的端口15672,这是RabbitMQ管理界面的默认端口
-p 5672:5672: 将宿主机的端口5672映射到容器的端口5672,这是RabbitMQ用于AMQP协议通信的默认端口
-d: 在后台运行容器(守护进程)
rabbitmq:latest: 使用最新的RabbitMQ官方镜像来创建容器

尝试访问:http://宿主机ip:15672/

Ubuntu

1
2
3
sudo ufw allow 15672
sudo ufw allow 5672
sudo ufw reload

Centos

1
2
3
sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent
sudo firewall-cmd --reload

RabbitMQ没有安装web插件

1
2
3
sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_management
退出

最后通过

RabbitMQ整体概念

  • publisher:消息发送者
  • consumer:消息的消费者
  • queue:队列,存储消息
  • exange:交换机,负责路由消息
  • virtual-host,虚拟主机,起到了数据隔离的作用。

快速入门

需求:在RabbitMQ的控制台完成下列操作:

  • 新建队列hello.queue1和hello.queue2
  • 向默认的amp.fanout交换机发送一条消息
  • 查看消息是否到达hello.queue1和hello.queue2

总结:

1、如果交换机和队列没有绑定,我们直接向着交换机发送消息,不能发送成功。

交换机不能存储消息,只能路由和转发消息。

2、一旦绑定两个队列在交换机上面,交换机发送消息,两个队列均可收到消息(广播)

数据隔离

需求:在RabbitMQ的控制台完成下列操作

  • 新建一个用户hmall。
  • 为hmall用户创建一个virtual host。
  • 测试不同virtualhost之间的数据隔离现象。

java客户端快速入门

发送消息

test中的包的包名要和模块下的包名一致,队列名字也要一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
package com.itheima.publisher;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
void testSendMessage2Queue(){
String queueName = "hello.queue1";
String msg = "hello, qmqp";
rabbitTemplate.convertAndSend(queueName, msg);
}
}

打开localhost:15672的queue队列可以看得到发送的消息

接收消息

1
2
3
4
5
6
7
8
9
10
11
12
package com.itheima.consumer.listeners;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class MqListener {
@RabbitListener(queues = "hello.queue1")
public void listenSimpleQueue(String msg){
System.out.println("消费者收到simple,queue的消息【"+msg+"】");
}
}

kyclnx/mq (gitee.com)

java客户端-Work模型

Workqueues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息。

基本思路如下:
1.在RabbitMQ的控制台创建一个队列,名为工作队列
2.在Publisher服务中定义测试方法,在1秒内产生50条消息,发送到Work.Queue
3.在Consumer服务中定义两个消息监听者,都监听Work.Queue队列
4.消费者1每秒处理50条消息,消费者2每秒处理5条消息。

消费者消息推送限制
默认情况下,RabbitMO的会将消息依次轮询投递给绑定在队列上的每一个消费者。但这并没有考虑到消费者是否已经处理完消息,可能出现消息堆积。
因此我们需要修改application.yml,设置preFetch值为1,确保同一时刻最多投递给消费者1条消息。

work模型的使用:

  • 多个消费者绑定到一个队列可以加快消息的处理速率。
  • 同一个消息只会被一个消费者来处理
  • 通过prefetch来控制消费者预处理消息的数量,处理完一条再处理下一条,实现能者多劳

https://gitee.com/kyclnx/rabbitmq/tree/test1/

交换机

Fanout交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种:

  • Fanout:广播
  • Direct:定向
  • Topic:话题

利用SpringAMQP演示FanoutExchange的使用
实现思路如下:
1.在RabbitMO控制台中,声明队列fanout.queue1和fanout.queue2
2.在RabbitMQ控制台中,声明交换机hmall.fanout,将两个队列与其绑定
3.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
4.在publisher中编写测试方法,向hmall.fanout发送消息

https://gitee.com/kyclnx/rabbitmq/tree/test2/

direct交换机

Direct Exchange 会将接收到的消息根据规则路由到指定的Queue,因此称为定向路由。

  • 每一个Queue都与Exchange设置一个BindingKey
  • 发布者发送消息时,指定消息的RoutingKey
  • Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
  • 在publisher中编写测试方法,利用不同的RoutingKey向hmall.direct发送消息

Topic 交换机

TopicExchange与DirectExchange类似,区别在于routingKey可以是多个单词的列表,并且以.分割。

Queue与Exchange指定BindingKey时可以使用通配符

  • #:代指0个或多个单词
  • *:代指一个单词

需求如下:
1.在RabbitMQ控制台中,声明队列topic.queue1和topic.queue2
2.在RabbitMQ控制台中,声明交换机hmall.topic,将两个队列与其绑定
3.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
4.在publisher中编写测试方法,利用不同的RoutingKey向hmall.topic发送消息

队列

java客户端声明队列

声明队列和交换机

  • SpringAMQP提供了几个类,用来声明队列、交换机及其绑定关系:
  • Queue:用于声明队列,可以用工厂类QueueBuilder构建
  • Exchange:用于声明交换机,可以用工厂类ExchangeBuilder构建
  • Binding:用于声明队列和交换机的绑定关系,可以用工厂类BindingBuilder构建

RabbitMQ
http://example.com/2024/09/08/RabbitMq/
作者
nianjx
发布于
2024年9月8日
许可协议