Reids 中有一个功能叫 Stream,可以实现类似于 Kafka 的功能,即消息可以持久化,有消费组且通过 ACK 确认消费成功。
快速写一段代码,创建一个 stream my-stream 和消费组 my-group 。然后消费组 consumer-1 就开始消费消息。
package main
import (
"context"
"fmt"
"log"
"sync"
"time"
"github.com/go-redis/redis/v8"
)
var (
ctx = context.Background()
rdb *redis.Client
streamName = "my-stream"
groupName = "my-group"
)
func main() {
rdb = redis.NewClient(&redis.Options{
Addr: "localhost:6379",
})
_, err := rdb.Ping(ctx).Result()
if err != nil {
log.Fatalf("Could not connect to Redis: %v", err)
}
// Create a consumer group. This will error if the group already exists,
// but for this example, we'll ignore the error.
rdb.XGroupCreateMkStream(ctx, streamName, groupName, "0").Err()
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
consumer("consumer-1")
}()
wg.Wait()
}
func consumer(consumerName string) {
for {
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamName, ">"},
Count: 1,
Block: 0,
}).Result()
if err != nil {
log.Printf("Error reading from stream: %v", err)
continue
}
for _, stream := range streams {
for _, message := range stream.Messages {
fmt.Printf("Consumer %s received message: %v\n", consumerName, message.Values)
// Acknowledge the message
rdb.XAck(ctx, streamName, groupName, message.ID)
}
}
}
}
通过 redis-cli XADD my-stream '*' message "hello from cli" 就可以往 stream 里面塞消息了。然后脚本里面的消费者就开始消费消息,并打印出日志。
通过 XREADGROUP GROUP my-group consumer-checker COUNT 1000 STREAMS my-stream > 就可以查看 my-stream 里面有多少消息未被消费。
GROUP my-group consumer-checker: 我们以一个临时的消费者身份 consumer-checker (名字可以随便起) 来检查。
COUNT 1000: 我们尝试一次性读取最多 1000 条新消息 (可以设一个较大的数字确保能读完)。
STREAMS my-stream >: 这是关键,我们只读取 my-stream 中所有的新消息。
通过 XPENDING my-stream my-group 可以查看有多少消息是已读未回。
通过 XINFO GROUPS my-stream 查看所有的消费组以及消费的堆积量。
1) 1) "name" # 字段名: 消费组名称
2) "my-group" # 字段值: "my-group"
3) "consumers" # 字段名: 消费者数量
4) (integer) 2 # 字段值: 组内有 2 个活动的消费者 (比如 go 程序里的 consumer-1 和我们 cli 里的 consumer-cli)
5) "pending" # 字段名: 待处理消息数
6) (integer) 0 # 字段值: 有 0 条消息被投递但未确认 (XACK)
7) "last-delivered-id" # 字段名: 最后投递的 ID <-- 这就是你想要的“偏移量”!
8) "1754186688588-0" # 字段值: 这是被投递到本组的最后一个消息的 ID
9) "entries-read" # 字段名: 已读取的条目总数 (较新版 Redis 提供)
10) (integer) 7 # 字段值: 组内的消费者总共从流中读取了 7 个条目
11) "lag" # 字段名: 积压量 (较新版 Redis 提供)
12) (integer) 0 # 字段值: 有 0 条新消息在本组最后投递 ID 之后,等待被消费
通过 XRANGE my-stream - + 可以查看到 Stream 里面的所有未消费的信息。
如果有消息未回,这些消息会进入 PENDING 列表,可以通过 redis-cli XPENDING my-stream my-group - + 100 进行查看。