Skip to content

Commit 167dff4

Browse files
committed
RabbitMQ的几种工作模型
1 parent f96f3c4 commit 167dff4

File tree

5 files changed

+334
-40
lines changed

5 files changed

+334
-40
lines changed
Lines changed: 56 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
package com.imzhizi.javalearning.DevKit.AMQP;
22

3-
import com.rabbitmq.client.Connection;
4-
import com.rabbitmq.client.ConnectionFactory;
3+
import com.rabbitmq.client.*;
4+
import lombok.SneakyThrows;
55

66
import java.io.IOException;
7+
import java.util.Arrays;
78
import java.util.concurrent.TimeoutException;
89

910
/**
@@ -13,7 +14,7 @@
1314
public class MQUtil {
1415
private static ConnectionFactory connectionFactory;
1516

16-
public static Connection getNewConnection() throws IOException, TimeoutException {
17+
public static Connection getNewConnection() {
1718
if (connectionFactory == null) {
1819
connectionFactory = new ConnectionFactory();
1920
connectionFactory.setHost("localhost");
@@ -23,6 +24,57 @@ public static Connection getNewConnection() throws IOException, TimeoutException
2324
connectionFactory.setPassword("just4fun");
2425
}
2526

26-
return connectionFactory.newConnection();
27+
Connection connection = null;
28+
try {
29+
connection = connectionFactory.newConnection();
30+
} catch (IOException | TimeoutException e) {
31+
e.printStackTrace();
32+
} finally {
33+
return connection;
34+
}
35+
}
36+
37+
public static Channel getNewChannel(Connection connection) {
38+
Channel channel = null;
39+
try {
40+
channel = connection.createChannel();
41+
} catch (IOException e) {
42+
e.printStackTrace();
43+
} finally {
44+
return channel;
45+
}
46+
}
47+
48+
public static void close(Connection connection, Channel channel) {
49+
try {
50+
channel.close();
51+
connection.close();
52+
} catch (IOException | TimeoutException e) {
53+
e.printStackTrace();
54+
}
55+
}
56+
57+
58+
public static void sleepConsuming(Channel channel, String queueName, String tag) throws IOException {
59+
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
60+
@SneakyThrows
61+
@Override
62+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
63+
System.out.println(tag + ":{" + new String(body)+"}");
64+
Thread.sleep(500);
65+
channel.basicAck(envelope.getDeliveryTag(), false);
66+
}
67+
});
68+
}
69+
70+
public static void consuming(Channel channel, String queueName, String tag) throws IOException {
71+
channel.basicConsume(queueName, false, new DefaultConsumer(channel) {
72+
@SneakyThrows
73+
@Override
74+
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
75+
System.out.println(tag + ":{" + new String(body)+"}");
76+
channel.basicAck(envelope.getDeliveryTag(), false);
77+
}
78+
});
2779
}
2880
}
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,19 @@
11
package com.imzhizi.javalearning.DevKit.AMQP;
22

3-
import com.rabbitmq.client.*;
4-
import lombok.SneakyThrows;
3+
import com.rabbitmq.client.Channel;
4+
import com.rabbitmq.client.Connection;
55

66
import java.io.IOException;
7-
import java.util.concurrent.TimeoutException;
87

98
/**
109
* created by zhizi
1110
* on 5/12/20 17:37
1211
*/
13-
public class WorkerQueue {
12+
public class 工作队列 {
1413
static class Producer {
15-
public static void main(String[] args) throws IOException, TimeoutException {
14+
public static void main(String[] args) throws IOException {
1615
Connection connection = MQUtil.getNewConnection();
17-
Channel channel = connection.createChannel();
16+
Channel channel = MQUtil.getNewChannel(connection);
1817
// 是否开启持久化
1918
// 是否独占模式
2019
// 消费后是否自动删除
@@ -26,20 +25,20 @@ public static void main(String[] args) throws IOException, TimeoutException {
2625
}
2726
channel.basicPublish("", "queue", null, new String("bye bye").getBytes());
2827
channel.basicPublish("", "queue", null, new String("bye bye").getBytes());
29-
channel.close();
30-
connection.close();
28+
MQUtil.close(connection, channel);
3129
}
3230
}
3331

3432
static class Consumer {
35-
public static void main(String[] args) throws IOException, TimeoutException {
33+
public static void main(String[] args) {
3634
Connection connection = MQUtil.getNewConnection();
3735
new Thread(() -> {
3836
try {
3937
Channel channel = connection.createChannel();
4038
channel.basicQos(1);
4139
channel.queueDeclare("queue", false, false, true, null);
42-
quickConsuming(channel);
40+
MQUtil.consuming(channel, "queue", "quick");
41+
4342
} catch (IOException e) {
4443
e.printStackTrace();
4544
}
@@ -49,37 +48,12 @@ public static void main(String[] args) throws IOException, TimeoutException {
4948
Channel channel = connection.createChannel();
5049
channel.basicQos(1);
5150
channel.queueDeclare("queue", false, false, true, null);
52-
slowConsuming(channel);
51+
MQUtil.sleepConsuming(channel, "queue", "slow");
5352
} catch (IOException e) {
5453
e.printStackTrace();
5554
}
5655
}).start();
5756
}
5857

59-
static void quickConsuming(Channel channel) throws IOException {
60-
channel.basicConsume("queue", false, new DefaultConsumer(channel) {
61-
@SneakyThrows
62-
@Override
63-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
64-
String s = new String(body);
65-
System.out.println("Quick " + s);
66-
Thread.sleep(500);
67-
channel.basicAck(envelope.getDeliveryTag(), false);
68-
}
69-
});
70-
}
71-
72-
static void slowConsuming(Channel channel) throws IOException {
73-
channel.basicConsume("queue", false, new DefaultConsumer(channel) {
74-
@SneakyThrows
75-
@Override
76-
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
77-
String s = new String(body);
78-
System.out.println("Slow " + s);
79-
Thread.sleep(1000);
80-
channel.basicAck(envelope.getDeliveryTag(), false);
81-
}
82-
});
83-
}
8458
}
8559
}
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package com.imzhizi.javalearning.DevKit.AMQP;
2+
3+
import com.rabbitmq.client.BuiltinExchangeType;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.Connection;
6+
7+
import java.io.IOException;
8+
9+
/**
10+
* created by zhizi
11+
* on 5/13/20 10:08
12+
*/
13+
public class 广播队列 {
14+
static class Provider {
15+
public static void main(String[] args) throws IOException {
16+
Connection connection = MQUtil.getNewConnection();
17+
Channel channel = MQUtil.getNewChannel(connection);
18+
19+
//exchange的名称和类型
20+
// 可以看到exchange发送的所有消息都被分发给了各个消费者
21+
// 设置 routingKey 似乎没啥用
22+
channel.exchangeDeclare("account", BuiltinExchangeType.FANOUT);
23+
channel.basicPublish("account", "", null, "verify your ".getBytes());
24+
channel.basicPublish("account", "", null, "verify your phone".getBytes());
25+
26+
MQUtil.close(connection, channel);
27+
}
28+
}
29+
30+
static class Consumer {
31+
public static void main(String[] args) {
32+
new Thread(() -> {
33+
Connection connection = MQUtil.getNewConnection();
34+
Channel channel = MQUtil.getNewChannel(connection);
35+
try {
36+
channel.exchangeDeclare("account", BuiltinExchangeType.FANOUT);
37+
// 创建并绑定临时队列
38+
String queueName = channel.queueDeclare().getQueue();
39+
channel.queueBind(queueName, "account", "");
40+
MQUtil.sleepConsuming(channel, queueName,"A");
41+
} catch (IOException e) {
42+
e.printStackTrace();
43+
}
44+
}).start();
45+
new Thread(() -> {
46+
Connection connection = MQUtil.getNewConnection();
47+
Channel channel = MQUtil.getNewChannel(connection);
48+
try {
49+
channel.exchangeDeclare("account", BuiltinExchangeType.FANOUT);
50+
// 创建并绑定临时队列
51+
String queueName = channel.queueDeclare().getQueue();
52+
channel.queueBind(queueName, "account", "");
53+
MQUtil.sleepConsuming(channel, queueName,"B");
54+
} catch (IOException e) {
55+
e.printStackTrace();
56+
}
57+
}).start();
58+
new Thread(() -> {
59+
Connection connection = MQUtil.getNewConnection();
60+
Channel channel = MQUtil.getNewChannel(connection);
61+
try {
62+
channel.exchangeDeclare("account", BuiltinExchangeType.FANOUT);
63+
// 创建并绑定临时队列
64+
String queueName = channel.queueDeclare().getQueue();
65+
channel.queueBind(queueName, "account", "");
66+
MQUtil.sleepConsuming(channel, queueName,"C");
67+
} catch (IOException e) {
68+
e.printStackTrace();
69+
}
70+
}).start();
71+
}
72+
73+
74+
}
75+
}
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
package com.imzhizi.javalearning.DevKit.AMQP;
2+
3+
import com.rabbitmq.client.BuiltinExchangeType;
4+
import com.rabbitmq.client.Channel;
5+
import com.rabbitmq.client.Connection;
6+
7+
import java.io.IOException;
8+
9+
/**
10+
* created by zhizi
11+
* on 5/13/20 15:12
12+
*/
13+
public class 路由动态 {
14+
static String exchange = "logs";
15+
static String levelDefault = "level.info";
16+
static String level0 = "level.info.*";
17+
static String levelInfo = "level.info.#";
18+
static String levelWarn = "level.info.warn.#";
19+
static String levelError = "level.info.warn.error.#";
20+
21+
static class Producer {
22+
public static void main(String[] args) throws IOException {
23+
Connection connection = MQUtil.getNewConnection();
24+
Channel channel = MQUtil.getNewChannel(connection);
25+
26+
// exchange的名称和类型, 路由模式
27+
// 这里使用了TOPIC类型的exchange
28+
// 重点就在于 levelKey 的设计,可以看到使用了通配符
29+
// 经过测试,default 消息能被 info 级别收到,也就是 # 能匹配0~n个word
30+
// 而 level0 能够接收 info 级别的消息,却接收不到 default 级别的消息,说明 * 能且仅必须匹配一个 word
31+
32+
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
33+
channel.basicPublish(exchange, levelDefault, null, "started".getBytes());
34+
channel.basicPublish(exchange, levelInfo, null, "ok!".getBytes());
35+
channel.basicPublish(exchange, levelWarn, null, "warn warn".getBytes());
36+
channel.basicPublish(exchange, levelError, null, "error error error".getBytes());
37+
38+
MQUtil.close(connection, channel);
39+
}
40+
}
41+
42+
static class Consumer {
43+
public static void main(String[] args) {
44+
new Thread(() -> {
45+
Connection connection = MQUtil.getNewConnection();
46+
Channel channel = MQUtil.getNewChannel(connection);
47+
try {
48+
// 创建exchange
49+
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
50+
// 创建临时队列
51+
String queueName = channel.queueDeclare().getQueue();
52+
// 创建并绑定临时队列
53+
channel.queueBind(queueName, exchange, levelInfo);
54+
MQUtil.consuming(channel, queueName, "A");
55+
56+
} catch (IOException e) {
57+
e.printStackTrace();
58+
}
59+
}).start();
60+
new Thread(() -> {
61+
Connection connection = MQUtil.getNewConnection();
62+
Channel channel = MQUtil.getNewChannel(connection);
63+
try {
64+
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
65+
String queueName = channel.queueDeclare().getQueue();
66+
channel.queueBind(queueName, exchange, levelWarn);
67+
MQUtil.consuming(channel, queueName, "B");
68+
69+
} catch (IOException e) {
70+
e.printStackTrace();
71+
}
72+
}).start();
73+
new Thread(() -> {
74+
Connection connection = MQUtil.getNewConnection();
75+
Channel channel = MQUtil.getNewChannel(connection);
76+
try {
77+
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
78+
String queueName = channel.queueDeclare().getQueue();
79+
channel.queueBind(queueName, exchange, levelError);
80+
MQUtil.consuming(channel, queueName, "C");
81+
} catch (IOException e) {
82+
e.printStackTrace();
83+
}
84+
}).start();
85+
new Thread(() -> {
86+
Connection connection = MQUtil.getNewConnection();
87+
Channel channel = MQUtil.getNewChannel(connection);
88+
try {
89+
channel.exchangeDeclare(exchange, BuiltinExchangeType.TOPIC);
90+
String queueName = channel.queueDeclare().getQueue();
91+
channel.queueBind(queueName, exchange, level0);
92+
MQUtil.consuming(channel, queueName, "D");
93+
} catch (IOException e) {
94+
e.printStackTrace();
95+
}
96+
}).start();
97+
}
98+
}
99+
}

0 commit comments

Comments
 (0)