Feat:add support to /responses #5051
base: master
Conversation
Hey - I've found 4 issues, and left some high-level feedback:
- The new `llm_compress_use_compact_api` setting is plumbed into `MainAgentBuildConfig` and the internal pipeline stage, but the compressor logic always prefers native compact whenever `supports_native_compact()` is present; consider threading this flag through to the `ContextCompressor` and/or provider so users can explicitly disable native compact when desired.
- The new `active_provider` parameter on `_get_compress_provider` is never used; either remove it or use it (for example as a fallback when `llm_compress_provider_id` is not set) to avoid confusion about its purpose.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new `llm_compress_use_compact_api` setting is plumbed into `MainAgentBuildConfig` and the internal pipeline stage, but the compressor logic always prefers native compact whenever `supports_native_compact()` is present; consider threading this flag through to the `ContextCompressor` and/or provider so users can explicitly disable native compact when desired.
- The new `active_provider` parameter on `_get_compress_provider` is never used; either remove it or use it (for example as a fallback when `llm_compress_provider_id` is not set) to avoid confusion about its purpose.
## Individual Comments
### Comment 1
<location> `astrbot/core/agent/context/compressor.py:249-257` </location>
<code_context>
        return messages
-       # build payload
+       native_compact_supported = self._supports_native_compact()
+
+       if native_compact_supported:
+           compacted = await self._try_native_compact(
+               system_messages,
+               messages_to_summarize,
+               recent_messages,
+           )
+           if compacted is not None:
+               return compacted
        instruction_message = Message(role="user", content=self.instruction_text)
</code_context>
<issue_to_address>
**issue (bug_risk):** The new `llm_compress_use_compact_api` flag does not appear to be honored in the compressor logic.
The flag is threaded through config and initialization, but `__call__` always prefers native compact whenever the provider supports it. If users should be able to disable native compact, `__call__` (or `_supports_native_compact`) should consult `llm_compress_use_compact_api` before calling `_try_native_compact`; otherwise the flag is effectively a no-op.
</issue_to_address>
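For illustration, a minimal sketch of how `__call__` could consult the flag before attempting native compact; the `use_compact_api` attribute on the compressor is hypothetical and assumed to be populated from `llm_compress_use_compact_api` during initialization:
```python
# Hypothetical sketch: gate the native compact path behind the user-facing
# flag so it can be disabled explicitly. `self.use_compact_api` is assumed
# to be set from `llm_compress_use_compact_api` when the compressor is built.
if self.use_compact_api and self._supports_native_compact():
    compacted = await self._try_native_compact(
        system_messages,
        messages_to_summarize,
        recent_messages,
    )
    if compacted is not None:
        return compacted
# With the flag off (or if native compact fails), execution falls through to
# the instruction-based summarization path below.
instruction_message = Message(role="user", content=self.instruction_text)
```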
### Comment 2
<location> `astrbot/core/astr_main_agent.py:816-819` </location>
<code_context>
def _get_compress_provider(
-    config: MainAgentBuildConfig, plugin_context: Context
+    config: MainAgentBuildConfig,
+    plugin_context: Context,
+    active_provider: Provider | None,
) -> Provider | None:
-    if not config.llm_compress_provider_id:
</code_context>
<issue_to_address>
**nitpick:** `active_provider` parameter is unused in `_get_compress_provider`.
`_get_compress_provider` accepts `active_provider` but never uses it. Either add the intended fallback/use logic (e.g., when `llm_compress_provider_id` is empty) or remove the parameter to keep the API clear and avoid confusion about its purpose.
</issue_to_address>
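As a sketch, the fallback variant might look like the following; `get_provider_by_id` on the plugin `Context` is an assumption about the surrounding API:
```python
# Hypothetical sketch: fall back to the active provider when no dedicated
# compress provider is configured, so `active_provider` is actually used.
def _get_compress_provider(
    config: MainAgentBuildConfig,
    plugin_context: Context,
    active_provider: Provider | None,
) -> Provider | None:
    if not config.llm_compress_provider_id:
        # No explicit compress provider configured: reuse the active one.
        return active_provider
    return plugin_context.get_provider_by_id(config.llm_compress_provider_id)
```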
### Comment 3
<location> `tests/test_openai_responses_source.py:28-37` </location>
<code_context>
+    return ProviderOpenAIResponses.__new__(ProviderOpenAIResponses)
+
+
+def test_is_retryable_upstream_error_with_5xx_status():
+    err = _DummyError("server error", status_code=502)
+
+    assert _provider()._is_retryable_upstream_error(err)
+
+
+def test_is_retryable_upstream_error_with_upstream_error_type():
+    err = _DummyError(
+        "bad gateway",
+        status_code=400,
+        body={"error": {"type": "upstream_error"}},
+    )
+
+    assert _provider()._is_retryable_upstream_error(err)
+
+
+def test_is_retryable_upstream_error_returns_false_for_non_retryable_error():
+    err = _DummyError(
+        "invalid request",
</code_context>
<issue_to_address>
**suggestion (testing):** Consider adding round-trip tests for input/output conversion helpers used by compact and responses parsing.
Specifically, it would be useful to cover:
- `_response_output_to_input_items` and `_output_content_to_input_content` when compact responses return `output` instead of `input`.
- `_response_input_to_messages` and `_messages_to_response_input`, ensuring messages (including function calls and tool outputs) serialize to Responses `input` and deserialize back to `Message` objects correctly.
For example, add tests that:
- Build a representative `output` payload (message + function_call + function_call_output), run it through `_response_output_to_input_items` then `_response_input_to_messages`, and assert the resulting `Message` objects have the expected `role`, `content`, `tool_calls`, and `tool_call_id`.
- Build a list of `Message` dicts (including assistant tool_calls and tool messages), run `_messages_to_response_input`, and assert the resulting structure matches the Responses API schema.
This would give stronger guarantees that both compact and regular Responses flows are consistent across these transformations.
Suggested implementation:
```python
def test_is_retryable_upstream_error_returns_false_for_non_retryable_error():
    err = _DummyError(
        "invalid request",
        status_code=400,
        body={"error": {"type": "invalid_request_error"}},
    )

    assert not _provider()._is_retryable_upstream_error(err)


def test_response_output_to_input_items_and_back_to_messages_round_trip():
    provider = _provider()
    # Representative compact `output` payload: assistant message with a tool call
    # followed by the tool's output.
    response_output = [
        {
            "type": "message",
            "message": {
                "role": "assistant",
                "content": [
                    {
                        "type": "output_text",
                        "text": "Calling tool now...",
                    },
                ],
                "tool_calls": [
                    {
                        "id": "call_1",
                        "type": "function",
                        "function": {
                            "name": "get_weather",
                            "arguments": '{"location": "Boston"}',
                        },
                    }
                ],
            },
        },
        {
            "type": "tool_output",
            "tool_output": {
                "tool_call_id": "call_1",
                "output": '{"temperature": 72}',
            },
        },
    ]

    # When compact Responses return `output`, we should be able to:
    # output -> input items -> Message objects
    input_items = provider._response_output_to_input_items(response_output)
    messages = provider._response_input_to_messages(
        {
            "input": input_items,
        }
    )

    # Assert we got the expected assistant message and tool message back.
    assert len(messages) == 2
    assistant_msg = messages[0]
    tool_msg = messages[1]

    # Assistant message expectations
    # Depending on the implementation, messages may be dict-like or objects.
    # We assert using attribute access if available, otherwise dict access.
    assistant_role = getattr(assistant_msg, "role", assistant_msg["role"])
    assistant_content = getattr(assistant_msg, "content", assistant_msg["content"])
    assistant_tool_calls = getattr(
        assistant_msg, "tool_calls", assistant_msg.get("tool_calls")
    )

    assert assistant_role == "assistant"
    assert isinstance(assistant_content, list)
    assert any(
        (c.get("type") == "output_text" and "Calling tool now..." in c.get("text", ""))
        for c in assistant_content
    )
    assert assistant_tool_calls is not None
    assert len(assistant_tool_calls) == 1
    assistant_tool_call = assistant_tool_calls[0]
    assert assistant_tool_call["id"] == "call_1"
    assert assistant_tool_call["function"]["name"] == "get_weather"

    # Tool message expectations
    tool_role = getattr(tool_msg, "role", tool_msg["role"])
    tool_call_id = getattr(tool_msg, "tool_call_id", tool_msg.get("tool_call_id"))
    tool_content = getattr(tool_msg, "content", tool_msg["content"])

    assert tool_role == "tool"
    assert tool_call_id == "call_1"
    assert isinstance(tool_content, list)
    # We only assert that some text mentioning the temperature is present
    assert any("72" in c.get("text", "") for c in tool_content if c.get("type") == "output_text")


def test_output_content_to_input_content_round_trip():
    provider = _provider()
    # A compact `output` style content list: assistant text + tool call.
    output_content = [
        {
            "type": "output_text",
            "text": "Calling tool now...",
        },
        {
            "type": "output_tool_call",
            "id": "call_1",
            "tool_name": "get_weather",
            "arguments": '{"location": "Boston"}',
        },
    ]

    # Transform output-style content into input-style content and back to messages.
    input_content = provider._output_content_to_input_content(output_content)

    # Wrap the content in a minimal input item to feed through the generic helpers.
    input_items = [
        {
            "role": "assistant",
            "content": input_content,
        }
    ]
    messages = provider._response_input_to_messages({"input": input_items})

    assert len(messages) == 1
    msg = messages[0]
    role = getattr(msg, "role", msg["role"])
    content = getattr(msg, "content", msg["content"])
    tool_calls = getattr(msg, "tool_calls", msg.get("tool_calls"))

    assert role == "assistant"
    assert isinstance(content, list)
    # We should have preserved the output text in some form.
    assert any("Calling tool now..." in c.get("text", "") for c in content)
    # And we should still have a tool call associated with the assistant.
    assert tool_calls is not None
    assert len(tool_calls) == 1
    assert tool_calls[0]["id"] == "call_1"


def test_messages_to_response_input_schema_for_tool_calls_and_outputs():
    provider = _provider()
    # Representative message sequence:
    # - user asks a question
    # - assistant issues a tool call
    # - tool returns an answer
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "input_text",
                    "text": "What's the weather in Boston?",
                }
            ],
        },
        {
            "role": "assistant",
            "content": [
                {
                    "type": "output_text",
                    "text": "Let me check the weather for you.",
                }
            ],
            "tool_calls": [
                {
                    "id": "call_1",
                    "type": "function",
                    "function": {
                        "name": "get_weather",
                        "arguments": '{"location": "Boston"}',
                    },
                }
            ],
        },
        {
            "role": "tool",
            "tool_call_id": "call_1",
            "content": [
                {
                    "type": "output_text",
                    "text": '{"temperature": 72}',
                }
            ],
        },
    ]

    response_input = provider._messages_to_response_input(messages)

    # The helper should return something compatible with the Responses `input` schema.
    assert isinstance(response_input, dict)
    assert "input" in response_input
    input_items = response_input["input"]
    assert isinstance(input_items, list)
    assert len(input_items) == len(messages)

    # Check that user / assistant / tool roles are preserved in the serialized input.
    roles = [item.get("role") for item in input_items]
    assert roles == ["user", "assistant", "tool"]

    # Assistant tool call should be present in the serialized input.
    assistant_item = input_items[1]
    assert "tool_calls" in assistant_item
    assert len(assistant_item["tool_calls"]) == 1
    assert assistant_item["tool_calls"][0]["id"] == "call_1"

    # Tool message should carry the tool_call_id through.
    tool_item = input_items[2]
    assert tool_item.get("tool_call_id") == "call_1"

    # Now run the serialized input back through `_response_input_to_messages`
    # and assert we get equivalent messages out.
    round_trip_messages = provider._response_input_to_messages(response_input)
    assert len(round_trip_messages) == len(messages)
    for original, rt in zip(messages, round_trip_messages):
        rt_role = getattr(rt, "role", rt["role"])
        assert rt_role == original["role"]

    # Assistant round-trip should retain the tool call id and function name.
    rt_assistant = round_trip_messages[1]
    rt_tool_calls = getattr(rt_assistant, "tool_calls", rt_assistant.get("tool_calls"))
    assert rt_tool_calls[0]["id"] == "call_1"
    assert rt_tool_calls[0]["function"]["name"] == "get_weather"

    # Tool round-trip should retain the tool_call_id.
    rt_tool = round_trip_messages[2]
    rt_tool_call_id = getattr(rt_tool, "tool_call_id", rt_tool.get("tool_call_id"))
    assert rt_tool_call_id == "call_1"


def test_build_responses_input_and_instructions_moves_system_messages():
    provider = _provider()
    provider.custom_headers = {}
```
The new tests assume the following about the existing implementation:
1. `_response_output_to_input_items` accepts a list of `output` items (each with a `"type"` field) and returns a list suitable for being used as the `"input"` value in the Responses payload.
2. `_response_input_to_messages` accepts a dict with an `"input"` key (matching the Responses schema) and returns an ordered sequence of message-like objects. Those objects either:
- are dicts with keys like `"role"`, `"content"`, `"tool_calls"`, and `"tool_call_id"`, or
- are objects exposing `.role`, `.content`, `.tool_calls`, and `.tool_call_id` attributes. The tests handle both by using `getattr(..., default)` first and falling back to dict access.
3. `_output_content_to_input_content` accepts a compact `output`-style content list and returns an `input`-style content list suitable for use inside an `"input"` item.
4. `_messages_to_response_input` accepts a list of message dicts matching what the provider already consumes elsewhere in tests and returns a dict containing an `"input"` list compatible with the Responses API.
If any of these signatures differ in your codebase, you should adjust:
- The shape of `response_output`, `output_content`, and `messages` in the tests to match your actual internal message format.
- The arguments passed into `_response_output_to_input_items`, `_output_content_to_input_content`, `_response_input_to_messages`, and `_messages_to_response_input` (for example, if `_response_input_to_messages` expects just the list of input items rather than a dict with `"input"`).
You may also want to:
- Import any concrete `Message` class and make the tests assert on its attributes directly instead of using the dict/attribute dual-access pattern, if your implementation guarantees a specific type.
</issue_to_address>
### Comment 4
<location> `astrbot/core/provider/sources/openai_responses_source.py:585` </location>
<code_context>
+        llm_response.usage = self._extract_response_usage(response.usage)
+        return llm_response
+
+    async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
+        request_payload = dict(payloads)
+        response_input, instructions = self._build_responses_input_and_instructions(
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting the shared request-building logic from `_query` and `_query_stream` into a dedicated helper to centralize behavior and avoid duplicated code paths.
You can significantly reduce duplication and make `_query` / `_query_stream` easier to maintain by factoring out the shared request construction into a single helper.
Right now these blocks are almost identical:
```python
request_payload = dict(payloads)
response_input, instructions = self._build_responses_input_and_instructions(
    request_payload.pop("messages", [])
)
request_payload["input"] = response_input
if instructions and not request_payload.get("instructions"):
    request_payload["instructions"] = instructions
if tools:
    model = request_payload.get("model", "").lower()
    omit_empty_param_field = "gemini" in model
    tool_list = tools.get_func_desc_openai_style(
        omit_empty_parameter_field=omit_empty_param_field,
    )
    response_tools = self._convert_tools_to_responses(tool_list)
    if response_tools:
        request_payload["tools"] = response_tools
extra_body: dict[str, Any] = {}
for key in list(request_payload.keys()):
    if key not in self.default_params:
        extra_body[key] = request_payload.pop(key)
custom_extra_body = self.provider_config.get("custom_extra_body", {})
if isinstance(custom_extra_body, dict):
    extra_body.update(custom_extra_body)
extra_headers = self._build_extra_headers()
```
You can extract this into a focused helper that keeps all behavior intact:
```python
def _build_responses_request(
    self,
    payloads: dict,
    tools: ToolSet | None,
) -> tuple[dict[str, Any], dict[str, Any], dict[str, str]]:
    request_payload = dict(payloads)
    response_input, instructions = self._build_responses_input_and_instructions(
        request_payload.pop("messages", [])
    )
    request_payload["input"] = response_input
    if instructions and not request_payload.get("instructions"):
        request_payload["instructions"] = instructions
    if tools:
        model = request_payload.get("model", "").lower()
        omit_empty_param_field = "gemini" in model
        tool_list = tools.get_func_desc_openai_style(
            omit_empty_parameter_field=omit_empty_param_field,
        )
        response_tools = self._convert_tools_to_responses(tool_list)
        if response_tools:
            request_payload["tools"] = response_tools
    extra_body: dict[str, Any] = {}
    for key in list(request_payload.keys()):
        if key not in self.default_params:
            extra_body[key] = request_payload.pop(key)
    custom_extra_body = self.provider_config.get("custom_extra_body", {})
    if isinstance(custom_extra_body, dict):
        extra_body.update(custom_extra_body)
    extra_headers = self._build_extra_headers()
    return request_payload, extra_body, extra_headers
```
Then `_query` and `_query_stream` become much shorter and share a single source of truth:
```python
async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
    request_payload, extra_body, extra_headers = self._build_responses_request(
        payloads, tools
    )
    completion = await self.client.responses.create(
        **request_payload,
        stream=False,
        extra_body=extra_body,
        extra_headers=extra_headers,
    )
    ...
```
```python
async def _query_stream(
    self,
    payloads: dict,
    tools: ToolSet | None,
) -> AsyncGenerator[LLMResponse, None]:
    request_payload, extra_body, extra_headers = self._build_responses_request(
        payloads, tools
    )
    response_id: str | None = None
    try:
        async with self.client.responses.stream(
            **request_payload,
            extra_body=extra_body,
            extra_headers=extra_headers,
        ) as stream:
            ...
```
This keeps all behavior (including `gemini` handling, tool conversion, `custom_extra_body`, and header building) but removes a large duplicated block of non-trivial logic, reducing the chance of future divergence between streaming and non-streaming paths.
</issue_to_address>
Modifications

Integrate OpenAI's `/responses` API to provide more unified handling of conversations, reasoning, and tool calls, and leverage its native compact endpoint for more efficient context compression, reducing token overhead in long conversations.
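For context, a minimal sketch of the request-shape difference the Responses API introduces, using the official `openai` Python SDK (the model name and prompt are placeholders, and an `OPENAI_API_KEY` is assumed to be set in the environment):
```python
import asyncio

from openai import AsyncOpenAI


async def main() -> None:
    client = AsyncOpenAI()
    # Chat Completions-style "messages" become a flat "input" list, with
    # system-level guidance moved to a top-level "instructions" string.
    response = await client.responses.create(
        model="gpt-4o-mini",
        instructions="You are a concise assistant.",
        input=[{"role": "user", "content": "Say hello."}],
    )
    print(response.output_text)


asyncio.run(main())
```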
Screenshots or Test Results

Checklist

- I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in `requirements.txt` and `pyproject.toml`.

Summary by Sourcery
Integrate the OpenAI Responses API as a first-class provider and enhance context compression and tool-calling flows to leverage native compact capabilities while improving robustness of streaming and title generation.

New Features:
- Add support for OpenAI's `/responses` API.

Enhancements:
- Ensure `tool_call` and `tool_call_result` events are correctly propagated and rendered in ongoing conversations.
- Extend the `LLMResponse` entity to recognize OpenAI Responses as a supported completion source.

Tests:
- Add unit tests for the new OpenAI Responses provider in `tests/test_openai_responses_source.py`, including coverage of retryable upstream error detection.