Support for Snowflake connector with Spark
Is your feature request related to a problem? Please describe.
For folks on Databricks or other Spark-based platforms, getting large data as a pandas DataFrame may not be an option. It's possible to get a Spark DataFrame using the Snowflake Connector for Python, but it's orders of magnitude slower than the Snowflake Connector for Spark.
Describe the solution you'd like
Add a function to SnowflakeRetrievalJob that supports executing a Snowflake query and retrieving its results through the Snowflake Connector for Spark.
Describe alternatives you've considered
Tried getting the Snowflake query results as arrow_batches -> converting each batch to pandas -> converting that to a Spark DataFrame -> concatenating all the Spark DataFrames into one. This is orders of magnitude slower than reading the same DataFrame through the Snowflake Connector for Spark.
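For reference, the batch-wise path described above can be sketched roughly as follows. This is only an illustration, not the code that was actually run: `cursor` is assumed to be a Snowflake Connector for Python cursor that has already executed the query (its `fetch_arrow_batches()` yields PyArrow tables), `spark` is assumed to be an existing SparkSession, and the function name is made up.

```python
from functools import reduce


def arrow_batches_to_spark_df(cursor, spark):
    """Build one Spark DataFrame from a Snowflake cursor, batch by batch.

    Assumptions (not from the original issue): `cursor` has already run
    `execute(...)`, and `spark` is a live SparkSession.
    """
    spark_dfs = []
    for arrow_table in cursor.fetch_arrow_batches():
        # Each Arrow batch is converted to pandas on the driver...
        pandas_df = arrow_table.to_pandas()
        # ...and then handed to Spark as its own DataFrame.
        spark_dfs.append(spark.createDataFrame(pandas_df))

    if not spark_dfs:
        return None  # the query produced no batches

    # Concatenate the per-batch DataFrames into a single one.
    return reduce(lambda left, right: left.union(right), spark_dfs)
```

Every batch passes through the driver as a pandas DataFrame before Spark sees it, which is probably a large part of why this path is so much slower than letting the Spark connector read from Snowflake directly.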
cc: @sfc-gh-madkins
@adchia would love to use this if completed, since this is one of the major pain points for us. Could you please provide a link to the docs on how to use this functionality?
Oh woops I had misread this one. Yeah the SparkOfflineStore doesn't connect to Snowflake yet. Feel free to contribute this and I can cut it in a new patch release!
The easiest way to add this
That being said, it should be a matter of adding another mode in the spark_source.py file, with logic in get_table_query_string looking something like the below:
    options = {
        "sfUrl": url,
        "sfUser": user,
        "sfPassword": password,
        "sfDatabase": database,
        "sfSchema": schema,
        "sfWarehouse": warehouse,
        "APPLICATION": "feast",
    }
    if role:
        options["sfRole"] = role
    df_reader = spark.read.format("snowflake").options(**options)
    if table:
        df_reader = df_reader.option("dbtable", table)
    else:
        df_reader = df_reader.option("query", query)
    df = df_reader.load()
    tmp_table_name = get_temp_entity_table_name()
    df.createOrReplaceTempView(tmp_table_name)
I'd probably add something like an optional snowflake_config in SparkSource to capture the above configs (e.g. warehouse, database, etc)
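One way such an optional snowflake_config could be shaped is sketched below. The `SnowflakeConfig` class and its `to_spark_options` method are hypothetical and do not exist in Feast; the option keys are the ones used in the snippet above.

```python
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass
class SnowflakeConfig:
    """Hypothetical config holder for SparkSource; not an existing Feast class."""

    url: str
    user: str
    password: str
    database: str
    schema: str
    warehouse: str
    role: Optional[str] = None

    def to_spark_options(self) -> Dict[str, str]:
        """Translate the fields into Snowflake Spark connector options."""
        options = {
            "sfUrl": self.url,
            "sfUser": self.user,
            "sfPassword": self.password,
            "sfDatabase": self.database,
            "sfSchema": self.schema,
            "sfWarehouse": self.warehouse,
            "APPLICATION": "feast",
        }
        # The role is optional, so only pass it when it is set.
        if self.role:
            options["sfRole"] = self.role
        return options
```

With something like this in place, the reader could be built as `spark.read.format("snowflake").options(**config.to_spark_options())`, keeping the connection details out of get_table_query_string.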
The better way to add this
Really though, we should support reading SnowflakeSource files and mapping that into the above.
This would probably live in https://github.com/feast-dev/feast/blob/master/sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark.py#L76, and instead of failing on SparkSource, it would also support parsing SnowflakeSource and then construct the table_query_string like above.
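A rough sketch of that mapping, under stated assumptions: `SnowflakeSourceLike` is a stand-in written for this example rather than the real Feast SnowflakeSource, its field names (database, schema, table, query) are assumed, and `reader_option_for` is a made-up helper. It only decides whether the reader should get a "dbtable" or a "query" option.

```python
from dataclasses import dataclass
from typing import Optional, Tuple


@dataclass
class SnowflakeSourceLike:
    """Stand-in for a Feast SnowflakeSource; field names are assumptions."""

    database: Optional[str] = None
    schema: Optional[str] = None
    table: Optional[str] = None
    query: Optional[str] = None


def reader_option_for(source: SnowflakeSourceLike) -> Tuple[str, str]:
    """Return the (key, value) reader option: "dbtable" or "query"."""
    if source.table:
        # Qualify the table with whichever of database/schema are set.
        parts = [p for p in (source.database, source.schema, source.table) if p]
        return "dbtable", ".".join(parts)
    if source.query:
        return "query", source.query
    raise ValueError("source needs either a table or a query")
```

The returned pair could then be passed to `df_reader.option(key, value)` in place of the `if table: ... else: ...` branch shown earlier.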
Thanks for getting back! A few questions based on your response:
- If this functionality (reading from a Snowflake source) is added to SparkOfflineStore, aren't we necessarily creating additional data in the Hive metastore?
- Along the same lines, can we add this functionality to SnowflakeOfflineStore, by adding additional functions to SnowflakeRetrievalJob that use the Snowflake Connector for Spark instead of the Snowflake Connector for Python?
My thought is that, with the second approach, one can still use Snowflake as the offline store without creating an additional copy of the data elsewhere. Let me know your thoughts.
@amithadiraju1694 this should be possible to do
@amithadiraju1694 this is the alternative you tried but found slow, correct? https://github.com/feast-dev/feast/pull/3358
Yup, that's the one I tried.
I tried doing it in SnowflakeRetrievalJob by passing the connection options to Spark and using the Spark connector. It couldn't find the feast_df_***** object, and I wasn't sure how to fix that, so I stopped there. Please let me know if you have a solution.
I do… let me send some code over in a few days.
@amithadiraju1694 are you able to give me a side-by-side comparison of the numbers for to_spark_df() vs. materializing the table in Snowflake and then reading it through the Spark connector?
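A small harness along these lines could produce the comparison requested above. It is only a sketch: `materialize_sql`, `timed`, and `compare` are made-up helper names, the table name is whatever the caller chooses, and the two callables being timed are left to the caller.

```python
import time
from typing import Callable, Dict, Tuple


def materialize_sql(query: str, table_name: str) -> str:
    """Build a CTAS statement that stores the query result in a transient table."""
    return f"CREATE OR REPLACE TRANSIENT TABLE {table_name} AS {query}"


def timed(label: str, fn: Callable[[], object]) -> Tuple[str, float]:
    """Run fn once and return (label, elapsed seconds)."""
    start = time.perf_counter()
    fn()
    return label, time.perf_counter() - start


def compare(paths: Dict[str, Callable[[], object]]) -> Dict[str, float]:
    """Time each named code path once and collect the results."""
    return dict(timed(label, fn) for label, fn in paths.items())
```

One callable would wrap to_spark_df(); the other would execute the statement from `materialize_sql(...)` in Snowflake and then read the resulting table through the Spark connector with the "dbtable" option. Because Spark evaluates lazily, each callable should end with an action such as `.count()` so that the read is actually timed.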
This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.