RabbitMQ之消息丢失和重复消费解决方案
前面我们聊过RabbitMQ入门基础用法,在实际工作中,我们不仅需要懂得在实际工作场景中使用消息队列,还需要懂得对消息队列可能存在的各种异常进行有效的预防及处理。比如:网络异常、消息丢失、重复消费等等。下面我们就针对可能出现的不同问题进行说明及预防和处理
客户端重连
当网络不稳定的时候,可能会导致客户端与MQ服务端连接失败,此时我们就需要让配置客户端重连机制,当客户端发现连接失败时自动发起重连,确保消息的正常发送
虽然利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重连机制是阻塞式重试,可能会影响业务性能。该功能默认是关闭状态,如果使用该配置,须合理配置等待时常和重试次数
spring:
rabbitmq:
# MQ连接超时时间
connection-timeout: 100ms
template:
retry:
# 开启重试机制
enabled: true
# 失败后的初始等待时间
initial-interval: 1000ms
# 败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
multiplier: 1
# 最大重试次数
max-attempts: 3
消息确认机制
除了网络抖动导致连接失败的情况,还可能出现各种意外导致消息丢失。比如:生产者发送消息过程中意外丢失、MQ故障导致消息丢失、消费者消费异常导致消息丢失等等,这些场景我们都需要有所考虑
生产者消息确认
RabbitMQ 提供了 Publisher Confirm
和 Publisher Return
两种确认机制。开启确认机制后,在MQ成功收到消息后会返回确认消息给消息生产者
spring:
rabbitmq:
# 配置 publisher-confirm 类型
publisher-confirm-type: correlated
publisher-confirm-type
有三种模式
- none:默认,关闭confirm机制
- simple:同步阻塞等待MQ确认
- correlated:异步回调确认机制
当我们发送消息时,我们可以为消息指定消息ID、消息ConfirmCallback,并针对消息异常进行处理
@Test
public void sendDirectMessage() throws IOException {
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(result -> {
if (result.isAck()){
log.info("消息发送成功,收到ACK");
} else {
// 触发重试机制
log.error("收到NACK,消息发送失败,失败原因:{}", result.getReason());
}
}, e -> log.error("消息回调失败", e));
// 正常流程基本不会异常
rabbitTemplate.convertAndSend("direct.exchange", "direct.key", "direct message1");
}
当我们设置一个不存在的交换机并发送消息时,得到回调通知如下:
Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'direct.exchange1' in vhost '/', class-id=60, method-id=40)
可以看到回调通知内告知交换机不存在,这个一般我们情况下可能性比较小,需要我们在项目部署后确认上线需要配置的交换机是否正确建立或者是否代码有误
当我们设置一个不存在的路由规则并发送消息时,得到回调通知如下:
收到消息的return callback message:(Body:'direct message1' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode:312, replyText:NO_ROUTE, exchange:direct.exchange, routingKey:direct.key11
同样,我们需要对我们设置的路由规则进行验证,避免代码失误造成线上BUG
消费者消息确认
- 为了确认消费者成功消费,RabbitMQ 提供了消费者消息确认机制(Consumer Acknowledgment)。消费者在成功将消息处理完成后,应主动向MQ发送确认消息,告知MQ该消息消费情况
- ack:消费成功,MQ删除该消息
- nack:消费失败,MQ触发重新消费
- reject:消费失败且拒绝再次消费该消息,MQ删除该消息
当生产者将消息投递到MQ后,消费者开始对消息进行消费,如果在消费过程中因为系统异常等原因导致消费失败,这就可能导致MQ疯狂重复投递,可能导致我们的服务宕机,因为默认情况下SpringAMQP自动帮我们进行了ACK逻辑
@Slf4j @Component public class RabbitMQListener implements Serializable { private static final long serialVersionUID = 9128812682227124956L; @RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) { log.info("MQ基本消费队列消费者收到消息:{}", msg); int i = 1/0; } }
从日志上可以看到,当消费抛出异常后,MQ会一直将消息投递给消费者,如果我们抛出MessageConversionException
异常,则该消息将会被拒绝丢弃@RabbitListener(queues = "simple.queue") public void listenSimpleQueueMessage(String msg) { log.info("MQ基本消费队列消费者收到消息:{}", msg); throw new MessageConversionException("test"); }
RabbitMQ中给我们提供了以下三个的ACK配置项:
- none:消费者收到消息立即ACK,不管最终消费结果,消息易丢失
- manual:手动模式,需要在业务代码中调用API返回ACK,相对更灵活
- auto:自动模式,SpringAMQP利用AOP技术帮我们处理了ACK逻辑
如果使用默认
auto
模式,我们应该对消息重试次数加以限制,避免消息无法消费而给系统带来不必要的压力spring: rabbitmq: listener: simple: retry: enabled: true # 开启失败重试 initial-interval: 1s # 初始失败等待时间 multiplier: 2 # 重试间隔乘数,表示每次重试的间隔时间是前一次的倍数。下次等待时长 = initial-interval * multiplier max-attempts: 3 # 最大重试次数 stateless: true # true无状态,false有状态,如果业务包含事务,应设为false max-interval: 10s # 最大间隔时间
- 在限制重试次数后,如果在重试次数耗尽后消息依然失败,默认情况下就会将消息丢弃,当然我们也可以自定义重试失败后消息处理策略
RejectAndDontRequeueRecoverer
:重试耗尽后,直接将消息丢弃ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将消息投递到指定交换机
示例:
@Configuration
@ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true")
public class ErrorConfiguration implements Serializable {
private static final long serialVersionUID = 3902859475226211634L;
@Bean
public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate){
return new RepublishMessageRecoverer(rabbitTemplate, "error.exchange", "error");
}
}
@Slf4j
@Component
public class RabbitMQListener implements Serializable {
private static final long serialVersionUID = 9128812682227124956L;
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "error.queue"),
exchange = @Exchange(name = "error.exchange"),
key = "error"
))
public void listenErrorQueueMessage(String msg) {
log.info("MQ异常消息消费者收到消息:{}", msg);
}
}
MQ消息持久化
在默认情况下,RabbitMQ会将消息保存在内存中,这样可以提高MQ消息处理速度,但可能会导致以下问题:
- 1.MQ宕机导致内存数据丢失
2.消息消费过慢,导致消息积压,造成MQ阻塞
当然,我们在Spring中使用RabbitMQ, Spring默认会将交换机、队列及消息都设为持久化,从而帮我们避免了这个问题,这里我们需要知道这个问题的存在即可
- 这里我们分别发送持久化消息、临时消息、以及开启ACK确认消息各10万条,记录一下消息发送的耗时:
@Test
public void sendDirectMessage1() {
long begin = System.currentTimeMillis();
Message message = MessageBuilder.withBody("direct message".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
for (int index = 0; index < 100000; index++){
rabbitTemplate.convertAndSend("direct.exchange", "direct.key1", message);
}
log.info("sendDirectMessage1 end {}", System.currentTimeMillis() - begin);
}
@Test
public void sendDirectMessage2() {
long begin = System.currentTimeMillis();
Message message = MessageBuilder.withBody("direct message".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
for (int index = 0; index < 100000; index++){
rabbitTemplate.convertAndSend("direct.exchange", "direct.key1", message);
}
log.info("sendDirectMessage2 end {}", System.currentTimeMillis() - begin);
}
@Test
public void sendDirectMessage3() {
long begin = System.currentTimeMillis();
Message message = MessageBuilder.withBody("direct message".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.build();
for (int index = 0; index < 100000; index++){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(result -> {
if (result.isAck()){
log.info("消息发送成功,收到ACK");
} else {
log.error("收到NACK,消息发送失败,失败原因:{}", result.getReason());
}
}, e -> log.error("消息回调失败", e));
rabbitTemplate.convertAndSend("direct.exchange", "direct.key1", message, correlationData);
}
log.info("sendDirectMessage3 end {}", System.currentTimeMillis() - begin);
}
@Test
public void sendDirectMessage4() {
long begin = System.currentTimeMillis();
Message message = MessageBuilder.withBody("direct message".getBytes(StandardCharsets.UTF_8))
.setDeliveryMode(MessageDeliveryMode.PERSISTENT)
.build();
for (int index = 0; index < 100000; index++){
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
correlationData.getFuture().addCallback(result -> {
if (result.isAck()){
log.info("消息发送成功,收到ACK");
} else {
log.error("收到NACK,消息发送失败,失败原因:{}", result.getReason());
}
}, e -> log.error("消息回调失败", e));
rabbitTemplate.convertAndSend("direct.exchange", "direct.key1", message, correlationData);
}
log.info("sendDirectMessage4 end {}", System.currentTimeMillis() - begin);
}
得到日志如下:
我们可以看到,临时消息发送耗时最低,而ACK耗时最高,在我们系统设计的时候,我们应该综合考量业务等相关因素为我们系统设计合适的配置
惰性队列
- 我们知道默认情况下,RabbitMQ会将消息存在内存中,即使开启了持久化,也会在写入磁盘的同时在内存中留一份备份
- RabbitMQ从3.6.0版本开始增加了惰性队列(Lazy Queue),3.12版本后所有队列都默认是惰性队列
- 惰性队列会尽可能将消息存入磁盘而非内存
在消费者消费相应消息时才会将消息加载到内存,这种设计可以让我们MQ存储更多消息 - 当遇到消费者异常(宕机、系统阻塞消费慢等)会导致消息堆积时,非惰性队列可能会因为内存占用过高导致MQ阻塞,而惰性队列因为很少占用内存很好的避免了这个问题
下面给出惰性队列的使用方式
1.声明队列和交换机的绑定关系@Configuration public class MqConfig implements Serializable { private static final long serialVersionUID = 2907977050485991754L; @Bean public DirectExchange exchange(){ return new DirectExchange("lazy.direct.exchange"); } @Bean public Queue queue(){ Map<String, Object> map = new HashMap<>(1); map.put("x-queue-mode", "lazy"); return QueueBuilder.durable("lazy.direct.queue").withArguments(map).build(); } @Bean public Binding binding(Queue queue, DirectExchange exchange){ return BindingBuilder.bind(queue).to(exchange).with("lazy.direct.key"); } }
2.消费者接收消息
@Slf4j @Component public class RabbitMQListener implements Serializable { private static final long serialVersionUID = 9128812682227124956L; @RabbitListener(queues = "lazy.direct.queue") public void listenLazyQueueMessage(String msg) { log.info("MQ发布订阅lazy direct 消费者收到消息:{}", msg); } }
3.生产者发送消息
@Slf4j @SpringBootTest @RunWith(SpringRunner.class) public class ProducerTest implements Serializable { private static final long serialVersionUID = -3086376022023672067L; @Autowired private RabbitTemplate rabbitTemplate; @Test public void sendDirectMessage5() { long begin = System.currentTimeMillis(); for (int index = 0; index < 100000; index++){ CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString()); correlationData.getFuture().addCallback(result -> { if (result.isAck()){ log.info("消息发送成功,收到ACK"); } else { log.error("收到NACK,消息发送失败,失败原因:{}", result.getReason()); } }, e -> log.error("消息回调失败", e)); rabbitTemplate.convertAndSend("lazy.direct.exchange", "lazy.direct.key", "direct message", correlationData); } log.info("sendDirectMessage5 end {}", System.currentTimeMillis() - begin); } }
消息幂等处理
通过上述说明,我们保证了消息至少会被消费者消费一次,但是却不能保证消息被多次消费带来的问题。日常开发中,我们不仅要保证消息不丢失,还需要保证消息被重复消费而生成的数据唯一性,即我们常说的幂等
,用函数表达式描述如下:f(x)=f(f(X))
具体实现请参考文章什么是消息队列?消息队列有什么用,这里就不再赘述了
- 本文标签: MQ
- 本文链接: https://www.58cto.cn/article/41
- 版权声明: 本文由程序言原创发布, 非商业性可自由转载、引用,但需署名作者且注明文章出处:程序言 》 RabbitMQ之消息丢失和重复消费解决方案 - https://www.58cto.cn/article/41