
Conditional update

Open dggc opened this issue 5 years ago • 9 comments

Hi, how are you doing?

I'd like to know if there is a recommended way to perform a conditional update on redis from spark.

Basically, what I want to do is:

  • Given a spark dataframe with columns key, hash_field, value, value_version
  • For each row in the spark dataframe, I'd like to check whether the corresponding key exists in Redis. If not, run HSET key hash_field value hash_field_version value_version (i.e. store both the value and its version in the hash)
  • If it does exist, I'd like to update the value and value_version only if the version in my spark dataframe is higher than the version stored in the redis hash.

I've searched this GitHub repo and the web in general, and the most similar question I found was this -> https://stackoverflow.com/questions/22906778/conditional-redis-set-only-update-with-newest-version

However, this involves writing and running a Lua script. While I'm fine with that, I wonder what the best course of action would be. Is there a recommended way to do this conditional upsert? If Lua is indeed the way to go, what's the best way to run it from Spark? Is it possible using spark-redis, or should I go down a level and use Jedis directly for each row?
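To be concrete, the kind of script I have in mind would be something like this, called from Scala via Jedis (an untested sketch; the key, field names, and the "<field>_version" layout are just illustrative, not anything spark-redis provides):

```scala
import java.util.Arrays
import redis.clients.jedis.Jedis

// Atomically set the field and its version, but only if the incoming
// version is newer (or no version is stored yet for that field).
val conditionalUpsert =
  """
    |local current = redis.call('HGET', KEYS[1], ARGV[1] .. '_version')
    |if (not current) or (tonumber(ARGV[3]) > tonumber(current)) then
    |  redis.call('HSET', KEYS[1], ARGV[1], ARGV[2])
    |  redis.call('HSET', KEYS[1], ARGV[1] .. '_version', ARGV[3])
    |  return 1
    |end
    |return 0
    |""".stripMargin

val jedis = new Jedis("localhost", 6379)
// eval(script, keys, args) runs the whole script atomically on the server
val updated = jedis.eval(
  conditionalUpsert,
  Arrays.asList("my:key"),
  Arrays.asList("my_field", "new_value", "42"))
jedis.close()
```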

Thanks in advance, and sorry if this wasn't the right place for this kind of question :)

dggc avatar Aug 06 '20 15:08 dggc

Hi @dggc , could you perform the update logic in Spark, i.e. read the dataframe from Redis into Spark, find the rows that need to be updated, and then write them back with SaveMode.Append?
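Something along these lines (a sketch; it assumes an incoming dataframe with the columns you listed and a "person" table, so adjust the names):

```scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Read the current state from Redis as a dataframe
val current = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "key")
  .load()

// Keep only incoming rows that are new or carry a higher version
val toWrite = incoming.alias("i")
  .join(current.alias("c"), col("i.key") === col("c.key"), "left_outer")
  .filter(col("c.value_version").isNull ||
          col("i.value_version") > col("c.value_version"))
  .select(col("i.key").alias("key"), col("i.value"), col("i.value_version"))

// Append writes the rows back without deleting the existing table first
toWrite.write
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "key")
  .mode(SaveMode.Append)
  .save()
```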

fe2s avatar Aug 06 '20 19:08 fe2s

Hi @fe2s , thanks for your feedback! Yes, that would be possible, and in our case there wouldn't even be a race condition, because we have a single process writing to Redis (this spark job). However, wouldn't that add a lot of overhead and latency to the process? It does seem like a simpler solution, though :)

dggc avatar Aug 06 '20 20:08 dggc

Would it be possible to read only the keys that appear in the spark dataframe? If we have to read the whole table, it would be too expensive, but if we can retrieve only the keys in the DF, it would indeed be a viable solution.

dggc avatar Aug 06 '20 20:08 dggc

Yes, it's possible. You can specify only the id in the dataframe schema. https://github.com/RedisLabs/spark-redis/blob/master/doc/dataframe.md#reading-redis-hashes
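For example, something like this (table and column names are illustrative):

```scala
import org.apache.spark.sql.types.{StructType, StructField, StringType, LongType}

// Declare only the columns you need; the resulting dataframe
// is then limited to those fields
val schema = StructType(Array(
  StructField("key", StringType),
  StructField("value_version", LongType)
))

val df = spark.read
  .format("org.apache.spark.sql.redis")
  .option("table", "person")
  .option("key.column", "key")
  .schema(schema)
  .load()
```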

fe2s avatar Aug 06 '20 20:08 fe2s

But suppose I have a DF with, say, 5 rows, each with a unique ID that has a counterpart in Redis, while our Redis database has millions of rows. I want to retrieve only those 5 rows, so I can make the necessary comparisons and write only those 5 rows back.

I believe this read option you mentioned would retrieve every row, not just the 5 I want, is that correct?

dggc avatar Aug 06 '20 20:08 dggc

With the read operation you can specify a key pattern, e.g. .option("keys.pattern", "person:*"), so only the matching records are loaded into Spark.

fe2s avatar Aug 07 '20 05:08 fe2s

Hm, unfortunately, I believe this wouldn't work in our case. Since we are using Spark Streaming to update the Redis database, with foreachBatch and micro-batch dataframes, we are working with only a small fraction of the Redis database at a time.

While loading the whole database would work, it would be very slow. Unless the Redis connector has some built-in logic to do Redis-side filtering, instead of loading all the keys into Spark and then filtering there?

We could use a regex such as "id1|id2|id3|..." generated from collecting the microbatch ids, but that doesn't seem like it would scale well.

I believe we'll probably have to use mapPartitions to open a separate redis connection for each partition of the microbatch, and then go over each row/key individually.
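Roughly like this (untested sketch; the host and the row layout are just what I described above):

```scala
import redis.clients.jedis.Jedis

df.rdd.foreachPartition { rows =>
  // One connection per partition instead of one per row
  val jedis = new Jedis("redis-host", 6379)
  try {
    rows.foreach { row =>
      val key     = row.getAs[String]("key")
      val field   = row.getAs[String]("hash_field")
      val value   = row.getAs[String]("value")
      val version = row.getAs[Long]("value_version")
      // Non-atomic check-and-set: fine with a single writer,
      // otherwise a Lua script is needed for atomicity
      val current = Option(jedis.hget(key, field + "_version")).map(_.toLong)
      if (current.forall(version > _)) {
        jedis.hset(key, field, value)
        jedis.hset(key, field + "_version", version.toString)
      }
    }
  } finally {
    jedis.close()
  }
}
```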

dggc avatar Aug 07 '20 11:08 dggc

fromRedisHash also accepts an Array[String] of keys, so you could do this -

val fromRedisDf = sc.fromRedisHash(Array("key1", "key2")).toDF("field", "value")

This does all the filtering on the Redis server, and is more efficient than a key pattern scan.

mayankasthana avatar Aug 24 '20 21:08 mayankasthana

Thanks everyone for your suggestions!

We ended up using the foreach writer, a Lua script for the conditional upsert, and the Jedis library, since our micro batches can be arbitrarily big. It's good to know about these options, though.
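For future readers, the wiring looks roughly like this (a sketch only; conditionalUpsert stands for a Lua script string doing the version check as discussed earlier in the thread, and the host and column names are illustrative):

```scala
import java.util.Arrays
import org.apache.spark.sql.DataFrame
import redis.clients.jedis.Jedis

streamingDf.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    batch.rdd.foreachPartition { rows =>
      // One Jedis connection per partition of the micro batch
      val jedis = new Jedis("redis-host", 6379)
      try {
        rows.foreach { row =>
          // The Lua script performs the version check and HSET atomically
          jedis.eval(conditionalUpsert,
            Arrays.asList(row.getAs[String]("key")),
            Arrays.asList(
              row.getAs[String]("hash_field"),
              row.getAs[String]("value"),
              row.getAs[Long]("value_version").toString))
        }
      } finally {
        jedis.close()
      }
    }
  }
  .start()
```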

Daniel Galinkin


dggc avatar Aug 25 '20 13:08 dggc