一. 前言
最近有很多小伙伴開始找工作,在面試時,面試官經(jīng)常問到一個題目:RabbitMQ如何防止重復(fù)消費?
有很多小伙伴這個時候都在想,消息怎么就會重復(fù)消費呢???.......
所以他們在面試后就跑來問小編,針對這個比較高頻的題目,小編就在這里為大家來講講MQ防止重復(fù)消費的實現(xiàn)方案吧。
二. 面試題考點
如果面試官是小編的話,那么我想考察的,其實就是候選人除了對技術(shù)的基本使用之外,再就是在各種實際應(yīng)用場景中對可能發(fā)生問題的實際處理能力。
所以這道題的考點,最起碼有兩點:
第一是RabbitMQ中消息的重復(fù)消費是如何產(chǎn)生的,我們首先要發(fā)現(xiàn)問題,知道問題產(chǎn)生原因:
第二是針對這個重復(fù)消費問題的處理方案及機制。
三. 解題分析
接下來小編就根據(jù)上述考點,帶大家來一起分析這個問題的解題思路。
3.1RabbitMQ消息重復(fù)消費的產(chǎn)生原因
根據(jù)上圖,給大家梳理總結(jié)出了消息重復(fù)消費的產(chǎn)生過程,如下:
消費方的業(yè)務(wù)項目從MQ隊列中接收數(shù)據(jù);
接著處理業(yè)務(wù);
業(yè)務(wù)處理成功后,消費方項目給MQ返回ack進行手動確認;
返回回調(diào)執(zhí)行結(jié)果的過程中,因為網(wǎng)絡(luò)抖動等原因,回調(diào)數(shù)據(jù)時,MQ沒有返回成功,所以MQ隊列中的數(shù)據(jù)會再次發(fā)給業(yè)務(wù)項目,造成重復(fù)消費。
3.2. RabbitMQ消息重復(fù)消費的處理方案
針對消息的重復(fù)消費問題,根據(jù)上圖總結(jié)的解決思路如下:
監(jiān)聽器接收MQ隊列中的數(shù)據(jù):
利用redis的setnx命令,以消息唯一id為key,以消息內(nèi)容為value,超時時間設(shè)置為10秒,存入redis中;
如果能夠成功存入,說明沒有重復(fù)消費,則處理業(yè)務(wù),處理完業(yè)務(wù)后返回ack或者nack確認;
如果存不進去,則說明重復(fù)消費,直接返回ack確認的回調(diào)信息就可以了。
3.3解決重復(fù)消費的案例代碼
發(fā)送方測試代碼
/**
* 測試發(fā)送
* @author 千鋒
*/
@SpringBootTest(classes = ProducerApplication.class)
@RunWith(SpringRunner.class)
public class TestProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void contextLoads() throws IOException {
//給消息封裝一個唯一id對象
CorrelationData messageId = new CorrelationData(UUID.randomUUID().toString());
//第四個參數(shù): 設(shè)置消息唯一id
rabbitTemplate.convertAndSend("交換器名字","路由鍵","千鋒測試MQ重復(fù)消費處理??!",messageId);
}
}
接收方測試代碼
package com.qf.rabbitmq.topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
/**
* @author 千鋒
*/
@Component
public class Consumer {
@Autowired
private StringRedisTemplate redisTemplate;
@RabbitListener(queues = "隊列名字")
public void getMessage(String msg, Channel channel, Message message) throws IOException {
//0. 獲取MessageId, 消息唯一id
String messageId = (String) message.getMessageProperties().getHeaders().get("spring_returned_message_correlation");
//1. 設(shè)置key到Redis
if(redisTemplate.opsForValue().setIfAbsent(messageId,"0", 10, TimeUnit.SECONDS)) {
//2. 消費消息
System.out.println("接收到消息:" + msg);
//3. 設(shè)置key的value為1
redisTemplate.opsForValue().set(messageId,"1",10,TimeUnit.SECONDS);
//4. 手動ack
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}else {
//5. 獲取Redis中的value即可 如果是1,手動ack
if("1".equalsIgnoreCase(redisTemplate.opsForValue().get(messageId))){
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
}
}
}
}
四. 總結(jié)
經(jīng)過上面的分析,最后健哥再給大家總結(jié)一下這個問題的完整答案。
問題產(chǎn)生原因:
因為消費方和MQ服務(wù)器網(wǎng)絡(luò)閃斷等原因,造成了接收方消費后,返回給MQ服務(wù)器一個ack確認消息,結(jié)果MQ沒有接收到,造成了重復(fù)消費。
解決過程:
利用redis的setnx命令,將消費的消息id存入到redis,超時時間設(shè)置為10秒,然后再給mq返回ack。消費前要判斷redis中是否存在這個消息id,如果不存在說明沒有消費過,則正常消費;如果redis中存在這個消息id,則說明重復(fù)消費,直接返回ack,不重復(fù)執(zhí)行業(yè)務(wù)。
以上就是MQ中消息重復(fù)消費的產(chǎn)生原因及解決思路和對應(yīng)案例,現(xiàn)在你知道該怎么解決了嗎?更多關(guān)于“Java培訓”的問題,歡迎咨詢千鋒教育在線名師。千鋒已有十余年的培訓經(jīng)驗,課程大綱更科學更專業(yè),有針對零基礎(chǔ)的就業(yè)班,有針對想提升技術(shù)的好程序員班,高品質(zhì)課程助力你實現(xiàn)java程序員夢想。