新聞中心
前幾天領(lǐng)導(dǎo)突然宣布幾年前停用的電商項(xiàng)目又重新啟動(dòng)了,帶著復(fù)雜的心情仔細(xì)賞閱“兒時(shí)”的代碼,心中的酸楚只有自己能夠體會(huì)。

網(wǎng)站建設(shè)哪家好,找創(chuàng)新互聯(lián)公司!專(zhuān)注于網(wǎng)頁(yè)設(shè)計(jì)、網(wǎng)站建設(shè)、微信開(kāi)發(fā)、重慶小程序開(kāi)發(fā)、集團(tuán)企業(yè)網(wǎng)站建設(shè)等服務(wù)項(xiàng)目。為回饋新老客戶(hù)創(chuàng)新互聯(lián)還提供了中方免費(fèi)建站歡迎大家使用!
這不,昨天又被領(lǐng)導(dǎo)叫進(jìn)了“小黑屋”,讓我把代碼重構(gòu)下進(jìn)行升級(jí)??吹竭@么“可愛(ài)”的代碼,心中一萬(wàn)只“xx馬”疾馳而過(guò)。
讓我最深?lèi)和从X(jué)的就是里邊竟然用定時(shí)任務(wù)實(shí)現(xiàn)了“關(guān)閉超時(shí)訂單”的功能,現(xiàn)在想來(lái),哭笑不得。我們先分析一波為什么大家都在抵制用定時(shí)任務(wù)來(lái)實(shí)現(xiàn)該功能。
定時(shí)任務(wù)
關(guān)閉超時(shí)訂單是在創(chuàng)建訂單之后的一段時(shí)間內(nèi)未完成支付而關(guān)閉訂單的操作,該功能一般要求每筆訂單的超時(shí)時(shí)間是一致的。
如果我們使用定時(shí)任務(wù)來(lái)進(jìn)行該操作,很難把握定時(shí)任務(wù)輪詢(xún)的時(shí)間間隔:
- 時(shí)間間隔足夠小,在誤差允許的范圍內(nèi)可以達(dá)到我們說(shuō)的時(shí)間一致性問(wèn)題,但是頻繁掃描數(shù)據(jù)庫(kù),執(zhí)行定時(shí)任務(wù),會(huì)造成網(wǎng)絡(luò)IO和磁盤(pán)IO的消耗,對(duì)實(shí)時(shí)交易造成一定的沖擊;
- 時(shí)間間隔比較大,由于每個(gè)訂單創(chuàng)建的時(shí)間不一致,所以上邊的一致性要求很難達(dá)到,舉例如下:
假設(shè)30分鐘訂單超時(shí)自動(dòng)關(guān)閉,定時(shí)任務(wù)的執(zhí)行間隔時(shí)間為30分鐘:
我們?cè)诘?分鐘進(jìn)行下單操作;
- 當(dāng)時(shí)間來(lái)到第30分鐘時(shí),定時(shí)任務(wù)執(zhí)行一次,但是我們的訂單未滿(mǎn)足條件,不執(zhí)行;
- 當(dāng)時(shí)間來(lái)到第35分鐘時(shí),訂單達(dá)到關(guān)閉條件,但是定時(shí)任務(wù)未執(zhí)行,所以不執(zhí)行;
- 當(dāng)時(shí)間來(lái)到第60分鐘時(shí),開(kāi)始執(zhí)行我們的訂單關(guān)閉操作,而此時(shí),誤差達(dá)到25分鐘;
- 經(jīng)此種種,我們需要舍棄該方式。
延時(shí)隊(duì)列
為了滿(mǎn)足領(lǐng)導(dǎo)的需求,我便將手伸向了消息隊(duì)列:RabbitMQ。盡管它本身并沒(méi)有提供延時(shí)隊(duì)列的功能,但是我們可以利用它的存活時(shí)間和死信交換機(jī)的特性來(lái)間接實(shí)現(xiàn)。
首先我們先來(lái)簡(jiǎn)單介紹下什么是存活時(shí)間?什么是死信交換機(jī)?
存活時(shí)間
存活時(shí)間的全拼是Time To Live,簡(jiǎn)稱(chēng) TTL。它既支持對(duì)消息本身進(jìn)行設(shè)置(延遲隊(duì)列的關(guān)鍵),又支持對(duì)隊(duì)列進(jìn)行設(shè)置(該隊(duì)列中所有消息存在相同的過(guò)期時(shí)間)。
- 對(duì)消息本身進(jìn)行設(shè)置:即使消息過(guò)期,也不會(huì)馬上從隊(duì)列中抹去,因?yàn)槊織l消息是否過(guò)期是在即將投遞到消費(fèi)者之前判定的;
- 對(duì)隊(duì)列進(jìn)行設(shè)置:一旦消息過(guò)期,就會(huì)從隊(duì)列中抹去。
如果同時(shí)使用這兩種方法,那么以過(guò)期時(shí)間小的那個(gè)數(shù)值為準(zhǔn)。當(dāng)消息達(dá)到過(guò)期時(shí)間還沒(méi)有被消費(fèi),那么該消息就“死了”,我們把它稱(chēng)為 死信 消息。
消息變?yōu)樗佬诺臈l件:
- 消息被拒絕(basic.reject/basic.nack),并且requeue=false;
- 消息的過(guò)期時(shí)間到期了;
- 隊(duì)列達(dá)到最大長(zhǎng)度。
隊(duì)列設(shè)置注意事項(xiàng)
隊(duì)列中該屬性的設(shè)置要在第一次聲明隊(duì)列的時(shí)候設(shè)置才有效,如果隊(duì)列一開(kāi)始已存在且沒(méi)有這個(gè)屬性,則要?jiǎng)h掉隊(duì)列再重新聲明才可以。
隊(duì)列的 ttl 只能被設(shè)置為某個(gè)固定的值,一旦設(shè)置后則不能更改,否則會(huì)拋出異常;
死信交換機(jī)
死信交換機(jī)全拼Dead-Letter-Exchange,簡(jiǎn)稱(chēng)DLX。
當(dāng)消息在一個(gè)隊(duì)列中變成死信之后,如果這個(gè)消息所在的隊(duì)列設(shè)置了x-dead-letter-exchange參數(shù),那么它會(huì)被發(fā)送到x-dead-letter-exchange對(duì)應(yīng)值的交換機(jī)上,這個(gè)交換機(jī)就稱(chēng)之為死信交換機(jī),與這個(gè)死信交換器綁定的隊(duì)列就是死信隊(duì)列。
- x-dead-letter-exchange:出現(xiàn)死信之后將死信重新發(fā)送到指定交換機(jī);
- x-dead-letter-routing-key:出現(xiàn)死信之后將死信重新按照指定的routing-key發(fā)送,如果不設(shè)置默認(rèn)使用消息本身的routing-key。
死信隊(duì)列與普通隊(duì)列的區(qū)別就是它的RoutingKey和Exchange需要作為參數(shù),綁定到正常的隊(duì)列上。
實(shí)戰(zhàn)教學(xué)
先來(lái)張圖感受下我們的整體思路:
- 生產(chǎn)者發(fā)送帶有 ttl 的消息放入交換機(jī)路由到延時(shí)隊(duì)列中;
- 在延時(shí)隊(duì)列中綁定死信交換機(jī)與死信轉(zhuǎn)發(fā)的routing-key;
- 等延時(shí)隊(duì)列中的消息達(dá)到延時(shí)時(shí)間之后變成死信轉(zhuǎn)發(fā)到死信交換機(jī)并路由到死信隊(duì)列中;
- 最后供消費(fèi)者消費(fèi)。
配置類(lèi)
@Configuration
public class DelayQueueRabbitConfig {
public static final String DLX_QUEUE = "queue.dlx";//死信隊(duì)列
public static final String DLX_EXCHANGE = "exchange.dlx";//死信交換機(jī)
public static final String DLX_ROUTING_KEY = "routingkey.dlx";//死信隊(duì)列與死信交換機(jī)綁定的routing-key
public static final String ORDER_QUEUE = "queue.order";//訂單的延時(shí)隊(duì)列
public static final String ORDER_EXCHANGE = "exchange.order";//訂單交換機(jī)
public static final String ORDER_ROUTING_KEY = "routingkey.order";//延時(shí)隊(duì)列與訂單交換機(jī)綁定的routing-key
/**
* 定義死信隊(duì)列
**/
@Bean
public Queue dlxQueue(){
return new Queue(DLX_QUEUE,true);
}
/**
* 定義死信交換機(jī)
**/
@Bean
public DirectExchange dlxExchange(){
return new DirectExchange(DLX_EXCHANGE, true, false);
}
/**
* 死信隊(duì)列和死信交換機(jī)綁定
* 設(shè)置路由鍵:routingkey.dlx
**/
@Bean
Binding bindingDLX(){
return BindingBuilder.bind(dlxQueue()).to(dlxExchange()).with(DLX_ROUTING_KEY);
}
/**
* 訂單延時(shí)隊(duì)列
* 設(shè)置隊(duì)列里的死信轉(zhuǎn)發(fā)到的DLX名稱(chēng)
* 設(shè)置死信在轉(zhuǎn)發(fā)時(shí)攜帶的 routing-key 名稱(chēng)
**/
@Bean
public Queue orderQueue() {
Mapparams = new HashMap<>();
params.put("x-dead-letter-exchange", DLX_EXCHANGE);
params.put("x-dead-letter-routing-key", DLX_ROUTING_KEY);
return new Queue(ORDER_QUEUE, true, false, false, params);
}
/**
* 訂單交換機(jī)
**/
@Bean
public DirectExchange orderExchange() {
return new DirectExchange(ORDER_EXCHANGE, true, false);
}
/**
* 把訂單隊(duì)列和訂單交換機(jī)綁定在一起
**/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue()).to(orderExchange()).with(ORDER_ROUTING_KEY);
}
}
消費(fèi)消息
@Component
@RabbitListener(queues = DelayQueueRabbitConfig.DLX_QUEUE)//監(jiān)聽(tīng)隊(duì)列名稱(chēng)
public class OrderMQReciever {
@RabbitHandler
public void process(String message){
System.out.println("OrderMQReciever接收到的消息是:"+ message);
}
}
測(cè)試
通過(guò)調(diào)用接口,發(fā)現(xiàn)10秒之后才會(huì)消費(fèi)消息:
問(wèn)題升級(jí)
由于開(kāi)發(fā)環(huán)境和測(cè)試環(huán)境使用的是同一個(gè)交換機(jī)和隊(duì)列,所以發(fā)送的延時(shí)時(shí)間都是30分鐘。但是為了在測(cè)試環(huán)境讓測(cè)試同學(xué)方便測(cè)試,故手動(dòng)將測(cè)試環(huán)境的時(shí)間改為了1分鐘。
問(wèn)題復(fù)現(xiàn)
接著問(wèn)題就來(lái)了:延時(shí)時(shí)間為1分鐘的消息并沒(méi)有立即被消費(fèi),而是等30分鐘的消息被消費(fèi)完之后才被消費(fèi)了。至于原因,我們下邊再分析,先用代碼來(lái)給大家復(fù)現(xiàn)下該問(wèn)題。
@GetMapping("/sendManyMessage")
public String sendManyMessage(){
send("延遲消息睡10秒",10000+"");
send("延遲消息睡2秒",2000+"");
send("延遲消息睡5秒",5000+"");
return "ok";
}
private void send(String msg, String delayTime){
rabbitTemplate.convertAndSend(DelayQueueRabbitConfig.ORDER_EXCHANGE,
DelayQueueRabbitConfig.ORDER_ROUTING_KEY,
msg,message->{
message.getMessageProperties().setExpiration(delayTime);
return message;
});
}執(zhí)行結(jié)果如下:
OrderMQReciever接收到的消息是:延遲消息睡10秒
OrderMQReciever接收到的消息是:延遲消息睡2秒
OrderMQReciever接收到的消息是:延遲消息睡5秒
原因就是延時(shí)隊(duì)列也滿(mǎn)足隊(duì)列先進(jìn)先出的特征,當(dāng)10秒的消息未出隊(duì)列時(shí),后邊的消息不能順利出隊(duì),造成后邊的消息阻塞了,未能達(dá)到精準(zhǔn)延時(shí)。
問(wèn)題解決
我們可以利用x-delay-message插件來(lái)解決該問(wèn)題。
消息的延遲范圍是 Delay > 0, Delay =< ?ERL_MAX_T(在 Erlang 中可以被設(shè)置的范圍為 (2^32)-1 毫秒)。
- 生產(chǎn)者發(fā)送消息到交換機(jī)時(shí),并不會(huì)立即進(jìn)入,而是先將消息持久化到 Mnesia(一個(gè)分布式數(shù)據(jù)庫(kù)管理系統(tǒng));
- 插件將會(huì)嘗試確認(rèn)消息是否過(guò)期;
- 如果消息過(guò)期,消息會(huì)通過(guò) x-delayed-type 類(lèi)型標(biāo)記的交換機(jī)投遞至目標(biāo)隊(duì)列,供消費(fèi)者消費(fèi)。
實(shí)踐
官網(wǎng)下載:https://www.rabbitmq.com/community-plugins.html
我這邊使用的是v3.8.0.ez,將文件下載下來(lái)放到服務(wù)器的/usr/local/soft/rabbitmq_server-3.7.14/plugins 路徑下,執(zhí)行rabbitmq-plugins enable rabbitmq_delayed_message_exchange命令即可。
出現(xiàn)如圖所示,代表安裝成功。
配置類(lèi)
@Configuration
public class XDelayedMessageConfig {
public static final String DIRECT_QUEUE = "queue.direct";//隊(duì)列
public static final String DELAYED_EXCHANGE = "exchange.delayed";//延遲交換機(jī)
public static final String ROUTING_KEY = "routingkey.bind";//綁定的routing-key
/**
* 定義隊(duì)列
**/
@Bean
public Queue directQueue(){
return new Queue(DIRECT_QUEUE,true);
}
/**
* 定義延遲交換機(jī)
* args:根據(jù)該參數(shù)進(jìn)行靈活路由,設(shè)置為“direct”,意味著該插件具有與直連交換機(jī)具有相同的路由行為,
* 如果想要不同的路由行為,可以更換現(xiàn)有的交換類(lèi)型如:“topic”
* 交換機(jī)類(lèi)型為 x-delayed-message
**/
@Bean
public CustomExchange delayedExchange(){
Mapargs = new HashMap ();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, args);
}
/**
* 隊(duì)列和延遲交換機(jī)綁定
**/
@Bean
public Binding orderBinding() {
return BindingBuilder.bind(directQueue()).to(delayedExchange()).with(ROUTING_KEY).noargs();
}
}
發(fā)送消息
@RestController
@RequestMapping("/delayed")
public class DelayedSendMessageController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendManyMessage")
public String sendManyMessage(){
send("延遲消息睡10秒",10000);
send("延遲消息睡2秒",2000);
send("延遲消息睡5秒",5000);
return "ok";
}
private void send(String msg, Integer delayTime){
//將消息攜帶路由鍵值
rabbitTemplate.convertAndSend(
XDelayedMessageConfig.DELAYED_EXCHANGE,
XDelayedMessageConfig.ROUTING_KEY,
msg,
message->{
message.getMessageProperties().setDelay(delayTime);
return message;
});
}
}
消費(fèi)消息
@Component
@RabbitListener(queues = XDelayedMessageConfig.DIRECT_QUEUE)//監(jiān)聽(tīng)隊(duì)列名稱(chēng)
public class DelayedMQReciever {
@RabbitHandler
public void process(String message){
System.out.println("DelayedMQReciever接收到的消息是:"+ message);
}
}
測(cè)試
DelayedMQReciever接收到的消息是:延遲消息睡2秒
DelayedMQReciever接收到的消息是:延遲消息睡5秒
DelayedMQReciever接收到的消息是:延遲消息睡10秒
這樣我們的問(wèn)題就順利解決了。
局限性
延遲的消息存儲(chǔ)在一個(gè)Mnesia表中,當(dāng)前節(jié)點(diǎn)上只有一個(gè)磁盤(pán)副本,它們將在節(jié)點(diǎn)重啟后存活。
雖然觸發(fā)計(jì)劃交付的計(jì)時(shí)器不會(huì)持久化,但它將在節(jié)點(diǎn)啟動(dòng)時(shí)的插件激活期間重新初始化。顯然,集群中只有一個(gè)預(yù)定消息的副本意味著丟失該節(jié)點(diǎn)或禁用其上的插件將丟失駐留在該節(jié)點(diǎn)上的消息。
該插件的當(dāng)前設(shè)計(jì)并不適合延遲消息數(shù)量較多的場(chǎng)景(如數(shù)萬(wàn)條或數(shù)百萬(wàn)條),另外該插件的一個(gè)可變性來(lái)源是依賴(lài)于 Erlang 計(jì)時(shí)器,在系統(tǒng)中使用了一定數(shù)量的長(zhǎng)時(shí)間計(jì)時(shí)器之后,它們開(kāi)始爭(zhēng)用調(diào)度程序資源,并且時(shí)間漂移不斷累積。
當(dāng)前題目:領(lǐng)導(dǎo)看了我寫(xiě)的關(guān)閉超時(shí)訂單,讓我出門(mén)左轉(zhuǎn)!
文章路徑:http://m.fisionsoft.com.cn/article/cdoocph.html


咨詢(xún)
建站咨詢(xún)
