qdrant-spark
Qdrant Apache Spark connector
Qdrant-Spark Connector
Apache Spark is a distributed computing framework designed for big data processing and analytics. This connector enables Qdrant to be a storage destination in Spark.
Installation
[!IMPORTANT]
Requires Java 8 or above.
GitHub Releases
The packaged jar file can be found here.
Building from source
To build the JAR from source, you need JDK 8 and Maven installed.
Once the requirements have been satisfied, run the following command in the project root.
mvn package
This will build and store the fat JAR in the target directory by default.
Maven Central
For use with Java and Scala projects, the package can be found here.
Usage
Creating a Spark session (Single-node) with Qdrant support
from pyspark.sql import SparkSession

# The outer parentheses let the builder chain span several lines.
spark = (
    SparkSession.builder.config(
        "spark.jars",
        "spark-VERSION.jar",  # Specify the downloaded JAR file
    )
    .master("local[*]")
    .appName("qdrant")
    .getOrCreate()
)
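As an alternative to a local JAR file, Spark can usually resolve a package from Maven Central through its standard `spark.jars.packages` setting. The following is a minimal sketch of that approach, assuming the `io.qdrant:spark:VERSION` coordinate given in the Databricks section. The helper names `qdrant_spark_conf` and `build_session` are hypothetical, and the version has to be supplied by the caller.

```python
def qdrant_spark_conf(version: str) -> dict:
    """Return Spark settings that pull the connector from Maven Central."""
    if not version:
        raise ValueError("a connector version is required")
    # Coordinate format assumed from the Databricks instructions.
    return {"spark.jars.packages": f"io.qdrant:spark:{version}"}


def build_session(version: str):
    """Create a single-node session with the connector on the classpath."""
    # Imported lazily so qdrant_spark_conf works without PySpark installed.
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.master("local[*]").appName("qdrant")
    for key, value in qdrant_spark_conf(version).items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

Resolving from Maven Central requires network access from the driver; the `spark.jars` approach above may be preferable in restricted environments.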
Loading data
[!IMPORTANT] Before loading the data using this connector, a collection has to be created in advance with the appropriate vector dimensions and configurations.
The connector supports ingesting multiple named/unnamed, dense/sparse vectors.
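Because the collection must exist before writing, one option is to create it through Qdrant's REST API. The sketch below only builds the request and does not send it. It assumes a single unnamed dense vector, the default REST port 6333 (distinct from the gRPC port the connector uses), and hypothetical values for the collection name and vector size. The function name `create_collection_request` is illustrative.

```python
import json
import urllib.request


def create_collection_request(base_url, collection, size, distance="Cosine", api_key=None):
    """Build (but do not send) a request that creates a single-vector collection."""
    body = {"vectors": {"size": size, "distance": distance}}
    headers = {"Content-Type": "application/json"}
    if api_key:
        headers["api-key"] = api_key  # only needed when authentication is enabled
    return urllib.request.Request(
        url=f"{base_url.rstrip('/')}/collections/{collection}",
        data=json.dumps(body).encode("utf-8"),
        headers=headers,
        method="PUT",
    )


# Hypothetical values; adjust the size to match your embedding column.
req = create_collection_request("http://localhost:6333", "my_collection", size=384)
# urllib.request.urlopen(req)  # uncomment to send to a running Qdrant instance
```

The vector size must match the length of the arrays in the embedding column, otherwise the upload is expected to fail.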
Unnamed/Default vector
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", <QDRANT_GRPC_URL>)
.option("collection_name", <QDRANT_COLLECTION_NAME>)
.option("embedding_field", <EMBEDDING_FIELD_NAME>) # Expected to be a field of type ArrayType(FloatType)
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Named vector
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", <QDRANT_GRPC_URL>)
.option("collection_name", <QDRANT_COLLECTION_NAME>)
.option("embedding_field", <EMBEDDING_FIELD_NAME>) # Expected to be a field of type ArrayType(FloatType)
.option("vector_name", <VECTOR_NAME>)
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
NOTE
The `embedding_field` and `vector_name` options are maintained for backward compatibility. It is recommended to use `vector_fields` and `vector_names` for named vectors as shown below.
Multiple named vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
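Keeping the two comma-separated lists in step by hand is error-prone. The following sketch builds the options from a single column-to-vector mapping. The helper name `named_vector_options` is hypothetical, and the positional pairing of columns with vector names is an assumption inferred from the example above.

```python
def named_vector_options(qdrant_url, collection_name, schema_json, vectors):
    """Build writer options from a {column_name: vector_name} mapping."""
    if not vectors:
        raise ValueError("at least one column-to-vector mapping is required")
    return {
        "qdrant_url": qdrant_url,
        "collection_name": collection_name,
        "schema": schema_json,
        # Dicts preserve insertion order, so the i-th column lines up
        # with the i-th vector name (assumed pairing rule).
        "vector_fields": ",".join(vectors.keys()),
        "vector_names": ",".join(vectors.values()),
    }


# Hypothetical column and vector names; schema_json would be df.schema.json().
opts = named_vector_options(
    "http://localhost:6334",
    "documents",
    "{}",
    {"title_embedding": "title", "body_embedding": "body"},
)
```

The resulting dictionary can be passed to the writer with `.options(**opts)` in place of the individual `.option(...)` calls.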
Sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
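A sparse vector is read from two columns: one holding the indices and one holding the values at those indices. As an illustration of that layout, the sketch below splits a dense list into the two parallel lists. The function name `to_sparse` is hypothetical, and dropping zero entries is one common convention rather than something the connector requires.

```python
def to_sparse(dense):
    """Split a dense vector into parallel (indices, values) lists of its non-zero entries."""
    indices = [i for i, v in enumerate(dense) if v != 0]
    values = [float(dense[i]) for i in indices]
    return indices, values


indices, values = to_sparse([0.0, 0.5, 0.0, 0.0, 1.5])
# indices -> [1, 4]
# values  -> [0.5, 1.5]
```

Both lists must have the same length, since each value belongs to the index at the same position.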
Multiple sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Combination of named dense and sparse vectors
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("vector_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("vector_names", "<VECTOR_NAME>,<ANOTHER_VECTOR_NAME>")
.option("sparse_vector_value_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_index_fields", "<COLUMN_NAME>,<ANOTHER_COLUMN_NAME>")
.option("sparse_vector_names", "<SPARSE_VECTOR_NAME>,<ANOTHER_SPARSE_VECTOR_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
No vectors - Entire dataframe is stored as payload
<pyspark.sql.DataFrame>
.write
.format("io.qdrant.spark.Qdrant")
.option("qdrant_url", "<QDRANT_GRPC_URL>")
.option("collection_name", "<QDRANT_COLLECTION_NAME>")
.option("schema", <pyspark.sql.DataFrame>.schema.json())
.mode("append")
.save()
Databricks
You can use the connector as a library in Databricks to ingest data into Qdrant.
- Go to the `Libraries` section in your cluster dashboard.
- Select `Install New` to open the library installation modal.
- Search for `io.qdrant:spark:VERSION` in the Maven packages and click `Install`.
Datatype support
The appropriate Spark data types are mapped to the Qdrant payload based on the provided schema.
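Since the mapping is driven by the schema JSON, it can help to inspect that JSON before writing. The sketch below checks that a column is declared as an array of floats, which is the type expected for dense vector columns. The function name `is_float_array_field` and the sample schema are hypothetical; the JSON layout follows what `DataFrame.schema.json()` produces.

```python
import json


def is_float_array_field(schema_json, field_name):
    """Return True if the named field is declared as ArrayType(FloatType)."""
    schema = json.loads(schema_json)
    for field in schema.get("fields", []):
        if field["name"] == field_name:
            dtype = field["type"]
            # Primitive types are plain strings; complex types are objects.
            return (
                isinstance(dtype, dict)
                and dtype.get("type") == "array"
                and dtype.get("elementType") == "float"
            )
    raise KeyError(f"no field named {field_name!r} in schema")


# Hand-written sample in the shape produced by df.schema.json().
sample_schema = json.dumps({
    "type": "struct",
    "fields": [
        {"name": "id", "type": "string", "nullable": True, "metadata": {}},
        {
            "name": "embedding",
            "type": {"type": "array", "elementType": "float", "containsNull": True},
            "nullable": True,
            "metadata": {},
        },
    ],
})
```

A column of `ArrayType(DoubleType)` would fail this check; casting it to an array of floats before writing is one way to bring it in line with the expected type.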
Options and Spark types
| Option | Description | Column DataType | Required |
|---|---|---|---|
| `qdrant_url` | GRPC URL of the Qdrant instance. Eg: http://localhost:6334 | - | ✅ |
| `collection_name` | Name of the collection to write data into | - | ✅ |
| `schema` | JSON string of the dataframe schema | - | ✅ |
| `embedding_field` | Name of the column holding the embeddings | `ArrayType(FloatType)` | ❌ |
| `id_field` | Name of the column holding the point IDs. Default: Random UUID | `StringType` or `IntegerType` | ❌ |
| `batch_size` | Max size of the upload batch. Default: 64 | - | ❌ |
| `retries` | Number of upload retries. Default: 3 | - | ❌ |
| `api_key` | Qdrant API key for authentication | - | ❌ |
| `vector_name` | Name of the vector in the collection. | - | ❌ |
| `vector_fields` | Comma-separated names of columns holding the vectors. | `ArrayType(FloatType)` | ❌ |
| `vector_names` | Comma-separated names of vectors in the collection. | - | ❌ |
| `sparse_vector_index_fields` | Comma-separated names of columns holding the sparse vector indices. | `ArrayType(IntegerType)` | ❌ |
| `sparse_vector_value_fields` | Comma-separated names of columns holding the sparse vector values. | `ArrayType(FloatType)` | ❌ |
| `sparse_vector_names` | Comma-separated names of the sparse vectors in the collection. | - | ❌ |
| `shard_key_selector` | Comma-separated names of custom shard keys to use during upsert. | - | ❌ |
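The options above can be sanity-checked before a write is attempted. The sketch below reports missing required options and comma-separated lists whose entry counts differ. The function name `check_options` is hypothetical, and the rule that related lists must have equal lengths is an assumption, not documented connector behavior.

```python
REQUIRED = ("qdrant_url", "collection_name", "schema")

# Groups of comma-separated options assumed to need matching entry counts.
PAIRED = (
    ("vector_fields", "vector_names"),
    ("sparse_vector_index_fields", "sparse_vector_value_fields", "sparse_vector_names"),
)


def check_options(options):
    """Return a list of problems found in a dict of writer options."""
    problems = [
        f"missing required option: {name}" for name in REQUIRED if not options.get(name)
    ]
    for group in PAIRED:
        # Only options that were actually supplied are compared.
        counts = {
            name: len(options[name].split(",")) for name in group if name in options
        }
        if len(set(counts.values())) > 1:
            problems.append(f"mismatched entry counts: {counts}")
    return problems
```

An empty list means no problems were found by these checks; it does not guarantee that the collection itself is configured to accept the data.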
LICENSE
Apache 2.0 © 2024