基于SpringBoot和RabbitMQ实现消息队列 | Java提升营

基于SpringBoot和RabbitMQ实现消息队列

概述

RabbitMQ是实现AMQP(高级消息队列协议)消息中间件的一种,主要是为了实现系统之间的双向解耦。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层保存这个数据。

AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

交换机模式有4中,Direct:”先匹配, 再投送”;Topic:按规则转发消息;Headers:设置header attribute参数类型;Fanout:转发消息到所有绑定队列。后面会逐个介绍。

RabbitMQ安装配置

Mac下安装

测试Mac系统版本为:macOS High Sierra 10.13.3,安装方式使用homebrew。如果之前已经安装过homebrew,但是很长时间没有使用了,建议卸载掉之后重装,因为可能因为系统升级原因造成不可用。
卸载命令:

1
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/uninstall)"

安装命令直接从 https://brew.sh/ 获取 ,可能会有变化,目前的是:

1
/usr/bin/ruby -e "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/master/install)"

然后一行命令即可完成安装:

1
brew install rabbitmq

我安装的是3.7.4版本,启动方式:

1
2
cd /usr/local/Cellar/rabbitmq/3.7.4/sbin/
./rabbitmq-server start

启动完成后,浏览器输入 http://localhost:15672 查看是否成功,账号密码都是guest。必须是localhost,ip地址访问的后面有介绍。

Ubuntu下安装

理论情况下,可以直接使用sudo apt-get install rabbitmq-server安装,但是在安装rabbitmq的时候,会自动安装erlang,部分情况下会造成没有匹配rabbitmq最新版本的erlang,也就直接报错了。我的方法是不安装最新版本的rabbitmq,erlang较低版本是存在的。
查看可安装的rabbitmq版本

1
2
# 如果没有安装apt-cache需要先进行安装
apt-cache policy rabbitmq-server

从版本中选一个非最新版本的,指定版本安装

1
sudo apt-get install rabbitmq-server=上面列出来的版本

安装完成后启动

1
2
sudo rabbitmq-server start
# 服务方式启动使用 service rabbitmq-server start

如果部署机器不是本机,建议在中断使用curl方式查看是否是404即可

1
curl http://localhost:15672

远程访问配置

rabbitmq为了安全起见,模式情况下只能本机访问。同时,即便开启了远程访问,也不能使用guest账户。下面实现用自己配置的账号密码在远程访问。

  • 新增一个用户(因为guest只能localhost访问)
1
2
# rabbitmqctl add_user Username Password
sudo rabbitmqctl add_user fymod 123456
  • 删除一个用户(必要情况下使用)
1
2
# rabbitmqctl delete_user Username
sudo rabbitmqctl delete_user fymod
  • 修改用户密码(必要情况下使用)
1
2
#rabbitmqctl change_password Username Newpassword
sudo rabbitmqctl change_password fymod 1234567
  • 查看当前用户列表
1
rabbitmqctl list_users
  • 设置用户角色(必须)
  1. 超级管理员(administrator):可登录管理控制台(启用management plugin的情况下),可查看所有的信息,并且可以对用户,策略(policy)进行操作。
  2. 监控者(monitoring): 可登录管理控制台(启用management plugin的情况下),同时可以查看rabbitmq节点的相关信息(进程数,内存使用情况,磁盘使用情况等)
  3. 策略制定者(policymaker): 可登录管理控制台(启用management plugin的情况下), 同时可以对policy进行管理。但无法查看节点的相关信息(上图红框标识的部分)。与administrator的对比,administrator能看到这些内容
  4. 普通管理者(management): 仅可登录管理控制台(启用management plugin的情况下),无法看到节点信息,也无法对策略进行管理。
  5. 其他:无法登录管理控制台,通常就是普通的生产者和消费者。
1
2
3
4
# rabbitmqctl set_user_tags User Tag
sudo rabbitmqctl set_user_tags fymod administrator
# 也可以个同一用户设置多个角色
# rabbitmqctl set_user_tags fymod monitoring policymaker
  • 设置用户权限(必须)
1
2
# rabbitmqctl set_permissions -p VHostPath User ConfP WriteP ReadP
eg. rabbitmqctl set_permissions -p / fymod ".*" ".*" ".*"
  • 查看(指定hostpath)所有用户的权限信息(必要情况下使用)
1
rabbitmqctl list_permissions [-p VHostPath]
  • 查看指定用户的权限信息
1
rabbitmqctl list_user_permissions fymod
  • 清除用户的权限信息(必要情况下使用)
1
rabbitmqctl clear_permissions [-p VHostPath] fymod
  • 安装插件
    不安装的话不能实现远程网页访问
    开启某个插件: rabbitmq-plugins enable {插件名}
    关闭某个插件: rabbitmq-plugins disable {插件名}
    有关插件名可以在rabbitmq的安装目录下的plugins目录中查看:$RABBITMQ_HOME/plugins
1
2
3
4
# 启用插件
sudo rabbitmq-plugins enable rabbitmq_management
# 查看插件列表
sudo rabbitmq-plugins list

至此,完成了全部配置,使用 http://IP地址:15672 访问,输入配置好的用户名和密码接口。

新建项目

新建springboot项目的时候,勾选RabbitMQ即可,在pom.xml中会自动加上

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后修改application.yml,内容为(根据自己信息修改)

1
2
3
4
5
6
7
8
spring:
application:
name: springboot-rabbitmq
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest

Direct

Direct Exchange是RabbitMQ默认的交换机模式,也是最简单的模式,根据key全文匹配去寻找队列。

队列配置

1
2
3
4
5
6
7
8
9
10
11
12
13
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DirectRabbitConfig {

@Bean
public Queue queue() {
return new Queue("test1");
}

}

配置比较简单,test1就是在rabbitmq后台可以看到的队列名称,如果有多个消息队列,可以写上多个方法,比如再增加一个queue2方法来添加队列test2等。

发送实体类

如果发送的是字符串,就不要这个了。此处定义一个Student类作为发送内容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import java.io.Serializable;

public class Student implements Serializable {

private static final long serialVersionUID = 1L;

public Student(int number, String name) {
this.number = number;
this.name = name;
}

private int number;

private String name;

public int getNumber() {
return number;
}

public void setNumber(int number) {
this.number = number;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return "Student [number=" + number + ", name=" + name + "]";
}

}

发送者

rabbitTemplate是springboot提供的默认实现。调用convertAndSend发送时,第一个参数必须和配置类中的一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
import com.fymod.rabbitmq.domain.Student;

@Component
public class DirectSender {

@Autowired private AmqpTemplate rabbitTemplate;

public void send(Student student) {
System.out.println("send:" + student.getNumber());
this.rabbitTemplate.convertAndSend("test1", student);
}

}

为了方便模拟有多个发送者发送到test1上(真实场景可能会用到),再继续写一个DirectSender2,代码内容基本一致。当然,如果只是为了看效果,可以不加。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.fymod.rabbitmq.domain.Student;

@Component
public class DirectSender2 {

@Autowired private AmqpTemplate rabbitTemplate;

public void send(Student student) {
System.out.println("send2:" + student.getNumber());
this.rabbitTemplate.convertAndSend("test1", student);
}

}

接收者

接受者需要声明注解@RabbitListener并且参数名称必须和配置类中的一致,本篇为test1。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import com.fymod.rabbitmq.domain.Student;

@Component
@RabbitListener(queues = "test1")
public class DirectReceive {

@RabbitHandler
public void process(Student student) {
System.out.println("receive:" + student.getName());
}

}

和有两个发送者类似,再写一个接受者来模拟有多个接受者的情况。可以根据具体需要选择。

1
2
3
4
5
6
7
8
9
10
11
12
import com.fymod.rabbitmq.domain.Student;

@Component
@RabbitListener(queues = "test1")
public class DirectReceive2 {

@RabbitHandler
public void process(Student student) {
System.out.println("receive2:" + student.getName());
}

}

测试

在测试类中,发送消息并且接收。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import java.util.ArrayList;
import java.util.List;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

import com.fymod.rabbitmq.direct.DirectSender;
import com.fymod.rabbitmq.direct.DirectSender2;
import com.fymod.rabbitmq.domain.Student;

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringBootRabbitmqApplicationTests {

@Autowired private DirectSender directSender;
@Autowired private DirectSender2 directSender2;

@Test
public void direct() {
List<Student> students = new ArrayList<>();
for(int i = 0; i < 10; i++) {
Student student = new Student(10000 + i, "姓名" + (i+1));
students.add(student);
}
for(int i = 0; i < students.size(); i++) {
directSender.send(students.get(i));
directSender2.send(students.get(i));
}
}

}

直接运行Test中的direct方法,可以查看到消息交替发送和接收。

Topic

Topic主要是根据通配符,队列和交换机的绑定会定义一种路由模式,那么,通配符就要在这种路由模式和路由键之间匹配后交换机才能转发消息。
和Direct类似,convertAndSend参数增加为3个,第二个为通配符,使用英文点(.)分割,通配符中星号*表示一个单词,井号#表示零个或多个单词。

队列配置

使用了通配符,实现如果是message,那么转发到test2.message和test2.messages(通配符test2.#可以接收test2.message和test2.messages);如果是message2,那么只转发到test2.messages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TopicRabbitConfig {

final static String message = "test2.message";
final static String messages = "test2.messages";

@Bean
public Queue testMessage() {
return new Queue(TopicRabbitConfig.message);
}

@Bean
public Queue testMessages() {
return new Queue(TopicRabbitConfig.messages);
}

@Bean
TopicExchange exchange() {
return new TopicExchange("exchange");
}

@Bean
Binding bindingExchangeMessage(Queue testMessage, TopicExchange exchange) {
return BindingBuilder.bind(testMessage).to(exchange).with("test2.message");
}

@Bean
Binding bindingExchangeMessages(Queue testMessages, TopicExchange exchange) {
return BindingBuilder.bind(testMessages).to(exchange).with("test2.#");
}

}

发送者

方便起见,直接发送String类型数据(实体类的话可以参照Dircect)。发送方法第二个参数必须和配置中的一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicSender {

@Autowired private AmqpTemplate rabbitTemplate;

public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "test2.message", context);
}

public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("exchange", "test2.messages", context);
}

}

接收者

本篇需要两个接收者,RabbitListener参数分别对应test2.message和test2.messages。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "test2.message")
public class TopicReceive {

@RabbitHandler
public void process(String content) {
System.out.println("receive:" + content);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "test2.messages")
public class TopicReceive2 {

@RabbitHandler
public void process(String content) {
System.out.println("receive2:" + content);
}

}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
	@Autowired TopicSender topicSender;

@Test
public void topic() {
// 发送send2,两个接受者都能收到消息
for(int i = 0; i < 10; i++) {
topicSender.send1();
}
// 发送send2,只有接受者2能收到
// for(int i = 0; i < 10; i++) {
// topicSender.send2();
// }
}

Headers

Headers也是根据规则匹配,相较于Direct和topic固定地使用 routing_key,Headers则是一个自定义匹配规则的类型。
在队列与交换器绑定时,会设定一组键值对规则, 消息中也包括一组键值对(Headers属性),当这些键值对有一对或全部匹配时, 消息被投送到对应队列。
使用场景很有限,不再提供代码。

Fanout

Fanout消息广播的模式,不管路由键或者是路由模式,会把消息发给绑定给它的全部队列,如果配置了routing_key会被忽略。简单说就是绑定了当前交换机的所有队列都会收到消息。

队列配置

使用了A、B、C三个队列绑定到Fanout交换机上面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FanoutRabbitConfig {

@Bean
public Queue aMessage() {
return new Queue("fanout.A");
}

@Bean
public Queue bMessage() {
return new Queue("fanout.B");
}

@Bean
public Queue cMessage() {
return new Queue("fanout.C");
}

@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("fanoutExchange");
}

@Bean
Binding bindingExchangeA(Queue aMessage,FanoutExchange fanoutExchange) {
return BindingBuilder.bind(aMessage).to(fanoutExchange);
}

@Bean
Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(bMessage).to(fanoutExchange);
}

@Bean
Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(cMessage).to(fanoutExchange);
}

}

发送者

发送端的routing_key写任何字符都会被忽略,这里使用空的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class FanoutSender {

@Autowired private AmqpTemplate rabbitTemplate;

public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("fanoutExchange","", context);
}

}

接收者

定义三个接收者,分别接收ABC的消息,理论情况下,一个发送,三个都能接收到。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.A")
public class FanoutReceiveA {

@RabbitHandler
public void process(String content) {
System.out.println("receive A:" + content);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.B")
public class FanoutReceiveB {

@RabbitHandler
public void process(String content) {
System.out.println("receive B:" + content);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = "fanout.C")
public class FanoutReceiveC {

@RabbitHandler
public void process(String content) {
System.out.println("receive C:" + content);
}

}

测试

1
2
3
4
5
6
@Autowired FanoutSender fanoutSender;

@Test
public void fanout() {
fanoutSender.send();
}

会发现三个接收者都收到了消息。

给老奴加个鸡腿吧 🍨.