前言

本文我们来学习下RabbitMQ的几种工作模式,通过具体的demo实战来体会下RabbitMQ的美妙之处。

工作队列模式

简介

工作队列模式(work queue):生产者发送消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。

代码实战

新建maven工程,添加依赖:

1
2
3
4
5
    <dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>

生产者核心代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
// ...省略配置uri的代码factory.setUri("amqp://root:123456@...:5672/%2f");

final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

// 声明一个消息队列
channel.queueDeclare("queue.wq", true, false, false, null);
// 声明direct交换器
channel.exchangeDeclare("ex.wq", BuiltinExchangeType.DIRECT, true, false, null);
// 将消息队列绑定到指定的交换器,并指定绑定键
channel.queueBind("queue.wq", "ex.wq", "key.wq");

for (int i = 0; i < 15; i++) {
channel.basicPublish("ex.wq",
"key.wq", null,
("工作队列:" + i).getBytes("utf-8"));
}

channel.close();
connection.close();
}
}

在生产者服务中,我们设置了一个for循环,往消息队列里发布15次消息。

消费者核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//...省略配置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare("queue.wq", true,
false, false, null);

channel.basicConsume("queue.wq", new DeliverCallback() {
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("推送来的消息:" + new String(message.getBody(), "utf-8"));
}
}, new CancelCallback() {
public void handle(String consumerTag) throws IOException {
System.out.println("Cancel: " + consumerTag);
}
});


}
}

同时启动三个消费者服务,一个生产者服务。

在idea里面同时启动三个服务需做如下配置:

启动成功后,我们进入rabbitmq的管理界面,可以看到我们创建的队列及交换器信息:

执行结果

控制台执行效果如下:

可以看到,rabbitmq会将消息通过近似轮询的方式分发给不同的消费者,消费者会独立消费各自获取到的消息。

发布订阅模式

简介

在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。 生产者将消息发送给交换器。交换器的作用是,从生产者接收消息,将消息推送给消息队列。交换器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。

发布订阅模式使用的是fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。

发布订阅模式是将消息广播给所有订阅该消息的消费者。如图所示:

代码实战

生产者核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//省略了设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
// 声明fanout类型的交换器
channel.exchangeDeclare("ex.myfan", "fanout", true, false, null);

for (int i = 0; i < 20; i++) {
channel.basicPublish("ex.myfan",
"", // fanout类型的交换器不需要指定路由键
null,
("hello world fan:" + i).getBytes("utf-8"));
}

channel.close();
connection.close();
}

消费者核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class OneConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//省略了设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

// 声明临时队列,队列的名字由RabbitMQ自动生成
final String queueName = channel.queueDeclare().getQueue();
System.out.println("生成的临时队列的名字为:" + queueName);

channel.exchangeDeclare("ex.myfan",
BuiltinExchangeType.FANOUT,
true,
false,
null);

// fanout类型的交换器绑定不需要routingkey
channel.queueBind(queueName, "ex.myfan", "");

channel.basicConsume(queueName, (consumerTag, message) -> {
System.out.println("One " + new String(message.getBody(), "utf-8"));
}, consumerTag -> {});

}
}

我们创建了一个名叫“ex.myfan”的fanout类型交换器,然后在消费者中将交换器与临时队列进行绑定。

在服务器上执行rabbitmqctl list_bindings --formatter pretty_table命令查看当前rabbitmq中的交换器与消息队列的绑定关系如下:

执行结果

此处我们复制三份消费者代码,模拟三个消费者服务进行测试,执行结果如下:

当我们将消费者客户端全部关闭之后,再在服务器上执行查看交换器和队列绑定关系信息的话,如下:

可以看到,队列都不在了,因为是临时的。

查看当前rabbitmq中交换器的状态:

交换器还在,因为是交换器是持久的。

注意:fanout类型的交换器的一个特点是类似于广播,当生产者发送消息的时候,如果消费者已经在线,那它能够收到消息,如果消费者当时不在线的话, 那么它在启动之后是收不到之前生产者发送的消息的。

路由模式

简介

使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey ,消费者定义队列并将队列、 routingKey 、Exchange绑定。此时使用 direct 模式Exchagne必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。

现在我们想让接收者只接收部分消息,如,我们通过直接模式的交换器将关键的错误信息记录到log文件,同时在控制台正常打印所有的日志信息。

这就是我们要说的路由模式。

如图所示,我们在消费者一端定义各自所需要接受的消息队列的类型,分别和交换器进行绑定,实现不同消费者消费不同类型的消息。

代码实战

生产者核心代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class Producer {

private final static String[] LOG_LEVEL = {
"ERROR",
"FATAL",
"WARN"
};

private static Random random = new Random();

public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//此处省略设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

// 声明direct类型的交换器,交换器和消息队列的绑定不需要在这里处理
channel.exchangeDeclare("ex.routing", "direct", false, false, null);

for (int i = 0; i < 100; i++) {
String level = LOG_LEVEL[random.nextInt(100) % LOG_LEVEL.length];
channel.basicPublish("ex.routing", level, null, ("这是【" + level + "】的消息").getBytes());
}

}


}

消费者核心代码(此处只展示ErrorConsumer,FatalConsumer和WarnConsumer类似):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ErrorConsumer {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
//此处省略设置uri的代码 factory.setUri("amqp://root:123456@...:5672/%2f");

final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.exchangeDeclare("ex.routing", "direct", false, false, null);
// 此处也可以声明为临时消息队列
channel.queueDeclare("queue.error", false, false, false, null);

channel.queueBind("queue.error", "ex.routing", "ERROR");

channel.basicConsume("queue.error", ((consumerTag, message) -> {
System.out.println("ErrorConsumer收到的消息:" + new String(message.getBody(), "utf-8"));
}), consumerTag -> { });

}
}

查看交换器与消息队列的绑定关系:

队列信息:

执行结果

执行结果如下:

主题模式

简介

topic,主题模式。使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。

要想 topic 类型的交换器, routingKey 就不能随便写了,它必须得是点分单词。单词可以随便 写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。该 点分单词字符串最长255字节。 bindingKey 也必须是这种形式。 topic 类型的交换器背后原理跟 direct 类型的类似:只要队列 的 bindingKey 的值与消息的 routingKey 匹配,队列就可以收到该消息。有两个不同:

* 匹配一个单词

#匹配0到多个单词

示例

如下图所示,我们发送描述动物的消息。消息发送的时候指定的 routingKey 包含了三个词,两个点。 第一个单词表示动物的速度,第二个是颜色,第三个是物种:<speed>.<color>.<species>。 :

创建三个绑定:

Q1绑定到" *.orange.* "

Q2绑定到" *.*.rabbit "和" lazy.# "。

描述:

  1. Q1关注orange颜色动物的消息
  2. Q2关注兔子的消息,以及所有懒的动物消息
  3. 如果不能匹配,就丢弃消息。
  4. 如果发送的消息 routingKey 是" lazy.orange.male.rabbit ",则会匹配最后一个绑定。

代码实战

生产者核心代码如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class Producer {

private static final String[] LOG_LEVEL = {"info", "error", "warn"};
private static final String[] LOG_AREA = {"beijing", "shanghai", "shenzhen"};
private static final String[] LOG_BIZ = {"edu-online", "biz-online", "emp-online"};

private static final Random RANDOM = new Random();

public static void main(String[] args) throws Exception {

final ConnectionFactory factory = new ConnectionFactory();
//省略uri设置代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.exchangeDeclare("ex.topic", "topic", true, false, null);

String area, level, biz;

String routingKey, message;
for (int i = 0; i < 100; i++) {

area = LOG_AREA[RANDOM.nextInt(LOG_AREA.length)];
level = LOG_LEVEL[RANDOM.nextInt(LOG_LEVEL.length)];
biz = LOG_BIZ[RANDOM.nextInt(LOG_BIZ.length)];

// routingKey中包含了三个维度
routingKey = area + "." + biz + "." + level;
message = "LOG: [" + level + "] :这是 [" + area + "] 地区 [" + biz + "] 服务器发来的消息,MSG_SEQ = " + i;

channel.basicPublish("ex.topic", routingKey, null, message.getBytes("utf-8"));
}

channel.close();
connection.close();
}
}

消费者代码(部分)如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class BeijingConsumer {
public static void main(String[] args) throws Exception {
final ConnectionFactory factory = new ConnectionFactory();
//省略uri设置代码 factory.setUri("amqp://root:123456@...:5672/%2f");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();

// 临时队列,返回值是服务器为该队列生成的名称
final String queue = channel.queueDeclare().getQueue();
channel.exchangeDeclare("ex.topic", "topic", true, false, null);
// beijing.biz-online.error
// 只要routingKey是以beijing开头的,后面不管几个点分单词,都可以接收
channel.queueBind(queue, "ex.topic", "beijing.#");

channel.basicConsume(queue, (consumerTag, message) -> {
System.out.println(new String(message.getBody(), "utf-8"));
}, consumerTag -> {});

}
}

执行结果

源码

源码地址: 码云demo地址

总结

我们主要学习了RabbitMQ的四种工作模式,分别是工作队列模式、发布订阅模式、路由模式、主题模式,他们各自有各自的特点及使用场景。