Support for Snowflake connector with Spark

Open amithadiraju1694 opened this issue 3 years ago • 10 comments

Is your feature request related to a problem? Please describe.

For folks on Databricks or other Spark-based platforms, getting large data as a pandas DataFrame may not be an option. It's possible to get a Spark DataFrame using the Snowflake Connector for Python, but it's orders of magnitude slower than the Snowflake Connector for Spark.

Describe the solution you'd like

Add a function to SnowflakeRetrievalJob that supports executing a Snowflake query and retrieving its results through the Snowflake Connector for Spark.

Describe alternatives you've considered

Tried getting Snowflake query results as arrow_batches -> converting each batch to pandas -> converting to a Spark DataFrame -> concatenating all Spark DataFrames into one. This is orders of magnitude slower than reading the same DataFrame through the Snowflake Connector for Spark.
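
For readers following along, the batch-wise workaround above might look roughly like the sketch below. It is an illustration, not the code that was actually run: `batches_to_spark_df` is a hypothetical helper, `spark` is assumed to be a SparkSession, and `batches` is assumed to be an iterable of pandas DataFrames with identical columns (for example from the Python connector's `cursor.fetch_pandas_batches()`).

```python
from functools import reduce


def batches_to_spark_df(spark, batches):
    """Convert each pandas batch to a Spark DataFrame and union them all.

    Hypothetical helper: `spark` is assumed to be a SparkSession and
    `batches` an iterable of pandas DataFrames sharing the same columns.
    """
    # Each batch is converted on the driver before Spark sees it.
    spark_dfs = [spark.createDataFrame(pdf) for pdf in batches]
    if not spark_dfs:
        raise ValueError("query returned no batches")
    # Fold the per-batch DataFrames into a single DataFrame.
    return reduce(lambda left, right: left.unionByName(right), spark_dfs)
```

Because every batch is pulled through the driver and converted one at a time, this path does not get the parallel read that the Spark connector provides.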

cc: @sfc-gh-madkins

amithadiraju1694 avatar Nov 28 '22 20:11 amithadiraju1694

@adchia would love to use this if completed, since this is one of the major pain points for us. Could you please provide a link to the docs on how to use this functionality?

amithadiraju1694 avatar Dec 15 '22 03:12 amithadiraju1694

Oh woops I had misread this one. Yeah the SparkOfflineStore doesn't connect to Snowflake yet. Feel free to contribute this and I can cut it in a new patch release!

The easiest way to add this

That being said, it should be a matter of adding another mode in the spark_source.py file.

The logic in get_table_query_string would look something like this:

options = {
    "sfUrl": url,
    "sfUser": user,
    "sfPassword": password,
    "sfDatabase": database,
    "sfSchema": schema,
    "sfWarehouse": warehouse,
    "APPLICATION": "feast",
}

if role:
    options["sfRole"] = role

df_reader = spark.read.format("snowflake").options(**options)

if table:
    df_reader = df_reader.option("dbtable", table)
else:
    df_reader = df_reader.option("query", query)

df = df_reader.load()
tmp_table_name = get_temp_entity_table_name()
df.createOrReplaceTempView(tmp_table_name)

I'd probably add something like an optional snowflake_config in SparkSource to capture the above configs (e.g. warehouse, database, etc)
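
One possible shape for such an optional config, as a minimal sketch only: `SnowflakeSparkConfig` and `load_snowflake_df` are hypothetical names that do not exist in Feast, the option keys are copied from the snippet above, and `spark` is assumed to be a SparkSession with the Snowflake Connector for Spark installed.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class SnowflakeSparkConfig:
    """Hypothetical container for the connection settings listed above."""

    url: str
    user: str
    password: str
    database: str
    schema: str
    warehouse: str
    role: Optional[str] = None

    def to_spark_options(self) -> Dict[str, str]:
        # Same keys as in the snippet above; the role is only set when given.
        options = {
            "sfUrl": self.url,
            "sfUser": self.user,
            "sfPassword": self.password,
            "sfDatabase": self.database,
            "sfSchema": self.schema,
            "sfWarehouse": self.warehouse,
            "APPLICATION": "feast",
        }
        if self.role:
            options["sfRole"] = self.role
        return options


def load_snowflake_df(spark, config, table=None, query=None):
    """Read a table or a query result through the Snowflake connector."""
    # Stricter than the snippet above: exactly one of the two is required.
    if bool(table) == bool(query):
        raise ValueError("pass exactly one of `table` or `query`")
    reader = spark.read.format("snowflake").options(**config.to_spark_options())
    key, value = ("dbtable", table) if table else ("query", query)
    return reader.option(key, value).load()
```

The returned DataFrame could then be registered with `createOrReplaceTempView` as in the snippet above.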

The better way to add this

Really though, we should support reading SnowflakeSource files and mapping that into the above.

This would probably live in https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py#L76, which, instead of failing on SparkSource, would also support parsing SnowflakeSource and then construct the table_query_string as above.
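
A rough sketch of what that mapping could look like. Everything here is an assumption for illustration: `SnowflakeSourceLike` is a stand-in for the fields a SnowflakeSource is assumed to expose (`database`, `schema`, `table`, `query`), and `reader_option_for` is a hypothetical helper, not an existing Feast function.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class SnowflakeSourceLike:
    """Stand-in with the fields this sketch assumes a SnowflakeSource has."""

    database: Optional[str] = None
    schema: Optional[str] = None
    table: Optional[str] = None
    query: Optional[str] = None


def reader_option_for(source) -> Tuple[str, str]:
    """Return the ("dbtable" or "query", value) pair for the Spark reader."""
    if source.table:
        # Assumes the connector accepts a qualified name for dbtable;
        # otherwise pass database/schema via sfDatabase/sfSchema instead.
        parts = [p for p in (source.database, source.schema, source.table) if p]
        return "dbtable", ".".join(parts)
    if source.query:
        return "query", source.query
    raise ValueError("source defines neither a table nor a query")
```

The returned pair would feed `df_reader.option(key, value)` in the snippet above.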

adchia avatar Dec 15 '22 04:12 adchia

Thanks for getting back! A few questions based on your response:

  1. If this functionality (reading from a Snowflake source) is added to SparkOfflineStore, aren't we necessarily creating additional data in the Hive metastore?

  2. Along the same lines, can we add this functionality to SnowflakeOfflineStore, by adding functions to SnowflakeRetrievalJob that use the Snowflake Connector for Spark instead of the Snowflake Connector for Python?

My thought is that with option 2, one can still use Snowflake as the offline store without creating an additional copy of the data elsewhere. Let me know your thoughts.

amithadiraju1694 avatar Dec 20 '22 22:12 amithadiraju1694

@amithadiraju1694 this should be possible to do

sfc-gh-madkins avatar Dec 27 '22 22:12 sfc-gh-madkins

@amithadiraju1694 this is the alternative you tried but found slow, correct? https://github.com/feast-dev/feast/pull/3358

sfc-gh-madkins avatar Dec 27 '22 22:12 sfc-gh-madkins

@amithadiraju1694 this is the alternative you tried but found slow, correct? https://github.com/feast-dev/feast/pull/3358

Yup, that's the one I tried.

amithadiraju1694 avatar Dec 28 '22 00:12 amithadiraju1694

@amithadiraju1694 this should be possible to do

I tried doing it in SnowflakeRetrievalJob, by sending connection options to Spark and using the Spark connector. It couldn't find the feast_df_***** object, and I wasn't sure how to fix it, so I stopped there. Please let me know if you have a solution for that.

amithadiraju1694 avatar Dec 28 '22 00:12 amithadiraju1694

I do… let me send some code over in a few days

sfc-gh-madkins avatar Dec 28 '22 00:12 sfc-gh-madkins

@amithadiraju1694 are you able to give me a side by side numbers comparison using to_spark_df() vs materializing the table in snowflake and then reading it through the spark connector?

sfc-gh-madkins avatar Apr 24 '23 16:04 sfc-gh-madkins

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

stale[bot] avatar Sep 17 '23 14:09 stale[bot]