add parsing for stream entries

Open smotes opened this issue 4 years ago • 6 comments

This PR is related to issue https://github.com/gomodule/redigo/issues/375 and adds a first pass at parsing replies containing stream entries into a struct.

I tested things against a Redis server v5.0.7. Streams were first introduced in Redis 5.0: https://redislabs.com/blog/redis-5-0-is-here/.

smotes avatar Mar 24 '21 03:03 smotes

Thanks for looking at this.

I've done a quick pass and added some initial feedback.

See commit 4a86003844a18575e980d5069d371d3e541bc74a.

smotes avatar Mar 26 '21 17:03 smotes

Is this working for XREAD or XREADGROUP? I suspect it does not work well for them because the nested structure is different. Sorry if this is out of scope.

tk42 avatar Jul 31 '21 05:07 tk42

You raise a valid question, @tk42. It does look like this will only work for XRANGE, which I don't think is clear from the definition of Entries. So I wonder if there is a better, more generic way to do this.

Any thoughts @smotes ?

stevenh avatar Sep 30 '21 10:09 stevenh

Hi @stevenh.

I propose we keep things as they stand. The Redis docs distinguish between entries and stream names in return types, so I felt it was on the caller to parse the reply in parts and know where to find the nested entries within it.

Consider the XREADGROUP Redis command docs (https://redis.io/commands/xreadgroup#return-value).

Array reply, specifically: The command returns an array of results: each element of the returned array is an array composed of a two element containing the key name and the entries reported for that key. The entries reported are full stream entries, having IDs and the list of all the fields and values. Field and values are guaranteed to be reported in the same order they were added by XADD.

And here's a little code snippet showing how to parse that as is (adapted from some code in a personal project):

// This runs inside a polling loop. A nil reply means BLOCK timed out
// without new entries, which redis.Values reports as redis.ErrNil.
reply, err := sc.conn.Do("XREADGROUP", "GROUP", "some-group-name", "some-consumer-name",
	"BLOCK", 3000, "COUNT", 10, "STREAMS", "some-stream-name", ">")
vs, err := redis.Values(reply, err)
if err != nil {
	if errors.Is(err, redis.ErrNil) {
		continue
	}
	return err
}
if len(vs) == 0 {
	continue
}

// Get the first and only value in the array since we're only
// reading from one stream "some-stream-name" here.
vs, err = redis.Values(vs[0], nil)
if err != nil {
	return err
}
if len(vs) != 2 {
	return fmt.Errorf("expected stream name and entries, got %d values", len(vs))
}

// Ignore the stream name as the first value as we already have
// that in hand! Just get the second value, which the docs say
// holds the entries, and parse it as some stream entries.
entries, err := ParseEntries(vs[1], nil)
if err != nil {
	return fmt.Errorf("error parsing entries: %w", err)
}
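
For anyone who wants to see the entry shape described in the docs without a Redis server at hand, here is a minimal sketch using only the standard library. The `Entry` struct, `asString`, and `parseEntries` are hypothetical stand-ins, not the code in this PR; the sketch assumes bulk strings arrive as `[]byte`, which is how redigo returns them. Note that a map does not preserve the field order that Redis guarantees, so a real implementation may prefer a slice of pairs.

```go
package main

import (
	"errors"
	"fmt"
)

// Entry is a hypothetical stand-in for the struct this PR parses into.
type Entry struct {
	ID     string
	Fields map[string]string
}

// asString converts a bulk-string value ([]byte or string) to a string.
func asString(v interface{}) (string, error) {
	switch s := v.(type) {
	case []byte:
		return string(s), nil
	case string:
		return s, nil
	}
	return "", fmt.Errorf("unexpected type %T for string", v)
}

// parseEntries decodes [[id, [field, value, ...]], ...] into entries.
func parseEntries(reply interface{}) ([]Entry, error) {
	raw, ok := reply.([]interface{})
	if !ok {
		return nil, fmt.Errorf("unexpected type %T for entries", reply)
	}
	entries := make([]Entry, 0, len(raw))
	for _, r := range raw {
		pair, ok := r.([]interface{})
		if !ok || len(pair) != 2 {
			return nil, errors.New("entry must be a two-element array")
		}
		id, err := asString(pair[0])
		if err != nil {
			return nil, err
		}
		kvs, ok := pair[1].([]interface{})
		if !ok || len(kvs)%2 != 0 {
			return nil, errors.New("fields must be an even-length array")
		}
		fields := make(map[string]string, len(kvs)/2)
		for i := 0; i < len(kvs); i += 2 {
			k, err := asString(kvs[i])
			if err != nil {
				return nil, err
			}
			v, err := asString(kvs[i+1])
			if err != nil {
				return nil, err
			}
			fields[k] = v
		}
		entries = append(entries, Entry{ID: id, Fields: fields})
	}
	return entries, nil
}

func main() {
	// Simulated entries portion of a reply: one entry with one field.
	reply := []interface{}{
		[]interface{}{[]byte("1-0"), []interface{}{[]byte("temp"), []byte("21")}},
	}
	entries, err := parseEntries(reply)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Println(entries[0].ID, entries[0].Fields["temp"])
}
```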

smotes avatar Jan 07 '22 01:01 smotes

That makes sense to me. @tk42, does that answer address your concerns?

stevenh avatar Jan 07 '22 17:01 stevenh

Hi @smotes @stevenh. That snippet handles only a single stream key, but what about the case of multiple stream keys? According to the Redis docs, XREAD/XREADGROUP can take multiple keys (and corresponding IDs).

XREAD [COUNT count] [BLOCK milliseconds] STREAMS key [key ...] id [id ...]
XREADGROUP GROUP group consumer [COUNT count] [BLOCK milliseconds] [NOACK] STREAMS key [key ...] id [id ...]
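
To make the multi-key form concrete, here is a rough sketch of building the argument list so that all keys come before all IDs, as the syntax above requires. `xreadGroupArgs` is a hypothetical helper, not part of redigo; the resulting slice could be passed along as `conn.Do("XREADGROUP", args...)`.

```go
package main

import (
	"errors"
	"fmt"
)

// xreadGroupArgs is a hypothetical helper that builds the arguments for
// XREADGROUP over several streams. keys and ids are paired by position.
func xreadGroupArgs(group, consumer string, count, blockMs int, keys, ids []string) ([]interface{}, error) {
	if len(keys) == 0 || len(keys) != len(ids) {
		return nil, errors.New("need one id per stream key")
	}
	args := []interface{}{"GROUP", group, consumer, "COUNT", count, "BLOCK", blockMs, "STREAMS"}
	// All keys are listed first, followed by all ids in the same order.
	for _, k := range keys {
		args = append(args, k)
	}
	for _, id := range ids {
		args = append(args, id)
	}
	return args, nil
}

func main() {
	args, err := xreadGroupArgs("some-group-name", "some-consumer-name", 10, 3000,
		[]string{"stream-a", "stream-b"}, []string{">", ">"})
	if err != nil {
		fmt.Println("bad arguments:", err)
		return
	}
	fmt.Println(args...)
}
```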

I feel that users of XREAD/XREADGROUP will end up calling redis.Values multiple times as a boilerplate pattern, so a new return-type struct for them might help those users. Something like this ;)

type StreamEntry struct {
    Stream  string
    Entries []Entry
}

and its corresponding parsing function, like this:

func StreamEntries(reply interface{}, err error) ([]StreamEntry, error) {
    vss, err := redis.Values(reply, err)
    if err != nil {
        return nil, fmt.Errorf("error parsing reply: %w", err)
    }

    streamEntries := make([]StreamEntry, 0, len(vss))
    for _, v := range vss {
        // Each element is itself an array: [stream name, entries].
        vs, err := redis.Values(v, nil)
        if err != nil {
            return nil, fmt.Errorf("error parsing stream reply: %w", err)
        }
        if len(vs) != 2 {
            return nil, fmt.Errorf("expected 2 values per stream, got %d", len(vs))
        }

        name, err := redis.String(vs[0], nil)
        if err != nil {
            return nil, fmt.Errorf("error parsing stream name: %w", err)
        }

        entries, err := ParseEntries(vs[1], nil)
        if err != nil {
            return nil, fmt.Errorf("error parsing entries for %q: %w", name, err)
        }

        streamEntries = append(streamEntries, StreamEntry{
            Stream:  name,
            Entries: entries,
        })
    }
    return streamEntries, nil
}
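
The outer nesting for several keys can also be checked without a server. The following is only a sketch under assumptions: `rawStream` and `splitStreams` are hypothetical names, the entries are deliberately left undecoded, and the reply is hand-built to mimic what redigo would return for two streams.

```go
package main

import "fmt"

// rawStream is a hypothetical, simplified stand-in for StreamEntry. It keeps
// the entries undecoded so the sketch covers the outer nesting only.
type rawStream struct {
	Stream  string
	Entries []interface{}
}

// splitStreams walks a reply shaped like [[name, entries], ...].
func splitStreams(reply []interface{}) ([]rawStream, error) {
	out := make([]rawStream, 0, len(reply))
	for i, r := range reply {
		pair, ok := r.([]interface{})
		if !ok || len(pair) != 2 {
			return nil, fmt.Errorf("stream %d: expected a two-element array", i)
		}
		name, ok := pair[0].([]byte)
		if !ok {
			return nil, fmt.Errorf("stream %d: unexpected name type %T", i, pair[0])
		}
		entries, ok := pair[1].([]interface{})
		if !ok {
			return nil, fmt.Errorf("stream %d: unexpected entries type %T", i, pair[1])
		}
		out = append(out, rawStream{Stream: string(name), Entries: entries})
	}
	return out, nil
}

func main() {
	// Simulated reply for two streams, one entry each.
	reply := []interface{}{
		[]interface{}{[]byte("orders"), []interface{}{
			[]interface{}{[]byte("1-0"), []interface{}{[]byte("sku"), []byte("a1")}},
		}},
		[]interface{}{[]byte("payments"), []interface{}{
			[]interface{}{[]byte("2-0"), []interface{}{[]byte("amount"), []byte("5")}},
		}},
	}
	streams, err := splitStreams(reply)
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	for _, s := range streams {
		fmt.Printf("%s: %d entries\n", s.Stream, len(s.Entries))
	}
}
```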

tk42 avatar Jan 08 '22 02:01 tk42