简介
AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
作用
- 异步处理
- 应用解耦
- 流量削峰
- 日志处理
- 等等等等等。。。。。。
安装与配置
使用Docker安装
拉取镜像
1 2
| #指定版本,该版本包含了web控制页面 docker pull rabbitmq:management
|
运行
1 2 3 4 5
| #方式一:默认guest 用户,密码也是 guest docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management
#方式二:设置用户名和密码 docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=1234 -p 15672:15672 -p 5672:5672 rabbitmq:management
|
启动成功后浏览器访问http://192.168.156.128:15672
- Overview 总览
- Connections 连接信息
- Channels 信道
- Exchange 交换机
- Queues 队列
- Admin 管理员
快速入门
概述
RabbitMQ简介
RabbitMQ运行机制
在可视化页面完成前期操作
点击Exchange创建交换机
Add a new exchange添加交换机
- Name 交换机名字
- Type 类型
- Durability 持久化 Durable为持久化
- Arguments 参数
添加
1 2 3
| name = cs.direct type = direct Durability = Durable cs.fanout type = fanout Durability = Durable cs.topic type = topic Durability = Durable
|
点击Queues创建消息队列
Add a new queue
- type 类型 默认 Classic
- Name 名字
- Durability 持久化 默认 Durable 开持久化
- Arguments 参数
添加
1 2 3 4 5
| cs cs.news cs.emps cscs.news 其他配置都选默认
|
将Queues绑定到交换机
点进去
1 2
| To queue = 队列名 给cs.direct cs.fanout 绑定刚刚创建的每个队列
|
给cs.topic绑定
topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列
需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
它同样也会识别两个通配符:符号 “#” 和符号 ”星号”。#匹配0个或多个单词,星号 匹配一个单词。
发个消息测试一下
点击交换机
输入Routing key以及其他参数就可以发布了
如我从cs.direct 中 发送消息 给 cs
SpringBoot整合RabbitMQ
导入依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
application.properties
1 2 3
| spring.rabbitmq.host=ip地址 spring.rabbitmq.username=root spring.rabbitmq.password=1234
|
编写测试类测试
1、direct类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
|
@Test public void direct() {
Map<String,Object> map = new HashMap<>(); map.put("msg","这是第2个消息"); map.put("data", Arrays.asList("helloworld",123,true));
rabbitTemplate.convertAndSend("exchange.direct","duxiu.news",new Book("admin","111")); }
|
点进相应队列查看消息
这里可以看到消息被序列化,默认使用的是 jdk的序列化,我们把它改为json
config包下创建
1 2 3 4 5 6 7 8 9
| @Configuration public class MyAMQPConfig {
@Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); }
}
|
接收
1 2 3 4 5 6 7 8 9
|
@Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("duxiu.news"); System.out.println(o.getClass()); System.out.println(o); }
|
2、广播 fanout
1 2 3 4 5 6 7
|
@Test public void sendMsg(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("红楼梦","曹雪芹")); }
|
其他的类似
监听消息队列内容
在启动类上添加注解
1 2 3 4
| /** 开启基于注解的RabbitMQ模式 **/ @EnableRabbit
|
在server实现类中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @Service public class BookService {
@RabbitListener(queues = "duxiu.news") public void receive(Book book){ System.out.println("收到消息:" + book); }
@RabbitListener(queues = "duxiu") public void receive02(Message message){ System.out.println(message.getBody()); System.out.println(message.getMessageProperties()); } }
|
效果
使用AmqpAdmin操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| @Autowired AmqpAdmin amqpAdmin;
@Test public void createExchange(){ amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); System.out.println("创建Exchange完成"); }
@Test public void createQueue(){ amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); System.out.println("创建Queue成功"); }
@Test public void createBinding(){ amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null)); }
|