之前讲到rabbitmq提供了以下几种机制来保证消息可靠性,上篇介绍了生产者确认机制,本篇讲解消费者确认机制,消息持久化存储amqp默认就实现了,不用过多关注
生产者确认机制
消息持久化存储
消费者确认机制
失败重试机制
yml文件配置
spring:
rabbitmq:
host: 1.15.108.206
port: 5672
username: guest
password: guest
virtual-host: smallJHost
# 消费者确认机制相关配置
# 开启publisher-confirm,
# 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
publisher-confirm-type: correlated
# publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
publisher-returns: true
# 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
template:
mandatory: true
listener:
simple:
# ack机制类型
acknowledge-mode: manual
# 设置预取消息数量
prefetch: 2
注意配置中的acknowledge-mode属性,这是设置ack机制的类型,但是如果是用**@RabbitListener注解实现的消费者,那么这里的设置是不会生效的,因为注解自身有ackMode = "AUTO"的默认值,所以在实现消费者的时候应该写明这个属性ackMode = “MANUAL”**
消费者示例:
package com.gitee.small.rabbitmq;
import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
@Component
@Slf4j
public class RabbitReceiver {
private static Integer index = 0;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.dog"),
exchange = @Exchange(value = "binding.dog", type = ExchangeTypes.TOPIC)),
ackMode = "MANUAL"
)
public void process(String msg, Channel channel, Message message) {
try {
// TimeUnit.SECONDS.sleep(3);
// // 多个消费者可开启竞争模式
// channel.basicQos(1);
if (index % 2 == 0) {
log.info("dog-收到消息-成功:{}", msg);
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
index++;
} else {
log.info("dog-收到消息-失败:{}", msg + index);
// 不批量处理,消费失败将消息重新投递回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
index++;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息确认
// 第一个参数是消息的唯一ID,第二个参数表示是否批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
消息否认
// 第一个参数是消息的唯一ID,第二个参数表示是否批量处理,第三个参数表示是否将消息重发回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
因篇幅问题不能全部显示,请点此查看更多更全内容