# Stream Commands
# xack
支持:是
说明:将消息标记为已处理。
示例:
> XGROUP CREATE mystream mygroup $ MKSTREAM OK > XADD mystream * field1 A "1702262480258-0" > XREADGROUP GROUP mygroup myconsumer streams mystream > 1) 1) "mystream" 2) 1) 1) "1702262480258-0" 2) 1) "field1" 2) "A" > XACK mystream mygroup 1702262480258-0 (integer) 1
# xadd
支持:是
说明:用于将指定的流条目添加到指定键的流中。如果键不存在,运行此命令会作为副作用创建一个带有流值的键。可以通过NOMKSTREAM选项禁用创建流的键。
示例:
> XADD mystream * name Sara surname OConnor "1702015234680-0" > XADD mystream * field1 value1 field2 value2 field3 value3 "1702015242307-0" > XLEN mystream (integer) 2 > XRANGE mystream - + 1) 1) "1702015234680-0" 2) 1) "name" 2) "Sara" 3) "surname" 4) "OConnor" 2) 1) "1702015242307-0" 2) 1) "field1" 2) "value1" 3) "field2" 4) "value2" 5) "field3" 6) "value3"
# xautoclaim
支持:是
说明:自动认领已经发送给消费者组中某个消费者但尚未被确认的消息,可以自动找到并认领那些由于某种原因(例如消费者崩溃或处理速度过慢)未能得到及时确认的消息。
示例:
> XGROUP CREATE mystream mygroup $ MKSTREAM OK > XGROUP CREATECONSUMER mystream mygroup consumer-1 (integer) 1 > XADD mystream * field1 A "1702264921750-0" > XREADGROUP GROUP mygroup consumer-1 streams mystream > 1) 1) "mystream" 2) 1) 1) "1702264921750-0" 2) 1) "field1" 2) "A" > XAUTOCLAIM mystream mygroup consumer-1 3600 0-0 count 10 1) "0-0" 2) 1) 1) "1702264921750-0" 2) 1) "field1" 2) "A" 3) (empty array)
# xclaim
支持:是
说明:更改已经发送给某个消费者但尚未被该消费者确认(acknowledged)的消息的所有权。
示例:
> XGROUP CREATE mystream mygroup $ MKSTREAM OK > XGROUP CREATECONSUMER mystream mygroup consumer-1 (integer) 1 > XADD mystream * field1 A "1702263515783-0" > XREADGROUP GROUP mygroup consumer-1 streams mystream > 1) 1) "mystream" 2) 1) 1) "1702263515783-0" 2) 1) "field1" 2) "A" > XCLAIM mystream mygroup consumer-1 3600 1702263515783-0 1) 1) "1702263515783-0" 2) 1) "field1" 2) "A"
# xdel
支持:是
说明:从流中删除指定的条目,并返回删除的条目数量。这个数量可能小于传递给命令的ID数量,这是因为有些指定的ID在流中不存在。
示例:
> XADD mystream * a 1 "1702016971985-0" > XADD mystream * b 2 "1702016978937-0" > XADD mystream * c 3 "1702016986863-0" > XDEL mystream 1702016978937-0 (integer) 1 > XRANGE mystream - + 1) 1) "1702016971985-0" 2) 1) "a" 2) "1" 2) 1) "1702016986863-0" 2) 1) "c" 2) "3"
# xgroup create
支持:是
说明:为存储在
的流创建一个新的消费者组,该消费者组由 唯一标识,如果流不存在,可以使用MKSTREAM子命令来自动创建长度为0的流。 示例:
> XADD mystream1 * field1 A "1702265924275-0" > XADD mystream1 * field1 B "1702265927324-0" > XGROUP CREATE mystream1 mygroup 0 OK > XGROUP CREATE mystream2 mygroup $ MKSTREAM OK
# xgroup createconsumer
支持:是
说明:为存储在
的流的消费者组 中创建一个名为 的消费者,当一个操作(如XREADGROUP)引用一个不存在的消费者时,消费者也会自动创建。 示例:
> XADD mystream * field1 A "1702267010504-0" > XADD mystream * field1 B "1702267012894-0" > XADD mystream * field1 C "1702267015439-0" > XGROUP CREATE mystream mygroup 0 OK > XGROUP CREATECONSUMER mystream mygroup myconsumer (integer) 1
# xgroup delconsumer
支持:是
说明:从消费者组中删除消费者,消费者拥有的任何挂起消息在删除后都将变得不可声明。
示例:
> XGROUP CREATE mystream mygroup $ MKSTREAM OK > XGROUP CREATECONSUMER mystream mygroup myconsumer (integer) 1 > XADD mystream * field1 A "1702276220998-0" > XREADGROUP GROUP mygroup myconsumer streams mystream > 1) 1) "mystream" 2) 1) 1) "1702276220998-0" 2) 1) "field1" 2) "A" > XGROUP DELCONSUMER mystream mygroup myconsumer (integer) 1
# xgroup destroy
支持:是
说明:从流中删除消费者组,即使存在活跃的消费者和挂起的消息,消费者组也会被删除。
示例:
> XGROUP CREATE mystream mygroup $ MKSTREAM OK > XGROUP CREATECONSUMER mystream mygroup myconsumer (integer) 1 > XADD mystream * field1 A "1702277253475-0" > XREADGROUP GROUP mygroup myconsumer streams mystream > 1) 1) "mystream" 2) 1) 1) "1702277253475-0" 2) 1) "field1" 2) "A" > XGROUP DESTROY mystream mygroup (integer) 1
# xgroup setid
支持:是
说明:设置或更新给定消费者组的最后一次发送的ID。
示例:
> XADD mystream * field1 A field2 B field3 C "1702278952699-0" > XGROUP CREATE mystream mygroup $ OK > XGROUP SETID mystream mygroup 0 OK > XGROUP SETID mystream mygroup $ ENTRIESREAD 2 OK
# xinfo consumers
支持:是
说明:返回属于存储在
的流的 消费者组的消费者列表。 示例:
> XGROUP CREATE mystream mygroup $ MKSTREAM OK > XGROUP CREATECONSUMER mystream mygroup myconsumer (integer) 1 > XADD mystream * field1 A "1702279322174-0" > XREADGROUP GROUP mygroup myconsumer streams mystream > 1) 1) "mystream" 2) 1) 1) "1702279322174-0" 2) 1) "field1" 2) "A" > xinfo consumers mystream mygroup 1) 1) "name" 2) "myconsumer" 3) "pending" 4) (integer) 1 5) "idle" 6) (integer) 24605 7) "inactive" 8) (integer) 24605
# xinfo groups
支持:是
说明:返回存储在
的流的所有消费者组的列表。 示例:
127.0.0.1:9999> XGROUP CREATE mystream mygroup $ MKSTREAM OK 127.0.0.1:9999> XADD mystream * field1 A "1702279644162-0" 127.0.0.1:9999> XADD mystream * field1 B "1702279648383-0" 127.0.0.1:9999> XREADGROUP GROUP mygroup myconsumer streams mystream > 1) 1) "mystream" 2) 1) 1) "1702279644162-0" 2) 1) "field1" 2) "A" 2) 1) "1702279648383-0" 2) 1) "field1" 2) "B" 127.0.0.1:9999> xinfo groups mystream 1) 1) "name" 2) "mygroup" 3) "consumers" 4) (integer) 1 5) "pending" 6) (integer) 2 7) "last-delivered-id" 8) "1702279651686-0" 9) "entries-read" 10) (integer) 2 11) "lag" 12) (integer) 0
# xinfo stream
支持:是
说明:返回有关存储在
的流的信息。 示例:
> XADD mystream * field1 A "1702280092986-0" > XADD mystream * field1 B "1702280095549-0" > XADD mystream * field1 C "1702280099334-0" > XGROUP CREATE mystream mygroup $ OK > xinfo stream mystream 1) "length" 2) (integer) 3 3) "radix-tree-keys" 4) (integer) 3 5) "radix-tree-nodes" 6) (integer) 1 7) "last-generated-id" 8) "1702280099334-0" 9) "max-deleted-entry-id" 10) "0-0" 11) "entries-added" 12) (integer) 3 13) "recorded-first-entry-id" 14) "1702280092986-0" 15) "groups" 16) (integer) 1 17) "first-entry" 18) 1) "1702280092986-0" 2) 1) "field1" 2) "A" 19) "last-entry" 20) 1) "1702280099334-0" 2) 1) "field1" 2) "C"
# xlen
支持:是
说明:返回流中的条目数量。如果指定的键不存在,命令将返回零,就像流为空一样。
> XADD mystream * item 1 "1702016706514-0" > XADD mystream * item 2 "1702016714325-0" > XADD mystream * item 3 "1702016721351-0" > XLEN mystream (integer) 3
# xpending
支持:是
说明:获取被消费者组的某个消费者读取,但还没有被确认(即没有收到相应的XACK命令)的信息。
示例:
> XGROUP CREATE mystream mygroup $ MKSTREAM OK > XADD mystream * field1 A "1702280598636-0" > XADD mystream * field1 B "1702280603509-0" > XADD mystream * field1 C "1702280606719-0" > XADD mystream * field1 D "1702280608610-0" > XREADGROUP GROUP mygroup myconsumer streams mystream > 1) 1) "mystream" 2) 1) 1) "1702280598636-0" 2) 1) "field1" 2) "A" 2) 1) "1702280603509-0" 2) 1) "field1" 2) "B" 3) 1) "1702280606719-0" 2) 1) "field1" 2) "C" 4) 1) "1702280608610-0" 2) 1) "field1" 2) "D" > xpending mystream mygroup 1) (integer) 4 2) "1702280598636-0" 3) "1702280608610-0" 4) 1) 1) "myconsumer" 2) "4" > xpending mystream mygroup - + 2 1) 1) "1702280598636-0" 2) "myconsumer" 3) (integer) 41639 4) (integer) 1 2) 1) "1702280603509-0" 2) "myconsumer" 3) (integer) 41639 4) (integer) 1
# xrange
支持:是
说明:该命令返回与给定ID范围匹配的流条目。范围由最小和最大ID指定。所有具有在两个指定ID之间(闭区间)或恰好一个指定的ID的条目都将被返回。
示例:
> XADD writers * name Virginia surname Woolf "1702266467947-0" > XADD writers * name Jane surname Austen "1702266474590-0" > XADD writers * name Toni surname Morrison "1702266482195-0" > XADD writers * name Agatha surname Christie "1702266489596-0" > XADD writers * name Ngozi surname Adichie "1702266496750-0" > XLEN writers (integer) 5 > XRANGE writers - + COUNT 2 1) 1) "1702266467947-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1702266474590-0" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
# xread
支持:是
说明:由活跃频道组成的列表。
示例:
> XADD writers * name Virginia surname Woolf "1702019112368-0" > XADD writers * name Jane surname Austen "1702019119903-0" > XADD writers * name Toni surname Morrison "1702019130707-0" > XADD mystream * a 1 "1702019221640-0" > XADD mystream * b 2 "1702019228375-0" > XADD mystream * c 3 "1702019234721-0" > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0 1) 1) "mystream" 2) 1) 1) "1702019221640-0" 2) 1) "a" 2) "1" 2) 1) "1702019228375-0" 2) 1) "b" 2) "2" 2) 1) "writers" 2) 1) 1) "1702019112368-0" 2) 1) "name" 2) "Virginia" 3) "surname" 4) "Woolf" 2) 1) "1702019119903-0" 2) 1) "name" 2) "Jane" 3) "surname" 4) "Austen"
# xreadgroup
支持:是
说明:用于从一个或多个流中读取数据,同时将消息标记为由特定消费者组中的特定消费者处理;可将消息的所有权分配给调用命令的消费者,其他消费者将不会再次看到这些消息,除非它们没有被正确处理并通过XPENDING和XCLAIM被重新认领。
示例:
> XADD mystream * field1 A "1702281360230-0" > XADD mystream * field1 B "1702281363262-0" > XADD mystream * field1 C "1702281366656-0" > XADD mystream * field1 D "1702281370567-0" > XGROUP CREATE mystream mygroup 0 OK > XREADGROUP GROUP mygroup myconsumer streams mystream > 1) 1) "mystream" 2) 1) 1) "1702281360230-0" 2) 1) "field1" 2) "A" 2) 1) "1702281363262-0" 2) 1) "field1" 2) "B" 3) 1) "1702281366656-0" 2) 1) "field1" 2) "C" 4) 1) "1702281370567-0" 2) 1) "field1" 2) "D" > XREADGROUP GROUP mygroup myconsumer COUNT 2 STREAMS mystream 0 1) 1) "mystream" 2) 1) 1) "1702281360230-0" 2) 1) "field1" 2) "A" 2) 1) "1702281363262-0" 2) 1) "field1" 2) "B"
# xrevrange
支持:是
说明:这个命令与XRANGE非常相似,但有一个显著的区别:返回的条目顺序是相反的,同时接受的起始-结束范围也是相反的顺序。在XREVRANGE中,您需要先指定结束ID,然后指定起始ID,该命令将生成两个ID之间的所有元素(或恰好类似于这两个ID的元素),从结束端开始。
示例:
> XADD writers * name Virginia surname Woolf "1702018069631-0" > XADD writers * name Jane surname Austen "1702018075959-0" > XADD writers * name Toni surname Morrison "1702018082836-0" > XADD writers * name Agatha surname Christie "1702018088536-0" > XADD writers * name Ngozi surname Adichie "1702018096270-0" > XLEN writers (integer) 5 > XREVRANGE writers + - COUNT 1 1) 1) "1702018096270-0" 2) 1) "name" 2) "Ngozi" 3) "surname" 4) "Adichie"
# xsetid
支持:是
说明:复制流的最后一次发送的ID。
示例:
> XADD mystream * field1 A "1702282024021-0" > XADD mystream * field1 B "1702282027981-0" > XSETID mystream 1702282027981-0 OK
# xtrim
支持:是
说明:XTRIM通过在需要时驱逐较旧的条目(具有较低ID的条目)来修剪流。
示例:
> XADD mystream * field1 A field2 B field3 C field4 D "1702019508888-0" > XTRIM mystream MAXLEN 2 (integer) 0 > XRANGE mystream - + 1) 1) "1702019508888-0" 2) 1) "field1" 2) "A" 3) "field2" 4) "B" 5) "field3" 6) "C" 7) "field4" 8) "D"
← HyperLoglog命令 持久化备份命令 →