open-assistant-api icon indicating copy to clipboard operation
open-assistant-api copied to clipboard

调用自定义函数,流式返回有问题

Open Panweitong opened this issue 1 year ago • 1 comments

openai SDK版本:1.27.0

使用openai 官方的key,代码可以正常执行,但是如果使用open assistant api的key和base_url,代码执行错,报错截图如下: image

相关代码如下: `import time import json

assistant_api_key = "" assistant_base_url=""

assistantApiClient=OpenaI( api_key=assistant_api_key, # 如果您没有配置环境变量,请在此处用您的API Key进行替换 base_url=assistant_base_url, )

def get_current_weather(location): return f"{location}今天是雨天。 "

if name == "main":

assistant = assistantApiClient.beta.assistants.create(
    name="Assistant Demo",
    instructions="You are a helpful assistant. When asked a question, use tools wherever possible.",
    model="gpt-4o",
    tools=[
        {
            "type": "function",
            "function": {
                "name": "get_current_weather",
                "description": "当你想查询指定城市的天气时非常有用。",
                "parameters": {  # 查询天气时需要提供位置,因此参数设置为location
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "城市或县区,比如北京市、杭州市、余杭区等。"
                        }
                    },
                    "required": ["location"]
                },
            }
        }
    ]
)
print("=====> : %s\n", assistant)

thread = assistantApiClient.beta.threads.create()
print("=====> : %s\n", thread)

message = assistantApiClient.beta.threads.messages.create(
    thread_id=thread.id,
    role="user",
    content="北京天气如何?",
)
print("=====> : %s\n", message)

funcs = [get_current_weather]

class EventHandler(AssistantEventHandler):

    @override
    def on_event(self, event):
        if event.event == 'thread.run.requires_action':
            print(event)
            run_id = event.data.id  # Retrieve the run ID from the event data
            self.handle_requires_action(event.data, run_id)


    def handle_requires_action(self, data, run_id):
        tool_outputs = []
            
        for tool in data.required_action.submit_tool_outputs.tool_calls:
            func = next(
                iter([func for func in funcs if func.__name__ == tool.function.name]))
            try:
                output = func(**eval(tool.function.arguments))
            except Exception as e:
                output = "Error: " + str(e)

            tool_outputs.append(
                {
                    "tool_call_id": tool.id,
                    "output": json.dumps(output)
                }
            )
        
        print(tool_outputs)
            
        # Submit all tool_outputs at the same time
        self.submit_tool_outputs(tool_outputs, run_id)

    def submit_tool_outputs(self, tool_outputs, run_id):
        # Use the submit_tool_outputs_stream helper
        with assistantApiClient.beta.threads.runs.submit_tool_outputs_stream(
            thread_id=self.current_run.thread_id,
            run_id=self.current_run.id,
            tool_outputs=tool_outputs,
            event_handler=EventHandler(),
        ) as stream:
            # for text in stream.text_deltas:
            #     print(text, end="", flush=True)
            #     print()
            stream.until_done()
    
    @override
    def on_text_delta(self, delta, snapshot) -> None:
        print("=====> text delta")
        print("delta   : %s", delta)

with assistantApiClient.beta.threads.runs.stream(
    thread_id=thread.id,
    assistant_id=assistant.id,
    event_handler=EventHandler(),
) as stream:
    stream.until_done()`

Panweitong avatar Aug 29 '24 06:08 Panweitong

image submit_tool_outputs_stream 用这个的时候,会再次获取相同的一个渠道事件,所以会进入两次 if event.event == "thread.run.requires_action": ,第二次导致了这个报错,如果不想报错,可以使用 submit_tool_outputs @Panweitong 之后会对这个地方进行修复

YangZhiBoGreenHand avatar Sep 05 '24 07:09 YangZhiBoGreenHand