# Stream Commands

# xack

  • 支持:是

  • 说明:将消息标记为已处理。

  • 示例:

    > XGROUP CREATE mystream mygroup $ MKSTREAM
    OK
    > XADD mystream * field1 A
    "1702262480258-0"
    > XREADGROUP GROUP mygroup myconsumer streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702262480258-0"
             2) 1) "field1"
                2) "A"
    > XACK mystream mygroup 1702262480258-0
    (integer) 1
    

# xadd

  • 支持:是

  • 说明:用于将指定的流条目添加到指定键的流中。如果键不存在,运行此命令会作为副作用创建一个带有流值的键。可以通过NOMKSTREAM选项禁用创建流的键。

  • 示例:

    > XADD mystream * name Sara surname OConnor
    "1702015234680-0"
    > XADD mystream * field1 value1 field2 value2 field3 value3
    "1702015242307-0"
    > XLEN mystream
    (integer) 2
    > XRANGE mystream - +
    1) 1) "1702015234680-0"
       2) 1) "name"
          2) "Sara"
          3) "surname"
          4) "OConnor"
    2) 1) "1702015242307-0"
       2) 1) "field1"
          2) "value1"
          3) "field2"
          4) "value2"
          5) "field3"
          6) "value3"
    

# xautoclaim

  • 支持:是

  • 说明:自动认领已经发送给消费者组中某个消费者但尚未被确认的消息,可以自动找到并认领那些由于某种原因(例如消费者崩溃或处理速度过慢)未能得到及时确认的消息。

  • 示例:

    > XGROUP CREATE mystream mygroup $ MKSTREAM
    OK
    > XGROUP CREATECONSUMER  mystream mygroup consumer-1
    (integer) 1
    > XADD mystream * field1 A
    "1702264921750-0"
    > XREADGROUP GROUP mygroup consumer-1 streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702264921750-0"
             2) 1) "field1"
                2) "A"
    > XAUTOCLAIM mystream mygroup consumer-1 3600 0-0 count 10
    1) "0-0"
    2) 1) 1) "1702264921750-0"
          2) 1) "field1"
             2) "A"
    3) (empty array)
    

# xclaim

  • 支持:是

  • 说明:更改已经发送给某个消费者但尚未被该消费者确认(acknowledged)的消息的所有权。

  • 示例:

    > XGROUP CREATE mystream mygroup $ MKSTREAM
    OK
    > XGROUP CREATECONSUMER  mystream mygroup consumer-1
    (integer) 1
    > XADD mystream * field1 A
    "1702263515783-0"
    > XREADGROUP GROUP mygroup consumer-1 streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702263515783-0"
             2) 1) "field1"
                2) "A"
    > XCLAIM mystream mygroup consumer-1 3600 1702263515783-0
    1) 1) "1702263515783-0"
       2) 1) "field1"
          2) "A"
    

# xdel

  • 支持:是

  • 说明:从流中删除指定的条目,并返回删除的条目数量。这个数量可能小于传递给命令的ID数量,这是因为有些指定的ID在流中不存在。

  • 示例:

    > XADD mystream * a 1
    "1702016971985-0"
    > XADD mystream * b 2
    "1702016978937-0"
    > XADD mystream * c 3
    "1702016986863-0"
    > XDEL mystream 1702016978937-0
    (integer) 1
    > XRANGE mystream - +
    1) 1) "1702016971985-0"
       2) 1) "a"
          2) "1"
    2) 1) "1702016986863-0"
       2) 1) "c"
          2) "3"
    

# xgroup create

  • 支持:是

  • 说明:为存储在的流创建一个新的消费者组,该消费者组由唯一标识,如果流不存在,可以使用MKSTREAM子命令来自动创建长度为0的流。

  • 示例:

    > XADD mystream1 * field1 A
    "1702265924275-0"
    > XADD mystream1 * field1 B
    "1702265927324-0"
    > XGROUP CREATE mystream1 mygroup 0
    OK
    > XGROUP CREATE mystream2 mygroup $ MKSTREAM
    OK
    

# xgroup createconsumer

  • 支持:是

  • 说明:为存储在的流的消费者组中创建一个名为的消费者,当一个操作(如XREADGROUP)引用一个不存在的消费者时,消费者也会自动创建。

  • 示例:

    > XADD mystream * field1 A
    "1702267010504-0"
    > XADD mystream * field1 B
    "1702267012894-0"
    > XADD mystream * field1 C
    "1702267015439-0"
    > XGROUP CREATE mystream mygroup 0
    OK
    > XGROUP CREATECONSUMER mystream mygroup myconsumer
    (integer) 1
    

# xgroup delconsumer

  • 支持:是

  • 说明:从消费者组中删除消费者,消费者拥有的任何挂起消息在删除后都将变得不可声明。

  • 示例:

    > XGROUP CREATE mystream mygroup $ MKSTREAM
    OK
    > XGROUP CREATECONSUMER mystream mygroup myconsumer
    (integer) 1
    > XADD mystream * field1 A
    "1702276220998-0"
    > XREADGROUP GROUP mygroup myconsumer streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702276220998-0"
             2) 1) "field1"
                2) "A"
    > XGROUP DELCONSUMER mystream mygroup myconsumer
    (integer) 1
    

# xgroup destroy

  • 支持:是

  • 说明:从流中删除消费者组,即使存在活跃的消费者和挂起的消息,消费者组也会被删除。

  • 示例:

    > XGROUP CREATE mystream mygroup $ MKSTREAM
    OK
    > XGROUP CREATECONSUMER mystream mygroup myconsumer
    (integer) 1
    > XADD mystream * field1 A
    "1702277253475-0"
    > XREADGROUP GROUP mygroup myconsumer streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702277253475-0"
             2) 1) "field1"
                2) "A"
    > XGROUP DESTROY mystream mygroup
    (integer) 1
    

# xgroup setid

  • 支持:是

  • 说明:设置或更新给定消费者组的最后一次发送的ID。

  • 示例:

    > XADD mystream * field1 A field2 B field3 C
    "1702278952699-0"
    > XGROUP CREATE mystream mygroup $
    OK
    > XGROUP SETID mystream mygroup 0
    OK
    > XGROUP SETID mystream mygroup $ ENTRIESREAD 2
    OK
    

# xinfo consumers

  • 支持:是

  • 说明:返回属于存储在的流的消费者组的消费者列表。

  • 示例:

    > XGROUP CREATE mystream mygroup $ MKSTREAM
    OK
    > XGROUP CREATECONSUMER mystream mygroup myconsumer
    (integer) 1
    > XADD mystream * field1 A
    "1702279322174-0"
    > XREADGROUP GROUP mygroup myconsumer streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702279322174-0"
             2) 1) "field1"
                2) "A"
    > xinfo consumers mystream mygroup
    1) 1) "name"
       2) "myconsumer"
       3) "pending"
       4) (integer) 1
       5) "idle"
       6) (integer) 24605
       7) "inactive"
       8) (integer) 24605
    

# xinfo groups

  • 支持:是

  • 说明:返回存储在的流的所有消费者组的列表。

  • 示例:

    127.0.0.1:9999> XGROUP CREATE mystream mygroup $ MKSTREAM
    OK
    127.0.0.1:9999> XADD mystream * field1 A
    "1702279644162-0"
    127.0.0.1:9999> XADD mystream * field1 B
    "1702279648383-0"
    127.0.0.1:9999> XREADGROUP GROUP mygroup myconsumer streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702279644162-0"
             2) 1) "field1"
                2) "A"
          2) 1) "1702279648383-0"
             2) 1) "field1"
                2) "B"
    127.0.0.1:9999> xinfo groups mystream
    1)  1) "name"
        2) "mygroup"
        3) "consumers"
        4) (integer) 1
        5) "pending"
        6) (integer) 2
        7) "last-delivered-id"
        8) "1702279651686-0"
        9) "entries-read"
       10) (integer) 2
       11) "lag"
       12) (integer) 0
    

# xinfo stream

  • 支持:是

  • 说明:返回有关存储在 的流的信息。

  • 示例:

    > XADD mystream * field1 A
    "1702280092986-0"
    > XADD mystream * field1 B
    "1702280095549-0"
    > XADD mystream * field1 C
    "1702280099334-0"
    > XGROUP CREATE mystream mygroup $
    OK
    > xinfo stream mystream
     1) "length"
     2) (integer) 3
     3) "radix-tree-keys"
     4) (integer) 3
     5) "radix-tree-nodes"
     6) (integer) 1
     7) "last-generated-id"
     8) "1702280099334-0"
     9) "max-deleted-entry-id"
    10) "0-0"
    11) "entries-added"
    12) (integer) 3
    13) "recorded-first-entry-id"
    14) "1702280092986-0"
    15) "groups"
    16) (integer) 1
    17) "first-entry"
    18) 1) "1702280092986-0"
        2) 1) "field1"
           2) "A"
    19) "last-entry"
    20) 1) "1702280099334-0"
        2) 1) "field1"
           2) "C"
    

# xlen

  • 支持:是

  • 说明:返回流中的条目数量。如果指定的键不存在,命令将返回零,就像流为空一样。

    > XADD mystream * item 1
    "1702016706514-0"
    > XADD mystream * item 2
    "1702016714325-0"
    > XADD mystream * item 3
    "1702016721351-0"
    > XLEN mystream
    (integer) 3
    

# xpending

  • 支持:是

  • 说明:获取被消费者组的某个消费者读取,但还没有被确认(即没有收到相应的XACK命令)的信息。

  • 示例:

    > XGROUP CREATE mystream mygroup $ MKSTREAM
    OK
    > XADD mystream * field1 A
    "1702280598636-0"
    > XADD mystream * field1 B
    "1702280603509-0"
    > XADD mystream * field1 C
    "1702280606719-0"
    > XADD mystream * field1 D
    "1702280608610-0"
    > XREADGROUP GROUP mygroup myconsumer streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702280598636-0"
             2) 1) "field1"
                2) "A"
          2) 1) "1702280603509-0"
             2) 1) "field1"
                2) "B"
          3) 1) "1702280606719-0"
             2) 1) "field1"
                2) "C"
          4) 1) "1702280608610-0"
             2) 1) "field1"
                2) "D"
    > xpending mystream mygroup
    1) (integer) 4
    2) "1702280598636-0"
    3) "1702280608610-0"
    4) 1) 1) "myconsumer"
          2) "4"
    > xpending mystream mygroup - + 2
    1) 1) "1702280598636-0"
       2) "myconsumer"
       3) (integer) 41639
       4) (integer) 1
    2) 1) "1702280603509-0"
       2) "myconsumer"
       3) (integer) 41639
       4) (integer) 1
    

# xrange

  • 支持:是

  • 说明:该命令返回与给定ID范围匹配的流条目。范围由最小和最大ID指定。所有具有在两个指定ID之间(闭区间)或恰好一个指定的ID的条目都将被返回。

  • 示例:

    > XADD writers * name Virginia surname Woolf
    "1702266467947-0"
    > XADD writers * name Jane surname Austen
    "1702266474590-0"
    > XADD writers * name Toni surname Morrison
    "1702266482195-0"
    > XADD writers * name Agatha surname Christie
    "1702266489596-0"
    > XADD writers * name Ngozi surname Adichie
    "1702266496750-0"
    > XLEN writers
    (integer) 5
    > XRANGE writers - + COUNT 2
    1) 1) "1702266467947-0"
       2) 1) "name"
          2) "Virginia"
          3) "surname"
          4) "Woolf"
    2) 1) "1702266474590-0"
       2) 1) "name"
          2) "Jane"
          3) "surname"
          4) "Austen"
    

# xread

  • 支持:是

  • 说明:由活跃频道组成的列表。

  • 示例:

    > XADD writers * name Virginia surname Woolf
    "1702019112368-0"
    > XADD writers * name Jane surname Austen
    "1702019119903-0"
    >  XADD writers * name Toni surname Morrison
    "1702019130707-0"
    > XADD mystream * a 1
    "1702019221640-0"
    > XADD mystream * b 2
    "1702019228375-0"
    > XADD mystream * c 3
    "1702019234721-0"
    > XREAD COUNT 2 STREAMS mystream writers 0-0 0-0
    1) 1) "mystream"
       2) 1) 1) "1702019221640-0"
             2) 1) "a"
                2) "1"
          2) 1) "1702019228375-0"
             2) 1) "b"
                2) "2"
    2) 1) "writers"
       2) 1) 1) "1702019112368-0"
             2) 1) "name"
                2) "Virginia"
                3) "surname"
                4) "Woolf"
          2) 1) "1702019119903-0"
             2) 1) "name"
                2) "Jane"
                3) "surname"
                4) "Austen"
    

# xreadgroup

  • 支持:是

  • 说明:用于从一个或多个流中读取数据,同时将消息标记为由特定消费者组中的特定消费者处理;可将消息的所有权分配给调用命令的消费者,其他消费者将不会再次看到这些消息,除非它们没有被正确处理并通过XPENDING和XCLAIM被重新认领。

  • 示例:

    > XADD mystream * field1 A
    "1702281360230-0"
    > XADD mystream * field1 B
    "1702281363262-0"
    > XADD mystream * field1 C
    "1702281366656-0"
    > XADD mystream * field1 D
    "1702281370567-0"
    > XGROUP CREATE mystream mygroup 0
    OK
    > XREADGROUP GROUP mygroup myconsumer streams mystream >
    1) 1) "mystream"
       2) 1) 1) "1702281360230-0"
             2) 1) "field1"
                2) "A"
          2) 1) "1702281363262-0"
             2) 1) "field1"
                2) "B"
          3) 1) "1702281366656-0"
             2) 1) "field1"
                2) "C"
          4) 1) "1702281370567-0"
             2) 1) "field1"
                2) "D"
    > XREADGROUP GROUP mygroup myconsumer COUNT 2 STREAMS mystream 0
    1) 1) "mystream"
       2) 1) 1) "1702281360230-0"
             2) 1) "field1"
                2) "A"
          2) 1) "1702281363262-0"
             2) 1) "field1"
                2) "B"
    

# xrevrange

  • 支持:是

  • 说明:这个命令与XRANGE非常相似,但有一个显著的区别:返回的条目顺序是相反的,同时接受的起始-结束范围也是相反的顺序。在XREVRANGE中,您需要先指定结束ID,然后指定起始ID,该命令将生成两个ID之间的所有元素(或恰好类似于这两个ID的元素),从结束端开始。

  • 示例:

    > XADD writers * name Virginia surname Woolf
    "1702018069631-0"
    > XADD writers * name Jane surname Austen
    "1702018075959-0"
    >  XADD writers * name Toni surname Morrison
    "1702018082836-0"
    > XADD writers * name Agatha surname Christie
    "1702018088536-0"
    > XADD writers * name Ngozi surname Adichie
    "1702018096270-0"
    > XLEN writers
    (integer) 5
    > XREVRANGE writers + - COUNT 1
    1) 1) "1702018096270-0"
       2) 1) "name"
          2) "Ngozi"
          3) "surname"
          4) "Adichie"
    

# xsetid

  • 支持:是

  • 说明:复制流的最后一次发送的ID。

  • 示例:

    > XADD mystream * field1 A
    "1702282024021-0"
    > XADD mystream * field1 B
    "1702282027981-0"
    > XSETID mystream 1702282027981-0
    OK
    

# xtrim

  • 支持:是

  • 说明:XTRIM通过在需要时驱逐较旧的条目(具有较低ID的条目)来修剪流。

  • 示例:

    > XADD mystream * field1 A field2 B field3 C field4 D
    "1702019508888-0"
    > XTRIM mystream MAXLEN 2
    (integer) 0
    > XRANGE mystream - +
    1) 1) "1702019508888-0"
       2) 1) "field1"
          2) "A"
          3) "field2"
          4) "B"
          5) "field3"
          6) "C"
          7) "field4"
          8) "D"