Async Chat Streaming fails

Open thomashacker opened this issue 2 years ago • 1 comments

Hey Cohere Team 👋, I'm currently trying to implement async token streaming with Cohere (cohere==4.33). However, when running this code, I get an error:

completion = co.chat(
    chat_history=_conversation,
    stream=True,
    message=message,
    model="command",
    temperature=0.1,
)

>>> async for chunk in completion: # Breaks here
    if isinstance(chunk, StreamTextGeneration):
        yield {
            "message": chunk.text,
            "finish_reason": "",
        }
    elif isinstance(chunk, StreamEnd):
        yield {
            "message": "",
            "finish_reason": "stop",
        }

I get this error message:

TypeError: 'async for' requires an object with __aiter__ method, got bytes
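
The same error can be reproduced without Cohere. The sketch below is a minimal standalone example; `consume` and `reproduce` are hypothetical helper names, not part of the SDK. It shows that `async for` over a plain `bytes` value raises this TypeError, which matches `self.response.content` being bytes on a `requests` response.

```python
import asyncio


async def consume(source):
    """Iterate `source` with `async for` and collect the items (hypothetical helper)."""
    items = []
    async for item in source:
        items.append(item)
    return items


def reproduce() -> str:
    """Return the TypeError message raised when the source is plain bytes."""
    try:
        # bytes supports the synchronous iterator protocol only, so it has no __aiter__
        asyncio.run(consume(b"line one\nline two\n"))
    except TypeError as exc:
        return str(exc)
    return ""


if __name__ == "__main__":
    print(reproduce())
```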

Which comes from the StreamingChat object (code):

async def __aiter__(self) -> Generator[StreamResponse, None, None]:
    index = 0
    >>> async for line in self.response.content: # self.response.content are bytes
        item = self._make_response_item(index, line)
        index += 1
        if item is not None:
            yield item

Tracing where the response comes from leads to the _request method (code), which runs:

with requests.Session() as session:
    retries = Retry(
        total=self.max_retries,
        backoff_factor=0.5,
        allowed_methods=["POST", "GET"],
        status_forcelist=cohere.RETRY_STATUS_CODES,
        raise_on_status=False,
    )
    session.mount("https://", HTTPAdapter(max_retries=retries))
    session.mount("http://", HTTPAdapter(max_retries=retries))

    if stream:
        return session.request(method, url, headers=headers, json=json, **self.request_dict, stream=True)

I'm not sure whether my implementation is at fault, so please let me know if my code is incorrect or if you can reproduce the error. Thanks a lot 🚀 I'd also appreciate any pointers to examples or documentation.

Edit: It works when I remove the async for loop in my code, but then the method becomes synchronous :(
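
One possible workaround, while the underlying stream stays synchronous, is to pull each item in a worker thread so the caller can still use `async for`. This is only a sketch under assumptions: `iterate_in_thread` is a hypothetical helper, `fake_stream` is a stand-in for the synchronous streaming response rather than the Cohere API, and `asyncio.to_thread` requires Python 3.9 or later. If the installed SDK provides an async client, that would likely be the more direct route.

```python
import asyncio
from typing import AsyncIterator, Iterable, Iterator, List, TypeVar

T = TypeVar("T")
_DONE = object()  # sentinel marking the end of the synchronous iterator


async def iterate_in_thread(source: Iterable[T]) -> AsyncIterator[T]:
    """Yield items from a blocking iterable without blocking the event loop.

    Hypothetical helper: each blocking next() call runs in a worker thread.
    """
    iterator = iter(source)
    while True:
        item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            break
        yield item


def fake_stream() -> Iterator[str]:
    """Stand-in for a synchronous streaming response (not the Cohere API)."""
    yield from ["Hello", ", ", "world"]


async def collect() -> List[str]:
    """Consume the wrapped stream with `async for` and return all chunks."""
    return [chunk async for chunk in iterate_in_thread(fake_stream())]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

The thread hop adds overhead per chunk, so this trades some throughput for keeping the event loop responsive.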

thomashacker · Nov 09 '23 21:11

Could you paste the completion object, _conversation, and one iteration of what chunk is?

It seems to be a data type problem: 'async for' requires an object with __aiter__ method, got bytes

murdadesmaeeli · Nov 20 '23 19:11