您好,欢迎来到欧得旅游网。
搜索
您的当前位置:首页rabbitmq消息可靠性之消费者确认机制(ack机制)

rabbitmq消息可靠性之消费者确认机制(ack机制)

来源:欧得旅游网
rabbitmq消息可靠性之消费者确认机制(ack机制)

之前讲到rabbitmq提供了以下几种机制来保证消息可靠性,上篇介绍了生产者确认机制,本篇讲解消费者确认机制,消息持久化存储amqp默认就实现了,不用过多关注

  • 生产者确认机制

  • 消息持久化存储

  • 消费者确认机制

  • 失败重试机制

yml文件配置

spring:
  rabbitmq:
    host: 1.15.108.206
    port: 5672
    username: guest
    password: guest
    virtual-host: smallJHost
    # 消费者确认机制相关配置 
    # 开启publisher-confirm,
    # 这里支持两种类型:simple:同步等待confirm结果,直到超时;# correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback
    publisher-confirm-type: correlated
    # publish-returns:开启publish-return功能,同样是基于callback机制,不过是定义ReturnCallback
    publisher-returns: true
    # 定义消息路由失败时的策略。true,则调用ReturnCallback;false:则直接丢弃消息
    template:
      mandatory: true
    listener:
      simple:
        # ack机制类型
        acknowledge-mode: manual
        # 设置预取消息数量
        prefetch: 2

注意配置中的acknowledge-mode属性,这是设置ack机制的类型,但是如果是用**@RabbitListener注解实现的消费者,那么这里的设置是不会生效的,因为注解自身有ackMode = "AUTO"的默认值,所以在实现消费者的时候应该写明这个属性ackMode = “MANUAL”**

消费者示例:

package com.gitee.small.rabbitmq;

import com.rabbitmq.client.Channel;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
@Slf4j
public class RabbitReceiver {

    private static Integer index = 0;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.dog"),
            exchange = @Exchange(value = "binding.dog", type = ExchangeTypes.TOPIC)),
            ackMode = "MANUAL"
    )
    public void process(String msg, Channel channel, Message message) {
        try {
//            TimeUnit.SECONDS.sleep(3);
//        // 多个消费者可开启竞争模式
//        channel.basicQos(1);
            if (index % 2 == 0) {
                log.info("dog-收到消息-成功:{}", msg);
                channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
                index++;
            } else {
                log.info("dog-收到消息-失败:{}", msg + index);
                // 不批量处理,消费失败将消息重新投递回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
                index++;
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

消息确认

// 第一个参数是消息的唯一ID,第二个参数表示是否批量处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);

消息否认

// 第一个参数是消息的唯一ID,第二个参数表示是否批量处理,第三个参数表示是否将消息重发回队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);

因篇幅问题不能全部显示,请点此查看更多更全内容

Copyright © 2019- ovod.cn 版权所有

违法及侵权请联系:TEL:199 1889 7713 E-MAIL:2724546146@qq.com

本站由北京市万商天勤律师事务所王兴未律师提供法律服务