pipecat icon indicating copy to clipboard operation
pipecat copied to clipboard

Add extra processor to check for empty frames

Open janwout opened this issue 1 year ago • 7 comments

I am using the storybot example. And I found out that sometimes the recording of the microphone stops, because it got some audio. But the trancription is never created, and so the whole proces stops.

My question is: What is a nice way, to prevent this kind of block. Is there a way to let the frontend know it did not receive anything it can use? So de microphone is turned on again?

Idea (from Gen)

The storybot demo doesn't use a client side VAD (unlike some of the other demos.) There might be an extra processor we need to add there for check for empty text frames. Storybot is a bit unique, because it uses a 'turn' based approach, so it will get stuck if it thinks it's the LLMs turn, but the LLM has no data to work with. Thanks for raising thi

janwout avatar Jul 17 '24 11:07 janwout

@janwout Hi! Sorry for the delay. Some weeks ago I added this processor https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/processors/user_idle_processor.py. Do you think you can leverage that?

aconchillo avatar Jul 23 '24 18:07 aconchillo

Here's an example: https://github.com/pipecat-ai/pipecat/blob/main/examples/foundational/17-detect-user-idle.py

aconchillo avatar Jul 23 '24 18:07 aconchillo

Thank you, I will try it out

janwout avatar Jul 30 '24 17:07 janwout

I just recalled we have another processor: https://github.com/pipecat-ai/pipecat/blob/main/src/pipecat/processors/idle_frame_processor.py which will let you choose which frames you are expecting and not receving and call a provided callback.

aconchillo avatar Jul 30 '24 17:07 aconchillo

Thanks, implemented has not succeeded yet, I want to check if there is something in the TranscriptionFrame, and callback to user input if not. Tips are welcome...

I use the storytelling bot example, if I get it working, shall I create a PR to add this to the example?

janwout avatar Sep 19 '24 07:09 janwout

@aconchillo I am trying to implement the idle_frame_processor, it is not succeeding. Could you maybe give an example how to implement for this bot?

import argparse
import asyncio
import os
import sys

import aiohttp
from dotenv import load_dotenv
from loguru import logger
from pipecat.frames.frames import EndFrame, LLMMessagesFrame, StopTaskFrame
from pipecat.pipeline.pipeline import Pipeline
from pipecat.pipeline.runner import PipelineRunner
from pipecat.pipeline.task import PipelineTask
from pipecat.processors.aggregators.llm_response import (
    LLMAssistantResponseAggregator,
    LLMUserResponseAggregator,
)
from pipecat.services.openai import OpenAILLMService, OpenAITTSService
from pipecat.transports.services.daily import (
    DailyParams,
    DailyTranscriptionSettings,
    DailyTransport,
    DailyTransportMessageFrame,
)
from pipecat.vad.silero import SileroVADAnalyzer

from processors import StoryProcessor
from prompts import CUE_USER_TURN, LLM_BASE_PROMPT, LLM_INTRO_PROMPT
from utils.helpers import load_images, load_sounds

load_dotenv(override=True)

logger.remove(0)
logger.add(sys.stderr, level="DEBUG")

sounds = load_sounds(["listening.wav"])
images = load_images(["book1.png", "book2.png"])


async def main(room_url, token=None):
    async with aiohttp.ClientSession() as session:

        # -------------- Transport --------------- #

        transport = DailyTransport(
            room_url,
            token,
            "Storytelling Bot",
            DailyParams(
                audio_out_enabled=True,
                audio_out_sample_rate=24000,
                camera_out_enabled=False,
                transcription_enabled=True,
                vad_enabled=True,
                vad_analyzer=SileroVADAnalyzer(),
                transcription_settings=DailyTranscriptionSettings(
                    language="nl",
                    tier="nova",
                    model="2-general",
                    profanity_filter=True,
                    redact=False,
                    endpointing=True,
                    punctuate=True,
                    includeRawResponse=True,
                    extra={"interim_results": True},
                ),
            ),
        )

        logger.debug("Transport created for room:" + room_url)

        # -------------- Services --------------- #

        llm_service = OpenAILLMService(
            api_key=os.getenv("OPENAI_API_KEY"), model="gpt-4o"
        )

        tts_service = OpenAITTSService(
            api_key=os.getenv("OPENAI_API_KEY"), voice="nova"
        )

        # --------------- Setup ----------------- #

        message_history = [LLM_BASE_PROMPT]
        story_pages = []

        # We need aggregators to keep track of user and LLM responses
        llm_responses = LLMAssistantResponseAggregator(message_history)
        user_responses = LLMUserResponseAggregator(message_history)

        # -------------- Processors ------------- #

        story_processor = StoryProcessor(message_history, story_pages)

        # -------------- Story Loop ------------- #

        runner = PipelineRunner()

        # The intro pipeline is used to start
        # the story (as per LLM_INTRO_PROMPT)
        intro_pipeline = Pipeline([llm_service, tts_service, transport.output()])

        intro_task = PipelineTask(intro_pipeline)

        logger.debug("Waiting for participant...")

        @transport.event_handler("on_first_participant_joined")
        async def on_first_participant_joined(transport, participant):
            logger.debug("Participant joined, storytime commence!")
            transport.capture_participant_transcription(participant["id"])
            await intro_task.queue_frames(
                [
                    images["book1"],
                    LLMMessagesFrame([LLM_INTRO_PROMPT]),
                    DailyTransportMessageFrame(CUE_USER_TURN),
                    sounds["listening"],
                    images["book2"],
                    StopTaskFrame(),
                ]
            )

        # We run the intro pipeline. This will start the transport. The intro
        # task will exit after StopTaskFrame is processed.
        await runner.run(intro_task)

        # The main story pipeline is used to continue the story based on user
        # input.
        main_pipeline = Pipeline(
            [
                transport.input(),
                user_responses,
                llm_service,
                story_processor,
                tts_service,
                transport.output(),
                llm_responses,
            ]
        )

        main_task = PipelineTask(main_pipeline)

        @transport.event_handler("on_participant_left")
        async def on_participant_left(transport, participant, reason):
            intro_task.queue_frame(EndFrame())
            await main_task.queue_frame(EndFrame())

        @transport.event_handler("on_call_state_updated")
        async def on_call_state_updated(transport, state):
            if state == "left":
                await main_task.queue_frame(EndFrame())

        await runner.run(main_task)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Daily Storyteller Bot")
    parser.add_argument("-u", type=str, help="Room URL")
    parser.add_argument("-t", type=str, help="Token")
    config = parser.parse_args()

    asyncio.run(main(config.u, config.t))

janwout avatar Sep 25 '24 08:09 janwout

What happens sometimes is that after an unclear transport.input() or deepgram transcription, the pipeline stops. So I am trying to implement it to fix this bug. But no succes after a lot of trying

janwout avatar Sep 25 '24 09:09 janwout

Sorry for the lack of response here. If you're still working on this problem, feel free to reopen this issue.

chadbailey59 avatar Jan 09 '25 19:01 chadbailey59