add parsing for stream entries
This PR is related to issue https://github.com/gomodule/redigo/issues/375 and adds a first pass at parsing replies containing stream entries into a struct.
I tested things against a Redis server v5.0.7, which is the first version containing the streams feature: https://redislabs.com/blog/redis-5-0-is-here/.
Thanks for looking at this.
I've done a quick pass and adding some initial feedback
See commit 4a86003844a18575e980d5069d371d3e541bc74a.
Is this working for XREAD or XREADGROUP? I wonder this is not working well for them because the nested structure is different, sorry if this is out of scope.
Is this working for
XREADorXREADGROUP? I wonder this is not working well for them because the nested structure is different, sorry if this is out of scope.
You raise a valid question @tk42 it does look like this will only work for XRANGE which I don't think is clear from the definition of Entries. So I wonder if there is a better way to do this which is more generic.
Any thoughts @smotes ?
Hi @stevenh.
I propose we keep things as they stand. The Redis docs distinguish between entries and stream names in return types, so I felt it on the caller to parse out the reply in parts and know where to find the nested entries within.
Consider the XREADGROUP Redis command docs (https://redis.io/commands/xreadgroup#return-value).
Array reply, specifically: The command returns an array of results: each element of the returned array is an array composed of a two element containing the key name and the entries reported for that key. The entries reported are full stream entries, having IDs and the list of all the fields and values. Field and values are guaranteed to be reported in the same order they were added by XADD.
And here's a little code snippet showing how to parse that as is (adapted from some code in a personal project).
reply, err := sc.conn.Do("XREADGROUP", "GROUP", "some-group-name", "some-consumer-name",
"BLOCK", 3000, "COUNT", 10, "STREAMS", "some-stream-name", ">")
vs, err := redis.Values(reply, err)
if err != nil {
if errors.Is(err, redis.ErrNil) {
continue
}
return err
}
// Get the first and only value in the array since we're only
// reading from one stream "some-stream-name" here.
vs, err = redis.Values(vs[0], nil)
if err != nil {
return err
}
// Ignore the stream name as the first value as we already have
// that in hand! Just get the second value which is guaranteed to
// exist per the docs, and parse it as some stream entries.
entries, err := ParseEntries(vs[1], nil)
if err != nil {
return fmt.Errorf("error parsing entries: %w", err)
}
That makes sense to me @tk42 does that answer address your concerns?
Hi @smotes @stevenh
In that snippet, it contains only a single stream key but how about multiple stream key case? From redis docs, XREAD/XREADGROUP can take multiple keys (and corresponded ids).
XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...] XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
I’m feeling that users for XREAD/XREADGROUP will call redis.Values multiple times as a template pattern, so a new struct of return-type for them might help those users. Something like this ;)
type StreamEntry struct {
Stream string
Entries []Entry
}
and its corresponded parsing function like this
func StreamEntries(reply interface{}, err error) ([]StreamEntry, error) {
vss, err := redis.Values(reply, err)
if err != nil {
// error handling
return nil, fmt.Errorf("error parsing reply: %w", err)
}
var streamEntries []StreamEntry
for _, vs := range vss {
name, err := redis.String(vs[0], nil)
if err != nil {
// error handling
streamEntries = append(streamEntries, new(StreamEntry))
}
entries, err := ParseEntries(vs[1], nil)
if err != nil {
// error handling
streamEntries = append(streamEntries, new(StreamEntry))
}
streamEntries = append(streamEntries, StreamEntry{
Stream: name,
Entries: entries,
})
}
return streamEntries, nil
}