haystack icon indicating copy to clipboard operation
haystack copied to clipboard

feat: Add TTFT support in OpenAI generators

Open LastRemote opened this issue 1 year ago • 8 comments

Related Issues

  • adds feature requested in #8385

Proposed Changes:

  • adds TTFT support for OpenAI generators

How did you test it?

  • unit tests

Notes for the reviewer

  • I am not exactly sure how to set streaming_options.include_usage in OpenAI SDK, and thus still no usage data for streaming LLMs.
  • The timestamp is stored in isoformat so it can be safely dumped when deployed as a service.

Checklist

LastRemote avatar Oct 08 '24 10:10 LastRemote

Pull Request Test Coverage Report for Build 11614471154

Details

  • 0 of 0 changed or added relevant lines in 0 files are covered.
  • 13 unchanged lines in 1 file lost coverage.
  • Overall coverage increased (+0.01%) to 90.128%

Files with Coverage Reduction New Missed Lines %
components/generators/chat/openai.py 13 80.99%
<!-- Total: 13
Totals Coverage Status
Change from base Build 11595979732: 0.01%
Covered Lines: 7733
Relevant Lines: 8580

💛 - Coveralls

coveralls avatar Oct 08 '24 10:10 coveralls

Ok @LastRemote regarding streaming response this approach worked for me. It is slightly different that yours, perhaps have a look at it and consider some of these changes.

vblagoje avatar Oct 08 '24 12:10 vblagoje

Ok @LastRemote regarding streaming response this approach worked for me. It is slightly different that yours, perhaps have a look at it and consider some of these changes.

@vblagoje Hello, I reviewed your approach and still have some concerns regarding the meta structure and potential integration with Langfuse. Specifically, when a chunk is built, the current behavior captures fields like index, model, tool_calls, and stop_reason. However, I believe we should avoid including this information in the meta["usage"] field when merging meta across chunks. Instead, I prefer preserving the usage meta to reflect only the information directly from the LLM response (or leave it empty if no usage data is captured). This would help prevent potential mismatches when passing usage data to third-party tracers or other downstream services.

Additionally, I converted the TTFT timestamp to ISO format. This change is intended to make it easier to log the entire ChatMessage object or include it as part of a service response. It also requires a slightly different update in the Langfuse integration (which I already implemented in my current project):

        ...
        if tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
            replies = span._data.get("haystack.component.output", {}).get("replies")
            if replies:
                meta = replies[0].meta
                completion_start_time = meta.get("completion_start_time")
                if completion_start_time:
                    completion_start_time = datetime.fromisoformat(completion_start_time)
                span._span.update(
                    usage=meta.get("usage") or None,
                    model=meta.get("model"),
                    completion_start_time=completion_start_time,
                )

LastRemote avatar Oct 11 '24 04:10 LastRemote

Ok thanks @LastRemote - I'll take a look at this again soon, got sidetracked with some other higher prio tasks, this week def.

vblagoje avatar Oct 14 '24 10:10 vblagoje

@LastRemote I took a closer look and I noticed that you implemented this feature in OpenAIGenerator. As we are going to deprecate all generators in the near future let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

vblagoje avatar Oct 15 '24 09:10 vblagoje

@LastRemote As we are going to deprecate all generators in the near future let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

Sure thing, we can focus on OpenAIChatGenerator for now. I personally find the generator implementations a bit cumbersome too. When playing around with haystack, I actually did write an extra ChatModelService abstraction that has an.infer() method to make corrsponding calls to LLMs and cast the response as List[ChatMessage]. This way I can have a single ChatGenerator component that uses a ChatModelService, and I am able to switch between different models without needing to change my pipeline architecture. Not sure if this aligns with what you have in mind, but I hope it helps.

LastRemote avatar Oct 16 '24 09:10 LastRemote

@LastRemote As we are going to deprecate all generators in the near future let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

Sure thing, we can focus on OpenAIChatGenerator for now. I personally find the generator implementations a bit cumbersome too. When playing around with haystack, I actually did write an extra ChatModelService abstraction that has an.infer() method to make corrsponding calls to LLMs and cast the response as List[ChatMessage]. This way I can have a single ChatGenerator component that uses a ChatModelService, and I am able to switch between different models without needing to change my pipeline architecture. Not sure if this aligns with what you have in mind, but I hope it helps.

Awesome 💪 Hmm, that's interesting, but how do you switch the models, one needs all these different packages installed say cohere and anthropic and particular peculiar ways they invoke chat completions?

vblagoje avatar Oct 16 '24 12:10 vblagoje

@LastRemote As we are going to deprecate all generators in the near future let's focus on supporting this feature in OpenAIChatGenerator only. 🙏

Sure thing, we can focus on OpenAIChatGenerator for now. I personally find the generator implementations a bit cumbersome too. When playing around with haystack, I actually did write an extra ChatModelService abstraction that has an.infer() method to make corrsponding calls to LLMs and cast the response as List[ChatMessage]. This way I can have a single ChatGenerator component that uses a ChatModelService, and I am able to switch between different models without needing to change my pipeline architecture. Not sure if this aligns with what you have in mind, but I hope it helps.

Awesome 💪 Hmm, that's interesting, but how do you switch the models, one needs all these different packages installed say cohere and anthropic and particular peculiar ways they invoke chat completions?

So I extend the abstract ChatModelService class to support different invocation methods. I have implemented OpenAIChatModelService, BedrockAnthropicChatModelService, GeminiChatModelService and AnthropicChatModelService (I never really used the latter two in practice but just for the sake of completeness). Each type of model service will have its own implementation of .infer() method, which may be using another sdk or simply httpx client. And I will include how to build a request and parse a response inside the infer() method as well, handling all the variations among different calling formats, so that I can call them in the same way inside the pipeline component.

This is the actual code I have for ChatGenerator:

@component
class ChatGenerator:
    """
    A class used to generate chat responses from a given chat model service.

    :param model_service: An instance of the ChatModelService to interact with the model.
    :param model: The name of the model to use for generating responses, if provided.
    :param stream: Whether to use streaming when calling the chat model service, defaults to False
    :param streaming_callbacks: A list of callback functions to handle streaming responses, defaults to None
    :param timeout: The timeout duration for the HTTP client, defaults to 20.0
    :param generation_kwargs: Additional keyword arguments to pass to the model for generation, defaults to None
    """

    def __init__(
        self,
        model_service: ChatModelService,
        model: Optional[str] = None,
        stream: bool = False,
        streaming_callbacks: Optional[List[Callable[[StreamingChunk], None]]] = None,
        timeout: Optional[float] = 20.0,
        generation_kwargs: Optional[Dict[str, Any]] = None,
    ):
        self.model_service = model_service
        self.model = model or ""
        self.stream = stream
        self.streaming_callbacks = streaming_callbacks
        self.generation_kwargs = generation_kwargs or {}
        self.timeout = timeout

    def to_dict(self) -> Dict[str, Any]:
        """
        Returns a serialized dictionary representation of the component.

        :return: A dictionary representation of the component.
        """
        # Create a local asset reference for the model service
        model_service_reference = self.model_service.create_local_reference()
        # Serialize the streaming callbacks
        callback_names = (
            [serialize_callable(callback) for callback in self.streaming_callbacks]
            if self.streaming_callbacks
            else None
        )
        return default_to_dict(
            self,
            model_service=model_service_reference.to_dict(),
            model=self.model,
            streaming_callbacks=callback_names,
            timeout=self.timeout,
            generation_kwargs=self.generation_kwargs,
        )

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> Self:
        """
        Loads a ChatGenerator component from its serialized dictionary representation.

        :param data: The serialized dictionary representation of the component.
        :return: The deserialized ChatGenerator component.
        """
        # Make a shallow copy of init_parameters to avoid modifying the original data
        init_parameters_data = data.get("init_parameters", {}).copy()
        # Load the chat model service from reference
        model_service_reference = AssetReference.from_dict(init_parameters_data["model_service"])
        model_service = ModelServiceFactory.load_from_reference(model_service_reference)
        init_parameters_data["model_service"] = model_service

        # Deserialize the streaming callbacks
        serialized_callbacks = init_parameters_data.get("streaming_callbacks")
        if serialized_callbacks and isinstance(serialized_callbacks, list):
            data["init_parameters"]["streaming_callbacks"] = [
                deserialize_callable(callback) for callback in serialized_callbacks
            ]

        # Load the component
        return default_from_dict(cls, {"type": data["type"], "init_parameters": init_parameters_data})

    @component.output_types(replies=List[ChatMessage])
    def run(
        self,
        messages: List[ChatMessage],
        generation_kwargs: Optional[Dict[str, Any]] = None,
        **kwargs,
    ):
        """
        Generates responses for the given list of messages.

        :param messages: The list of messages to generate responses for.
        :param generation_kwargs: Additional keyword arguments to pass to the model for generation, defaults to None
        :param kwargs: Additional keyword arguments to pass to the httpx client.
        :return: A dictionary containing the generated responses.
        """
        # update generation kwargs by merging with the generation kwargs passed to the run method
        generation_kwargs = {**self.generation_kwargs, **(generation_kwargs or {})}

        completions = self.model_service.infer(
            messages,
            model=self.model,
            timeout=self.timeout,
            stream=self.stream,
            streaming_callbacks=self.streaming_callbacks,
            inference_kwargs=generation_kwargs,
            **kwargs,
        )

        return {"replies": completions}

LastRemote avatar Oct 17 '24 04:10 LastRemote

Hey @vblagoje, can I ask for an update on the PR status? Trying to keep things moving while I am still happy to discuss more about the future deprecation of generators :)

LastRemote avatar Oct 22 '24 06:10 LastRemote

Hey @vblagoje, can I ask for an update on the PR status? Trying to keep things moving while I am still happy to discuss more about the future deprecation of generators :)

Hey @LastRemote in general your approach with generic ChatGenerator seems nice, there are many peculiarities with each chat generator in terms of parameters and how streaming is handled. We used this approach in Haystack 1.x for PromptNode and found that it turned into a maintenance nightmare. But YMMV and in your use case it might be a perfect solution.

Regarding the PR - I was awaiting chat generator code, maybe we didn't understand each other?

vblagoje avatar Oct 22 '24 07:10 vblagoje

Hey @vblagoje, can I ask for an update on the PR status? Trying to keep things moving while I am still happy to discuss more about the future deprecation of generators :)

Hey @LastRemote in general your approach with generic ChatGenerator seems nice, there are many peculiarities with each chat generator in terms of parameters and how streaming is handled. We used this approach in Haystack 1.x for PromptNode and found that it turned into a maintenance nightmare. But YMMV and in your use case it might be a perfect solution.

Regarding the PR - I was awaiting chat generator code, maybe we didn't understand each other?

Oh, there is some misunderstanding here. I am still not exactly sure what you have mind after deprecating the current generators, and I am just offering my two cents here hoping it could be useful. Nonetheless, regarding the scope of this PR, I'd prefer that we purpose a smaller change in OpenAIChatGenerator for now. The whole ChatGenerator and ModelService implementation is somewhat complicated and I'd suggest that we separate that topic to another thread if you are still interested in looking my code. That could be a big change that needs a lot more consideration.

LastRemote avatar Oct 23 '24 06:10 LastRemote

Nonetheless, regarding the scope of this PR, I'd prefer that we purpose a smaller change in OpenAIChatGenerator for now.

Yes, exactly - we do understand each other - let's implement TTFT in OpenAIChatGenerator 💪

The whole ChatGenerator and ModelService implementation is somewhat complicated and I'd suggest that we separate that topic to another thread if you are still interested in looking my code. That could be a big change that needs a lot more consideration.

Exactly, that's what I meant as well.

vblagoje avatar Oct 23 '24 07:10 vblagoje

Awesome, thanks! Any additional comments for the current code change? I described my intention of the current changes in a previous post. I will create a PR on the langfuse integration part as well in the integration repository.

@vblagoje Hello, I reviewed your approach and still have some concerns regarding the meta structure and potential integration with Langfuse. Specifically, when a chunk is built, the current behavior captures fields like index, model, tool_calls, and stop_reason. However, I believe we should avoid including this information in the meta["usage"] field when merging meta across chunks. Instead, I prefer preserving the usage meta to reflect only the information directly from the LLM response (or leave it empty if no usage data is captured). This would help prevent potential mismatches when passing usage data to third-party tracers or other downstream services.

Additionally, I converted the TTFT timestamp to ISO format. This change is intended to make it easier to log the entire ChatMessage object or include it as part of a service response. It also requires a slightly different update in the Langfuse integration (which I already implemented in my current project):

        ...
        if tags.get("haystack.component.type") in _SUPPORTED_CHAT_GENERATORS:
            replies = span._data.get("haystack.component.output", {}).get("replies")
            if replies:
                meta = replies[0].meta
                completion_start_time = meta.get("completion_start_time")
                if completion_start_time:
                    completion_start_time = datetime.fromisoformat(completion_start_time)
                span._span.update(
                    usage=meta.get("usage") or None,
                    model=meta.get("model"),
                    completion_start_time=completion_start_time,
                )

LastRemote avatar Oct 23 '24 07:10 LastRemote

@LastRemote thanks for your effort and patience! To clarify, let's focus on adding TTFT to OpenAIChatGenerator, not OpenAIGenerator, in this PR to keep it simple and aligned with our current architecture and objectives. Let me know if you have any questions!

vblagoje avatar Oct 23 '24 15:10 vblagoje

@LastRemote thanks for your effort and patience! To clarify, let's focus on adding TTFT to OpenAIChatGenerator, not OpenAIGenerator, in this PR to keep it simple and aligned with our current architecture and objectives. Let me know if you have any questions!

Oh. I did mistakenly update the wrong generator then. Thanks for pointing out, and I will make another update today.

LastRemote avatar Oct 24 '24 03:10 LastRemote

Updated commit and cleaned up git commit history

LastRemote avatar Oct 24 '24 09:10 LastRemote

Hey @LastRemote have you checked if you get TTFT field in the Langfuse traces?

vblagoje avatar Oct 28 '24 13:10 vblagoje

@vblagoje Yes, it is working and I am using it in mmy applications right now. This does require some code changes in the Langfuse tracer part to track the TTFT information and include it in the tracing span. I will create a PR for that today as well.

LastRemote avatar Oct 30 '24 02:10 LastRemote

@vblagoje Here we go: https://github.com/deepset-ai/haystack-core-integrations/pull/1161

Besides the latest merge caused a lint issue, but I am not sure if it is relevant to this PR since it doesn't change any parameters in OpenAI generator.

haystack/components/generators/chat/openai.py:67:4: R0917: Too many positional arguments (9/5) (too-many-positional-arguments)

LastRemote avatar Oct 30 '24 11:10 LastRemote

Thank you @LastRemote - I'll take a look soon and we can move this one forward!

vblagoje avatar Oct 30 '24 12:10 vblagoje

Great contrib @LastRemote - keep them coming 🙏

vblagoje avatar Oct 31 '24 15:10 vblagoje

@LastRemote @vblagoje So currently the OpenAIChatGenerator with stream=True is giving in usage completion_start_time anytime to have directly the number of seconds to have the TTFT?

Another question, is it supposed to work as well with OpenAIChatGenerator without streaming and it is not implemented yet or it will only work with streaming? Thanks

rhajou avatar Jan 28 '25 07:01 rhajou

Yes, you need to use main I believe @rhajou - TTFT across many chat generators should be in the next release (less than a month away). TTFT works with streaming only but now langfuse-haystack (latest release) will show generator latency regardless of streaming being used or not.

vblagoje avatar Jan 28 '25 08:01 vblagoje

Yes, you need to use main I believe @rhajou - TTFT across many chat generators should be in the next release (less than a month away). TTFT works with streaming only but now langfuse-haystack (latest release) will show generator latency regardless of streaming being used or not.

How can we retrieve the TTFT (in seconds) rather than the timestamp? because without the timestamp of when the query is sent, we cannot estimate TTFT

rhajou avatar Jan 28 '25 08:01 rhajou