新聞中心
Redis Stream
Redis Stream 是 Redis 5.0 版本新增加的數(shù)據(jù)結(jié)構(gòu)。

創(chuàng)新互聯(lián)-專(zhuān)業(yè)網(wǎng)站定制、快速模板網(wǎng)站建設(shè)、高性?xún)r(jià)比興文網(wǎng)站開(kāi)發(fā)、企業(yè)建站全套包干低至880元,成熟完善的模板庫(kù),直接使用。一站式興文網(wǎng)站制作公司更省心,省錢(qián),快速模板網(wǎng)站建設(shè)找我們,業(yè)務(wù)覆蓋興文地區(qū)。費(fèi)用合理售后完善,十載實(shí)體公司更值得信賴(lài)。
Redis Stream 主要用于消息隊(duì)列(MQ,Message Queue),Redis 本身是有一個(gè) Redis 發(fā)布訂閱 (pub/sub) 來(lái)實(shí)現(xiàn)消息隊(duì)列的功能,但它有個(gè)缺點(diǎn)就是消息無(wú)法持久化,如果出現(xiàn)網(wǎng)絡(luò)斷開(kāi)、Redis 宕機(jī)等,消息就會(huì)被丟棄。
簡(jiǎn)單來(lái)說(shuō)發(fā)布訂閱 (pub/sub) 可以分發(fā)消息,但無(wú)法記錄歷史消息。
而 Redis Stream 提供了消息的持久化和主備復(fù)制功能,可以讓任何客戶(hù)端訪(fǎng)問(wèn)任何時(shí)刻的數(shù)據(jù),并且能記住每一個(gè)客戶(hù)端的訪(fǎng)問(wèn)位置,還能保證消息不丟失。
Redis Stream 的結(jié)構(gòu)如下所示,它有一個(gè)消息鏈表,將所有加入的消息都串起來(lái),每個(gè)消息都有一個(gè)唯一的 ID 和對(duì)應(yīng)的內(nèi)容:
每個(gè) Stream 都有唯一的名稱(chēng),它就是 Redis 的 key,在我們首次使用 xadd 指令追加消息時(shí)自動(dòng)創(chuàng)建。
上圖解析:
- Consumer Group :消費(fèi)組,使用 XGROUP CREATE 命令創(chuàng)建,一個(gè)消費(fèi)組有多個(gè)消費(fèi)者(Consumer)。
- last_delivered_id :游標(biāo),每個(gè)消費(fèi)組會(huì)有個(gè)游標(biāo) last_delivered_id,任意一個(gè)消費(fèi)者讀取了消息都會(huì)使游標(biāo) last_delivered_id 往前移動(dòng)。
- pending_ids :消費(fèi)者(Consumer)的狀態(tài)變量,作用是維護(hù)消費(fèi)者的未確認(rèn)的 id。 pending_ids 記錄了當(dāng)前已經(jīng)被客戶(hù)端讀取的消息,但是還沒(méi)有 ack (Acknowledge character:確認(rèn)字符)。
消息隊(duì)列相關(guān)命令:
- XADD - 添加消息到末尾
- XTRIM - 對(duì)流進(jìn)行修剪,限制長(zhǎng)度
- XDEL - 刪除消息
- XLEN - 獲取流包含的元素?cái)?shù)量,即消息長(zhǎng)度
- XRANGE - 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息
- XREVRANGE - 反向獲取消息列表,ID 從大到小
- XREAD - 以阻塞或非阻塞方式獲取消息列表
消費(fèi)者組相關(guān)命令:
- XGROUP CREATE - 創(chuàng)建消費(fèi)者組
- XREADGROUP GROUP - 讀取消費(fèi)者組中的消息
- XACK - 將消息標(biāo)記為"已處理"
- XGROUP SETID - 為消費(fèi)者組設(shè)置新的最后遞送消息ID
- XGROUP DELCONSUMER - 刪除消費(fèi)者
- XGROUP DESTROY - 刪除消費(fèi)者組
- XPENDING - 顯示待處理消息的相關(guān)信息
- XCLAIM - 轉(zhuǎn)移消息的歸屬權(quán)
- XINFO - 查看流和消費(fèi)者組的相關(guān)信息;
- XINFO GROUPS - 打印消費(fèi)者組的信息;
- XINFO STREAM - 打印流信息
XADD
使用 XADD 向隊(duì)列添加消息,如果指定的隊(duì)列不存在,則創(chuàng)建一個(gè)隊(duì)列,XADD 語(yǔ)法格式:
XADD key ID field value [field value ...]
- key :隊(duì)列名稱(chēng),如果不存在就創(chuàng)建
- ID :消息 id,我們使用 * 表示由 redis 生成,可以自定義,但是要自己保證遞增性。
- field value : 記錄。
實(shí)例
redis
> XADD mystream
* name Sara surname OConnor
"1601372323627-0"
redis
> XADD mystream
* field1 value1 field2 value2 field3 value3
"1601372323627-1"
redis
> XLEN mystream
(integer
)
2
redis
> XRANGE mystream - +
1
)
1
)
"1601372323627-0"
2
)
1
)
"name"
2
)
"Sara"
3
)
"surname"
4
)
"OConnor"
2
)
1
)
"1601372323627-1"
2
)
1
)
"field1"
2
)
"value1"
3
)
"field2"
4
)
"value2"
5
)
"field3"
6
)
"value3"
redis
>
XTRIM
使用 XTRIM 對(duì)流進(jìn)行修剪,限制長(zhǎng)度, 語(yǔ)法格式:
XTRIM key MAXLEN [~] count
- key :隊(duì)列名稱(chēng)
- MAXLEN :長(zhǎng)度
- count :數(shù)量
實(shí)例
127.0.0.1:
6379
> XADD mystream
* field1 A field2 B field3 C field4 D
"1601372434568-0"
127.0.0.1:
6379
> XTRIM mystream MAXLEN
2
(integer
)
0
127.0.0.1:
6379
> XRANGE mystream - +
1
)
1
)
"1601372434568-0"
2
)
1
)
"field1"
2
)
"A"
3
)
"field2"
4
)
"B"
5
)
"field3"
6
)
"C"
7
)
"field4"
8
)
"D"
127.0.0.1:
6379
>
redis
>
XDEL
使用 XDEL 刪除消息,語(yǔ)法格式:
XDEL key ID [ID ...]
- key:隊(duì)列名稱(chēng)
- ID :消息 ID
實(shí)例
> XADD mystream
* a
1
1538561698944-
0
> XADD mystream
* b
2
1538561700640-
0
> XADD mystream
* c
3
1538561701744-
0
> XDEL mystream
1538561700640-
0
(integer
)
1
127.0.0.1:
6379
> XRANGE mystream - +
1
)
1
)
1538561698944-
0
2
)
1
)
"a"
2
)
"1"
2
)
1
)
1538561701744-
0
2
)
1
)
"c"
2
)
"3"
XLEN
使用 XLEN 獲取流包含的元素?cái)?shù)量,即消息長(zhǎng)度,語(yǔ)法格式:
XLEN key
- key:隊(duì)列名稱(chēng)
實(shí)例
redis
> XADD mystream
* item
1
"1601372563177-0"
redis
> XADD mystream
* item
2
"1601372563178-0"
redis
> XADD mystream
* item
3
"1601372563178-1"
redis
> XLEN mystream
(integer
)
3
redis
>
XRANGE
使用 XRANGE 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息 ,語(yǔ)法格式:
XRANGE key start end [COUNT count]
- key :隊(duì)列名
- start :開(kāi)始值, - 表示最小值
- end :結(jié)束值, + 表示最大值
- count :數(shù)量
實(shí)例
redis
> XADD writers
* name Virginia surname Woolf
"1601372577811-0"
redis
> XADD writers
* name Jane surname Austen
"1601372577811-1"
redis
> XADD writers
* name Toni surname Morrison
"1601372577811-2"
redis
> XADD writers
* name Agatha surname Christie
"1601372577812-0"
redis
> XADD writers
* name Ngozi surname Adichie
"1601372577812-1"
redis
> XLEN writers
(integer
)
5
redis
> XRANGE writers - + COUNT
2
1
)
1
)
"1601372577811-0"
2
)
1
)
"name"
2
)
"Virginia"
3
)
"surname"
4
)
"Woolf"
2
)
1
)
"1601372577811-1"
2
)
1
)
"name"
2
)
"Jane"
3
)
"surname"
4
)
"Austen"
redis
>
XREVRANGE
使用 XREVRANGE 獲取消息列表,會(huì)自動(dòng)過(guò)濾已經(jīng)刪除的消息 ,語(yǔ)法格式:
XREVRANGE key end start [COUNT count]
- key :隊(duì)列名
- end :結(jié)束值, + 表示最大值
- start :開(kāi)始值, - 表示最小值
- count :數(shù)量
實(shí)例
redis
> XADD writers
* name Virginia surname Woolf
"1601372731458-0"
redis
> XADD writers
* name Jane surname Austen
"1601372731459-0"
redis
> XADD writers
* name Toni surname Morrison
"1601372731459-1"
redis
> XADD writers
* name Agatha surname Christie
"1601372731459-2"
redis
> XADD writers
* name Ngozi surname Adichie
"1601372731459-3"
redis
> XLEN writers
(integer
)
5
redis
> XREVRANGE writers + - COUNT
1
1
)
1
)
"1601372731459-3"
2
)
1
)
"name"
2
)
"Ngozi"
3
)
"surname"
4
)
"Adichie"
redis
>
XREAD
使用 XREAD 以阻塞或非阻塞方式獲取消息列表 ,語(yǔ)法格式:
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
- count :數(shù)量
- milliseconds :可選,阻塞毫秒數(shù),沒(méi)有設(shè)置就是非阻塞模式
- key :隊(duì)列名
- id :消息 ID
實(shí)例
# 從 Stream 頭部讀取兩條消息
> XREAD COUNT
2 STREAMS mystream writers
0-
0
0-
0
1
)
1
)
"mystream"
2
)
1
)
1
)
1526984818136-
0
2
)
1
)
"duration"
2
)
"1532"
3
)
"event-id"
4
)
"5"
5
)
"user-id"
6
)
"7782813"
2
)
1
)
1526999352406-
0
2
)
1
)
"duration"
2
)
"812"
3
)
"event-id"
4
)
"9"
5
)
"user-id"
6
)
"388234"
2
)
1
)
"writers"
2
)
1
)
1
)
1526985676425-
0
2
)
1
)
"name"
2
)
"Virginia"
3
)
"surname"
4
)
"Woolf"
2
)
1
)
1526985685298-
0
2
)
1
)
"name"
2
)
"Jane"
3
)
"surname"
4
)
"Austen"
XGROUP CREATE
使用 XGROUP CREATE 創(chuàng)建消費(fèi)者組,語(yǔ)法格式:
XGROUP [CREATE key groupname id-or-$] [SETID key groupname id-or-$] [DESTROY key groupname] [DELCONSUMER key groupname consumername]
- key :隊(duì)列名稱(chēng),如果不存在就創(chuàng)建
- groupname :組名。
- $ : 表示從尾部開(kāi)始消費(fèi),只接受新消息,當(dāng)前 Stream 消息會(huì)全部忽略。
從頭開(kāi)始消費(fèi):
XGROUP CREATE mystream consumer-group-name 0-0
從尾部開(kāi)始消費(fèi):
XGROUP CREATE mystream consumer-group-name $
XREADGROUP GROUP
使用 XREADGROUP GROUP 讀取消費(fèi)組中的消息,語(yǔ)法格式:
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] ID [ID ...]
- group :消費(fèi)組名
- consumer :消費(fèi)者名。
- count : 讀取數(shù)量。
- milliseconds : 阻塞毫秒數(shù)。
- key : 隊(duì)列名。
- ID : 消息 ID。
XREADGROUP GROUP consumer-group-name consumer-name COUNT 1 STREAMS mystream >
網(wǎng)站標(biāo)題:創(chuàng)新互聯(lián)Redis教程:RedisStream
標(biāo)題網(wǎng)址:http://m.fisionsoft.com.cn/article/cdohdhh.html


咨詢(xún)
建站咨詢(xún)
