Async Chat Streaming fails
Hey Cohere Team 👋, I'm trying to implement async token streaming with Cohere (cohere==4.33). However, when I run this code, I get an error:
    completion = co.chat(
        chat_history=_conversation,
        stream=True,
        message=message,
        model="command",
        temperature=0.1,
    )
>>> async for chunk in completion:  # Breaks here
        if isinstance(chunk, StreamTextGeneration):
            yield {
                "message": chunk.text,
                "finish_reason": "",
            }
        elif isinstance(chunk, StreamEnd):
            yield {
                "message": "",
                "finish_reason": "stop",
            }
I get this error message:
TypeError: 'async for' requires an object with __aiter__ method, got bytes
It comes from the __aiter__ method of the StreamingChat object (code):
    async def __aiter__(self) -> Generator[StreamResponse, None, None]:
        index = 0
>>>     async for line in self.response.content:  # self.response.content are bytes
            item = self._make_response_item(index, line)
            index += 1
            if item is not None:
                yield item
Tracing where the response comes from leads me to the _request method (code), which runs:
with requests.Session() as session:
    retries = Retry(
        total=self.max_retries,
        backoff_factor=0.5,
        allowed_methods=["POST", "GET"],
        status_forcelist=cohere.RETRY_STATUS_CODES,
        raise_on_status=False,
    )
    session.mount("https://", HTTPAdapter(max_retries=retries))
    session.mount("http://", HTTPAdapter(max_retries=retries))
    if stream:
        return session.request(method, url, headers=headers, json=json, **self.request_dict, stream=True)
I'm not sure whether my implementation is at fault, so please let me know if my code is incorrect or if you can reproduce the error! Thanks a lot 🚀 I'd also be happy about any pointers to examples or documentation.
Edit: It works when I replace the async for loop in my code with a plain for loop, but then the method becomes synchronous :(
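One possible stopgap, sketched here under assumptions: since plain iteration over the stream works, each blocking next() call can be pushed into a worker thread so the event loop stays free. This is generic asyncio, not a Cohere API. The names iterate_in_thread, fake_stream and collect are hypothetical, and fake_stream only stands in for the synchronous streaming response. asyncio.to_thread needs Python 3.9 or newer.

```python
import asyncio
from typing import AsyncIterator, Iterable, Iterator, List, TypeVar

T = TypeVar("T")

_DONE = object()  # sentinel marking the end of the blocking iterator


async def iterate_in_thread(blocking_iterable: Iterable[T]) -> AsyncIterator[T]:
    """Yield items from a blocking iterable without blocking the event loop.

    Hypothetical helper: every next() call runs in a worker thread.
    """
    iterator: Iterator[T] = iter(blocking_iterable)
    while True:
        # next(iterator, _DONE) returns the sentinel instead of raising StopIteration
        item = await asyncio.to_thread(next, iterator, _DONE)
        if item is _DONE:
            break
        yield item


def fake_stream() -> Iterator[str]:
    # Hypothetical stand-in for the synchronous streaming response
    yield from ["Hel", "lo", "!"]


async def collect() -> List[str]:
    # Consume the wrapped stream with async for, as the endpoint would
    return [chunk async for chunk in iterate_in_thread(fake_stream())]


if __name__ == "__main__":
    print(asyncio.run(collect()))
```

In the snippet from the question this would read `async for chunk in iterate_in_thread(completion):`, assuming completion keeps supporting plain for iteration. It costs one thread hop per chunk, so it is a workaround and not a replacement for a natively async client.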
Could you paste the completion object, _conversation, and one iteration of what chunk is?
It looks like a data type problem: 'async for' requires an object with __aiter__ method, got bytes. In other words, the loop is being handed raw bytes instead of an async iterable.
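To illustrate the type problem in isolation, here is a minimal sketch that does not touch the Cohere SDK at all. It only shows that bytes supports plain iteration but has no __aiter__, so any async for over it raises a TypeError. The helper names iterate and error_for are made up for this example.

```python
import asyncio


async def iterate(obj) -> None:
    # async for calls obj.__aiter__(); bytes only implements __iter__
    async for _ in obj:
        pass


def error_for(obj) -> str:
    """Return the TypeError message raised by async-iterating obj, or ''."""
    try:
        asyncio.run(iterate(obj))
    except TypeError as exc:
        return str(exc)
    return ""


print(hasattr(b"data", "__iter__"))   # True: a plain for loop works
print(hasattr(b"data", "__aiter__"))  # False: async for cannot work
print(error_for(b"data"))
```

If I read the traceback right, self.response here is a requests response, whose content attribute is bytes. The __aiter__ path presumably expects an async HTTP response instead, such as the one produced by the SDK's async client (cohere.AsyncClient), so that may be worth trying, but I have not verified it against 4.33.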