
Pattern for use of pgvector document store with multiple users querying

Open phiweger opened this issue 1 year ago • 5 comments

Hi,

Is there a best practice on how to use a pgvector document store in a "server setting"? We have multiple users sending requests to our (FastAPI) backend, but reusing the same document store is a blocking operation.

We could instantiate a new document store with each request, but then we run into connection issues, because there is no check on how many connections are open at any one time (i.e. we need a connection pool for this).
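What I'd like, roughly, is a bounded pool of connections shared across all requests. A minimal stdlib sketch of the pattern (sqlite3 standing in for Postgres, and a queue.Queue as the pool; none of this is Haystack API, it just shows the shape):

```python
import queue
import sqlite3
import threading

POOL_SIZE = 4

# A tiny fixed-size pool: connections are created once and reused, so
# concurrent requests never exceed POOL_SIZE open connections.
pool: "queue.Queue[sqlite3.Connection]" = queue.Queue()
for _ in range(POOL_SIZE):
    pool.put(sqlite3.connect(":memory:", check_same_thread=False))

results = []

def handle_request(i: int) -> None:
    conn = pool.get()          # blocks if all connections are in use
    try:
        cur = conn.execute("SELECT ?", (i,))
        results.append(cur.fetchone()[0])
    finally:
        pool.put(conn)         # return the connection for the next request

threads = [threading.Thread(target=handle_request, args=(i,)) for i in range(10)]
[t.start() for t in threads]
[t.join() for t in threads]
print(sorted(results))  # all 10 requests served with only 4 connections
```

In production one would use something like psycopg_pool's ConnectionPool instead of hand-rolling this, but the point is the same: the pool, not the request handler, decides how many connections exist.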

What is best practice in this circumstance?

Best and thanks, Adrian

phiweger avatar Feb 05 '25 19:02 phiweger

For example, ATM I use engine = create_engine(...) from sqlmodel (which builds on sqlalchemy) like so:

from sqlalchemy import text
from sqlmodel import Session

with Session(engine) as session:
    session.execute(text("some sql"))

If we could pass an engine or session to the PgvectorDocumentStore, as an alternative to a URI (connection_string), then the engine's connection pool could manage all connections across all requests.

phiweger avatar Feb 05 '25 22:02 phiweger

Looking at the code of how PgvectorDocumentStore implements connections, I guess the best option from a developer-experience perspective would be an async-enabled PgvectorDocumentStore. This seems to be in line with recent developments, see e.g. https://github.com/deepset-ai/haystack/issues/6012
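Until an async-enabled store exists, one caller-side workaround might be to offload the blocking run() calls to a thread pool via asyncio.to_thread, so a FastAPI endpoint at least doesn't block the event loop while a query runs. A sketch, where blocking_retrieve is a hypothetical stand-in for retriever.run:

```python
import asyncio
import time

def blocking_retrieve(i: int) -> dict:
    # Stand-in for PgvectorEmbeddingRetriever.run(); simulates query latency.
    time.sleep(0.05)
    return {"query": i, "documents": ["doc-a", "doc-b"]}

async def main() -> list:
    # Each blocking call runs in the default thread pool, so the event
    # loop stays free to serve other requests in the meantime.
    return await asyncio.gather(
        *(asyncio.to_thread(blocking_retrieve, i) for i in range(5))
    )

start = time.perf_counter()
results = asyncio.run(main())
elapsed = time.perf_counter() - start
print(len(results), round(elapsed, 2))  # ~0.05s total, not 5 * 0.05s
```

Note this only helps if the underlying store can actually serve the offloaded calls concurrently; if all threads share one connection, they still serialize on the database side.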

phiweger avatar Feb 06 '25 07:02 phiweger

Hello, I came here with the same problem but a different implementation. I am having trouble parallelizing retrieval from the pgvector store using PgvectorEmbeddingRetriever: running N retrievals takes N times as long as a single one. Even with Python threading, it takes the same total time.

After further investigation, it seems that the database is not able to receive multiple requests at the same time.
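That is what you'd expect if all threads funnel through one shared connection: Python releases the GIL during I/O, but a single connection processes one query at a time, so the work serializes anyway. A stdlib sketch of the effect, with a lock standing in for the shared connection:

```python
import threading
import time

QUERY_TIME = 0.05  # simulated per-query latency
shared_connection = threading.Lock()  # one connection = one query at a time

def query_shared() -> None:
    with shared_connection:      # every thread waits its turn
        time.sleep(QUERY_TIME)

def query_own_connection() -> None:
    time.sleep(QUERY_TIME)       # independent connections run in parallel

def timed(target, n: int = 4) -> float:
    threads = [threading.Thread(target=target) for _ in range(n)]
    start = time.perf_counter()
    [t.start() for t in threads]
    [t.join() for t in threads]
    return time.perf_counter() - start

serialized = timed(query_shared)        # ~n * QUERY_TIME
parallel = timed(query_own_connection)  # ~QUERY_TIME
print(round(serialized, 2), round(parallel, 2))
```

The linear scaling in the timings below matches the serialized case.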

rhajou avatar Feb 09 '25 10:02 rhajou

@phiweger I will drop my code here for more clarification:

import os
import random
import threading
import time

from src.helpers.formatter import create_conn_str

from haystack_integrations.components.retrievers.pgvector import \
    PgvectorEmbeddingRetriever
from haystack_integrations.document_stores.pgvector import \
    PgvectorDocumentStore

# The connection string must be in the environment before the store connects.
creds_vdb = {"ip_host": "XXXXX",
             "port": 55000,
             "db_name": "postgres",
             "username": "user",
             "password": "pass"}
os.environ["PG_CONN_STR"] = create_conn_str(creds_vdb)  # connection to AlloyDB

cfg_db = {"schema_name": "public",
          "table_name": "table_dummy",  # table name in AlloyDB
          "embedding_dimension": 768,  # dimensionality of the embeddings
          "vector_function": "cosine_similarity",  # similarity function for vector search
          "recreate_table": False,  # set to True to recreate the table structure
          "search_strategy": "hnsw",
          "hnsw_recreate_index_if_exists": False,
          "keyword_index_name": "table_dummy_keyword_index",
          }
document_store = PgvectorDocumentStore(**cfg_db)
simple_retriever = PgvectorEmbeddingRetriever(document_store=document_store)

result = []

def run_retriever(i):
    out = simple_retriever.run(query_embedding=[random.random()] * 768, top_k=3, filters={})
    print(f"{time.time()} {i} {len(out['documents'])}")
    result.append(out)

def main():
    nb_concurrent_retrievals = 3
    # One thread per retrieval
    threads = [threading.Thread(target=run_retriever, args=(i,))
               for i in range(nb_concurrent_retrievals)]
    start = time.time()
    [t.start() for t in threads]
    [t.join() for t in threads]
    end = time.time()
    print(len(result))
    print(len(result[0]['documents']))
    print(f"All threads finished in {round(end - start, 2)}s")

if __name__ == "__main__":
    main()

The total time scales linearly with the number of threads:

nb_concurrent_retrievals = 1 → 1.16 seconds
nb_concurrent_retrievals = 3 → 3.48 seconds
nb_concurrent_retrievals = 10 → 12.74 seconds
nb_concurrent_retrievals = 100 → 116.32 seconds
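One workaround I'm considering, until the store supports pooling, is giving each worker thread its own store instance (and thus its own connection) via threading.local. A sketch with a dummy Store class standing in for PgvectorDocumentStore:

```python
import threading

class Store:
    """Stand-in for PgvectorDocumentStore; a real store opens a DB connection."""
    def __init__(self) -> None:
        self.thread_ident = threading.get_ident()

_local = threading.local()

def get_store() -> Store:
    # Lazily create one store (one connection) per thread and reuse it.
    if not hasattr(_local, "store"):
        _local.store = Store()
    return _local.store

stores = []  # keep references so each thread's store stays alive

def worker() -> None:
    store = get_store()
    assert get_store() is store  # the same thread always gets the same store
    stores.append(store)

threads = [threading.Thread(target=worker) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print(len({id(s) for s in stores}))  # 4 distinct stores, one per thread
```

The downside is that the number of connections grows with the number of threads, so it only bounds connections if the thread count itself is bounded (e.g. a fixed-size ThreadPoolExecutor).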

@phiweger did you find any solution for this? Or should I switch to another vector DB?

rhajou avatar Feb 09 '25 11:02 rhajou

@phiweger have you found a way to solve this?

I am also sometimes hitting DocumentStoreError: Could not create HNSW index.

rhajou avatar Feb 26 '25 13:02 rhajou