Skip to content

Rabbitmq点赞改为阻塞式消费 #104

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,18 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import javax.annotation.PreDestroy;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

@Slf4j
@Service
public class RabbitmqServiceImpl implements RabbitmqService {

// 设置一个消费者的固定连接,从池中获取一个连接即可
private RabbitmqConnection rabbitmqConsumerConnection;
private Channel rabbitmqConsumerChannel;

@Autowired
private NotifyService notifyService;

Expand Down Expand Up @@ -59,66 +64,66 @@ public void publishMsg(String exchange,

}

/**
* 阻塞式消费
* @param exchange
* @param queueName
* @param routingKey
*/
@Override
public void consumerMsg(String exchange,
String queueName,
String routingKey) {

try {
//创建连接
RabbitmqConnection rabbitmqConnection = RabbitmqConnectionPool.getConnection();
Connection connection = rabbitmqConnection.getConnection();
rabbitmqConsumerConnection = RabbitmqConnectionPool.getConnection();
Connection connection = rabbitmqConsumerConnection.getConnection();
//创建消息信道
final Channel channel = connection.createChannel();
rabbitmqConsumerChannel = connection.createChannel();
//消息队列
channel.queueDeclare(queueName, true, false, false, null);
rabbitmqConsumerChannel.queueDeclare(queueName, true, false, false, null);
//绑定队列到交换机
channel.queueBind(queueName, exchange, routingKey);
rabbitmqConsumerChannel.queueBind(queueName, exchange, routingKey);

Consumer consumer = new DefaultConsumer(channel) {
Consumer consumer = new DefaultConsumer(rabbitmqConsumerChannel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
log.info("Consumer msg: {}", message);

// 获取Rabbitmq消息,并保存到DB
// 说明:这里仅作为示例,如果有多种类型的消息,可以根据消息判定,简单的用 if...else 处理,复杂的用工厂 + 策略模式
notifyService.saveArticleNotify(JsonUtil.toObj(message, UserFootDO.class), NotifyTypeEnum.PRAISE);

channel.basicAck(envelope.getDeliveryTag(), false);
rabbitmqConsumerChannel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 取消自动ack
channel.basicConsume(queueName, false, consumer);
channel.close();
RabbitmqConnectionPool.returnConnection(rabbitmqConnection);
} catch (InterruptedException | IOException | TimeoutException e) {
// 取消自动ack, 自动监听消息
rabbitmqConsumerChannel.basicConsume(queueName, false, consumer);
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}

@Override
public void processConsumerMsg() {
log.info("Begin to processConsumerMsg.");
consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE, CommonConstants.QUERE_KEY_PRAISE);
}

Integer stepTotal = 1;
Integer step = 0;

// TODO: 这种方式非常 Low,后续会改造成阻塞 I/O 模式
while (true) {
step++;
try {
log.info("processConsumerMsg cycle.");
consumerMsg(CommonConstants.EXCHANGE_NAME_DIRECT, CommonConstants.QUERE_NAME_PRAISE,
CommonConstants.QUERE_KEY_PRAISE);
if (step.equals(stepTotal)) {
Thread.sleep(10000);
step = 0;
}
} catch (Exception e) {

/**
* 关闭连接和通道,销毁时关闭并归还
*/
@PreDestroy
public void destroy() {
try {
if (rabbitmqConsumerChannel != null && rabbitmqConsumerChannel.isOpen()) {
rabbitmqConsumerChannel.close();
}
if (rabbitmqConsumerConnection != null) {
RabbitmqConnectionPool.returnConnection(rabbitmqConsumerConnection);
}
} catch (IOException | TimeoutException e) {
log.error("关闭 RabbitMQ 连接和通道时出错", e);
}
}
}