When cleaning documents in the knowledge base, the CPU is too high

Open xiaohuihuige opened this issue 1 year ago • 7 comments

Self Checks

  • [X] This is only for bug report, if you would like to ask a question, please head to Discussions.
  • [X] I have searched for existing issues, including closed ones.
  • [X] I confirm that I am using English to submit this report (I have read and agree to the Language Policy).
  • [X] [FOR CHINESE USERS] Please be sure to submit issues in English, otherwise they will be closed. Thank you! :)
  • [X] Please do not modify this template :) and fill in all the required fields.

Dify version

0.6.15

Cloud or Self Hosted

Cloud

Steps to reproduce

When a large amount of text has to be cleaned, the request is processed very slowly and blocks the API. From what I have read, a blocked request in the Flask framework can lead to CPU spikes.

The API endpoint used for cleaning is: api.add_resource(DatasetIndexingEstimateApi, '/datasets/indexing-estimate')

✔️ Expected Behavior

No response

❌ Actual Behavior

No response

xiaohuihuige avatar Jul 31 '24 08:07 xiaohuihuige

This is a local (self-hosted) deployment, not the cloud version.

xiaohuihuige avatar Jul 31 '24 08:07 xiaohuihuige

To reduce high CPU usage when cleaning documents in the knowledge base using the Flask framework, you can consider the following optimizations:

  1. Use Economical Index Mode: The "Economical" mode uses offline vector engines and keyword indexes, trading some accuracy for not spending tokens. This mode may also help reduce CPU usage.

    economical: 'Economical',
    economicalTip: 'Use offline vector engines, keyword indexes, etc. to reduce accuracy without spending tokens',
    
  2. Optimize Chunk Settings: Adjust chunk settings to optimize processing. For example, set a reasonable chunk overlap and maximum chunk length.

    overlap: 'Chunk overlap',
    overlapTip: 'Setting the chunk overlap can maintain the semantic relevance between them, enhancing the retrieve effect. It is recommended to set 10%-25% of the maximum chunk size.',
    overlapCheck: 'chunk overlap should not bigger than maximum chunk length',
    
  3. Remove Unnecessary Preprocessing Steps: Only enable necessary preprocessing rules to avoid extra CPU load.

    rules: 'Text preprocessing rules',
    removeExtraSpaces: 'Replace consecutive spaces, newlines and tabs',
    removeUrlEmails: 'Delete all URLs and email addresses',
    removeStopwords: 'Remove stopwords such as "a", "an", "the"',
    
  4. Optimize Indexing Process: Ensure the indexing process is efficient. Handle exceptions properly to avoid unnecessary retries and optimize the extraction, transformation, and loading steps.

    class IndexingRunner:
        def run(self, dataset_documents: list[DatasetDocument]):
            for dataset_document in dataset_documents:
                try:
                    dataset = Dataset.query.filter_by(id=dataset_document.dataset_id).first()
                    if not dataset:
                        raise ValueError("no dataset found")
    
                    processing_rule = db.session.query(DatasetProcessRule).filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id).first()
                    index_type = dataset_document.doc_form
                    index_processor = IndexProcessorFactory(index_type).init_index_processor()
    
                    text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())
                    documents = self._transform(index_processor, dataset, text_docs, dataset_document.doc_language, processing_rule.to_dict())
                    self._load_segments(dataset, dataset_document, documents)
                    self._load(index_processor, dataset, dataset_document, documents)
                except Exception as e:
                    logging.exception("consume document failed")
                    dataset_document.indexing_status = 'error'
                    dataset_document.error = str(e)
                    dataset_document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
                    db.session.commit()
    
  5. Parallelize Document Formatting: Use threading to parallelize the formatting of QA documents, distributing the workload across multiple threads.

    def transform(self, documents: list[Document], **kwargs) -> list[Document]:
        splitter = self._get_splitter(processing_rule=kwargs.get('process_rule'),
                                      embedding_model_instance=kwargs.get('embedding_model_instance'))
    
        all_documents = []
        all_qa_documents = []
        for document in documents:
            document_text = CleanProcessor.clean(document.page_content, kwargs.get('process_rule'))
            document.page_content = document_text
    
            document_nodes = splitter.split_documents([document])
            split_documents = []
            for document_node in document_nodes:
                if document_node.page_content.strip():
                    doc_id = str(uuid.uuid4())
                    hash = helper.generate_text_hash(document_node.page_content)
                    document_node.metadata['doc_id'] = doc_id
                    document_node.metadata['doc_hash'] = hash
                    page_content = document_node.page_content
                    if page_content.startswith(".") or page_content.startswith("。"):
                        page_content = page_content[1:]
                    document_node.page_content = page_content
                    split_documents.append(document_node)
            all_documents.extend(split_documents)
        for i in range(0, len(all_documents), 10):
            threads = []
            sub_documents = all_documents[i:i + 10]
            for doc in sub_documents:
                document_format_thread = threading.Thread(target=self._format_qa_document, kwargs={
                    'flask_app': current_app._get_current_object(),
                    'tenant_id': kwargs.get('tenant_id'),
                    'document_node': doc,
                    'all_qa_documents': all_qa_documents,
                    'document_language': kwargs.get('doc_language', 'English')})
                threads.append(document_format_thread)
                document_format_thread.start()
            for thread in threads:
                thread.join()
        return all_qa_documents
    
  6. Efficient Cache Cleaning: Implement a scheduled task to clean up old embeddings from the database, reducing the load on the system.

    import datetime
    import time
    
    import click
    from flask import current_app
    
    import app
    from extensions.ext_database import db
    from models.dataset import Embedding
    
    @app.celery.task(queue='dataset')
    def clean_embedding_cache_task():
        click.echo(click.style('Start clean embedding cache.', fg='green'))
        clean_days = int(current_app.config.get('CLEAN_DAY_SETTING'))
        start_at = time.perf_counter()
        # Embeddings created before this cutoff are treated as stale.
        cutoff = datetime.datetime.now() - datetime.timedelta(days=clean_days)
        while True:
            # Always fetch the first batch: rows deleted in the previous pass are
            # gone, so advancing a page number would skip matching rows.
            embeddings = db.session.query(Embedding).filter(Embedding.created_at < cutoff) \
                .order_by(Embedding.created_at.desc()).limit(100).all()
            if not embeddings:
                break
            for embedding in embeddings:
                db.session.delete(embedding)
            db.session.commit()
        end_at = time.perf_counter()
        click.echo(click.style('Cleaned embedding cache from db success latency: {}'.format(end_at - start_at), fg='green'))
    

By implementing these optimizations, you can reduce the CPU usage during the document cleaning process in your Flask application [1][2][3][4].
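
As a small illustration of item 2, the sketch below checks a chunk configuration against the 10%-25% overlap guidance quoted in the UI strings. The function name `check_chunk_settings` and its parameters are hypothetical and not part of Dify; this is only one way such a check could look.

```python
def check_chunk_settings(max_chunk_length: int, chunk_overlap: int) -> list[str]:
    """Return warnings for a chunk configuration (hypothetical helper, not a Dify API)."""
    if max_chunk_length <= 0 or chunk_overlap < 0:
        raise ValueError("max_chunk_length must be positive and chunk_overlap non-negative")

    warnings = []
    if chunk_overlap > max_chunk_length:
        warnings.append("chunk overlap exceeds maximum chunk length")

    # Integer arithmetic for 10% <= overlap / max_chunk_length <= 25%.
    in_range = (10 * chunk_overlap >= max_chunk_length
                and 4 * chunk_overlap <= max_chunk_length)
    if not in_range:
        warnings.append("chunk overlap is outside the suggested 10%-25% range")
    return warnings
```

For example, `check_chunk_settings(1000, 200)` returns an empty list, while an overlap of 50 on the same chunk length produces a single range warning.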

To continue talking to Dosu, mention @dosu.

dosubot[bot] avatar Jul 31 '24 08:07 dosubot[bot]

This text segmentation and cleaning endpoint is not handled by Celery, so processing is very slow. The request effectively blocks, and overall system performance becomes extremely poor. When can the backend maintainers optimize this?

xiaohuihuige avatar Aug 01 '24 01:08 xiaohuihuige

This seems to be a complex issue that needs your expertise, @takatost, could you please take a look?

dosubot[bot] avatar Aug 01 '24 01:08 dosubot[bot]

We ran into a similar problem: after uploading files larger than 10 MB, the entire Dify application hung. Looking forward to an official reply on how to deal with this effectively.

yfz123456 avatar Aug 15 '24 08:08 yfz123456

I have now moved the cleaning task for this endpoint to Celery so that it runs asynchronously, and it no longer causes any lag.

xiaohuihuige avatar Aug 23 '24 09:08 xiaohuihuige

That sounds great. How did you handle it? Could you share the approach?

yfz123456 avatar Aug 26 '24 03:08 yfz123456

Hello, could I ask how you solved this?

loki-keroro avatar Dec 11 '24 13:12 loki-keroro