新聞中心
紅色的消息隊列——重新尋回失去的機會

創(chuàng)新互聯(lián)主營霍城網(wǎng)站建設(shè)的網(wǎng)絡(luò)公司,主營網(wǎng)站建設(shè)方案,APP應(yīng)用開發(fā),霍城h5重慶小程序開發(fā)搭建,霍城網(wǎng)站營銷推廣歡迎霍城等地區(qū)企業(yè)咨詢
隨著互聯(lián)網(wǎng)的快速發(fā)展,消息隊列作為一種重要的網(wǎng)絡(luò)通信手段,已經(jīng)成為了現(xiàn)今很多應(yīng)用系統(tǒng)中不可或缺的一部分。然而,在消息隊列中,由于網(wǎng)絡(luò)不穩(wěn)定等原因,有時也可能會出現(xiàn)消息的丟失,這種情況給應(yīng)用系統(tǒng)帶來的后果十分嚴重。我們需要一種能夠自我檢測、自我修復(fù)的消息隊列來減少這種“失靈”的概率,那么,紅色的消息隊列——可靠隊列就應(yīng)運而生了。
先來看一下在傳統(tǒng)消息隊列中,當我們啟動消費者客戶端進行消費時,遇到處理異常或者服務(wù)宕機等情況,就有可能丟失部分消息,這種丟失不僅會對應(yīng)用系統(tǒng)造成巨大的危害,更是無法檢測和追溯,一旦發(fā)生就是不可挽回的損失。
可靠隊列的出現(xiàn),解決了這一難題。在可靠隊列中,消息的消費不再是一次性的,而是分為三個階段,分別是:預(yù)處理(PREPARE)、提交(COMMIT)和回滾(ROLLBACK),其中預(yù)處理和提交都保證消息的可靠性,而回滾則是提供了一個修復(fù)機制。
下面,我們來簡單介紹一下可靠隊列的核心原理:
1、消息預(yù)處理
在預(yù)處理階段,消費者會首先請求消息隊列中的一批消息(一般是10條以上),并將這些消息標記為“已占用”。同時,應(yīng)用系統(tǒng)會將這些消息的 ID 號批量保存到可靠隊列上,這里我們可以使用 Redis 來實現(xiàn) ID 存儲和數(shù)據(jù)查詢的功能。預(yù)處理階段結(jié)束后,消費者進行批量處理,將處理成功的消息標記為已處理,而處理失敗的消息則會被重寫入隊列中。
2、消息提交
在消息處理成功后,消費者將進行消息的提交,并移除進入預(yù)處理隊列中的消息 ID。如果消費者無法成功提交消息,則這些消息的狀態(tài)依舊是“已占用”,后續(xù)其他消費者可以看作“消息未完成”,進行重復(fù)消費,直到成功提交為止。
3、消息回滾
在消息消費者的處理失敗或者異常退出時,這些消息仍舊會被留在可靠隊列中,而不會被消除。然后,系統(tǒng)會自動將這些消息狀態(tài)進行回滾,并放回消息隊列中供其他消費者消費。
除了以上三種核心原理之外,可靠隊列還有其他比較重要的功能,如:消息去重、消息重復(fù)消費、消息延時消費等,這些功能的實現(xiàn)也是提高消息隊列可靠性的重要手段。
下面,我們就來實際編寫可靠隊列中幾個重要的功能點:
1、消息去重
在消費者端進行去重是非常必要的,因為在并發(fā)情況下,可能會有多個消費者同時在處理同一消息,如果不做去重處理,則會造成消息的重復(fù)消費,增大系統(tǒng)負擔,因此,我們需要利用 Redis 數(shù)據(jù)庫進行消息 ID 去重的處理,請參考以下代碼片段:
“`python
import redis
class ReliableQueue(object):
def __init__(self):
self.rd = redis.Redis(host=’localhost’, port=6379)
#去重
def remove_duplication(self, message_id):
return self.rd.sadd(‘message_id_set’, message_id)
2、消息重復(fù)消費
在可靠隊列中,消息的消費者不只一個,那么當一些消息處理失敗或者出現(xiàn)異常的時候,需要其他消費者對這些故障消息進行重復(fù)消費,我們也需要在消費者端進行對應(yīng)的設(shè)置,請參考以下代碼片段:
```python
import time
import redis
class ReliableQueue(object):
def __init__(self):
self.rd = redis.Redis(host='localhost', port=6379)
#消息重復(fù)消費
def repeat_message(self, message_id, retry_time):
count = 0
while count
if self.rd.sismember('message_id_set', message_id):
self.rd.srem('message_id_set', message_id)
break
else:
count += 1
time.sleep(1)
3、消息延時消費
在實際業(yè)務(wù)場景中,可能會有某些任務(wù)需要延時執(zhí)行,以達到更好的效果。針對這種需求,我們可以在消費者端設(shè)置帶有過期時間的 Redis 鍵,具體代碼實現(xiàn)如下:
“`python
import redis
class ReliableQueue(object):
def __init__(self):
self.rd = redis.Redis(host=’localhost’, port=6379)
#消息延時
def message_delay(self, msg, delay_time):
self.rd.set(msg[‘message_id’], msg)
self.rd.expire(msg[‘message_id’], delay_time)
綜上所述,可靠隊列作為一種高可用、高并發(fā)的消息中間件,為應(yīng)用系統(tǒng)提供了可靠的消息傳輸機制。在不斷發(fā)展的過程中,我們也需要不斷優(yōu)化和完善它的功能,以應(yīng)對日益復(fù)雜的應(yīng)用場景。
創(chuàng)新互聯(lián)服務(wù)器托管擁有成都T3+級標準機房資源,具備完善的安防設(shè)施、三線及BGP網(wǎng)絡(luò)接入帶寬達10T,機柜接入千兆交換機,能夠有效保證服務(wù)器托管業(yè)務(wù)安全、可靠、穩(wěn)定、高效運行;創(chuàng)新互聯(lián)專注于成都服務(wù)器托管租用十余年,得到成都等地區(qū)行業(yè)客戶的一致認可。
當前題目:紅色的消息隊列重新尋回失去的機會(redis消息隊列撤銷)
網(wǎng)站路徑:http://m.fisionsoft.com.cn/article/djpsiii.html


咨詢
建站咨詢
