avatar

RabbitMQ

简介

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

作用

  • 异步处理
  • 应用解耦
  • 流量削峰
  • 日志处理
  • 等等等等等。。。。。。

安装与配置

使用Docker安装

拉取镜像

1
2
#指定版本,该版本包含了web控制页面
docker pull rabbitmq:management

运行

1
2
3
4
5
#方式一:默认guest 用户,密码也是 guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management

#方式二:设置用户名和密码
docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=root -e RABBITMQ_DEFAULT_PASS=1234 -p 15672:15672 -p 5672:5672 rabbitmq:management

启动成功后浏览器访问http://192.168.156.128:15672

  • Overview 总览
  • Connections 连接信息
  • Channels 信道
  • Exchange 交换机
  • Queues 队列
  • Admin 管理员

快速入门

概述

RabbitMQ简介

RabbitMQ运行机制

在可视化页面完成前期操作

点击Exchange创建交换机

Add a new exchange添加交换机
  • Name 交换机名字
  • Type 类型
  • Durability 持久化 Durable为持久化
  • Arguments 参数

添加

1
2
3
name = cs.direct type = direct Durability = Durable
cs.fanout type = fanout Durability = Durable
cs.topic type = topic Durability = Durable

点击Queues创建消息队列

Add a new queue
  • type 类型 默认 Classic
  • Name 名字
  • Durability 持久化 默认 Durable 开持久化
  • Arguments 参数

添加

1
2
3
4
5
cs
cs.news
cs.emps
cscs.news
其他配置都选默认

将Queues绑定到交换机

点进去

1
2
To queue = 队列名
给cs.direct cs.fanout 绑定刚刚创建的每个队列

给cs.topic绑定

topic交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列
需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。
它同样也会识别两个通配符:符号 “#” 和符号 ”星号”。#匹配0个或多个单词,星号 匹配一个单词。

发个消息测试一下

点击交换机

输入Routing key以及其他参数就可以发布了

如我从cs.direct 中 发送消息 给 cs

SpringBoot整合RabbitMQ

导入依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.properties

1
2
3
spring.rabbitmq.host=ip地址
spring.rabbitmq.username=root
spring.rabbitmq.password=1234

编写测试类测试

1、direct类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
/**
* 1、单播(点对点)也就是 direct
*/
@Test
public void direct() {
//Message需要自己构造一个,定义消息体内容和消息头
//rabbitTemplate.send(exchage, routeKey, message);

// object默认当成消息体,只需要传入发送对象,自动序列化发送给rabbitmq
//rabbitTemplate.convertAndSend(exchange,routeKey,object);

Map<String,Object> map = new HashMap<>();
map.put("msg","这是第2个消息");
map.put("data", Arrays.asList("helloworld",123,true));

//对象被默认序列化后发送出去
rabbitTemplate.convertAndSend("exchange.direct","duxiu.news",new Book("admin","111"));
}

点进相应队列查看消息

这里可以看到消息被序列化,默认使用的是 jdk的序列化,我们把它改为json

config包下创建

1
2
3
4
5
6
7
8
9
@Configuration
public class MyAMQPConfig {

@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}

}

接收

1
2
3
4
5
6
7
8
9
/**
* 接收
*/
@Test
public void receive(){
Object o = rabbitTemplate.receiveAndConvert("duxiu.news");
System.out.println(o.getClass());
System.out.println(o);
}

2、广播 fanout

1
2
3
4
5
6
7
/**
* 广播
*/
@Test
public void sendMsg(){
rabbitTemplate.convertAndSend("exchange.fanout","",new Book("红楼梦","曹雪芹"));
}

其他的类似

监听消息队列内容

在启动类上添加注解

1
2
3
4
/**
开启基于注解的RabbitMQ模式
**/
@EnableRabbit

在server实现类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Service
public class BookService {

/**
* 监听消息队列的内容
* @param book
*/
@RabbitListener(queues = "duxiu.news")
public void receive(Book book){
System.out.println("收到消息:" + book);
}

@RabbitListener(queues = "duxiu")
public void receive02(Message message){
//接收消息头信息
System.out.println(message.getBody());
System.out.println(message.getMessageProperties());
}
}

效果

使用AmqpAdmin操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Autowired
AmqpAdmin amqpAdmin;

/**
* 创建交换机Exchange
*/
@Test
public void createExchange(){
amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange"));
System.out.println("创建Exchange完成");
}


/**
* 创建队列 Queue
*/
@Test
public void createQueue(){
amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true));
System.out.println("创建Queue成功");
}

/**
* 创建绑定规则
*/
@Test
public void createBinding(){
amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null));
}
文章作者:
文章链接: https://huohuohuohuohuohuo.github.io/2020/03/19/RabbitMQ/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
打赏
  • 微信
    微信
  • 支付寶
    支付寶

评论