
Conversation

@xunxiing (Contributor) commented on Feb 11, 2026

Modifications

Integrate OpenAI's /responses API to provide more unified handling of chat, reasoning, and tool calling. It also uses the API's native compact endpoint for more efficient context compression, reducing token overhead in long conversations.

  • This is NOT a breaking change.
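
For reference, a minimal sketch of a non-streaming call against the /responses endpoint using the official openai SDK; the model name and prompt here are placeholders, not values taken from this PR:

```python
import asyncio

from openai import AsyncOpenAI

client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment


async def main() -> None:
    # One non-streaming request to the /responses endpoint; the adapter in
    # this PR wraps calls like this (and client.responses.stream for SSE).
    response = await client.responses.create(
        model="gpt-4.1-mini",  # placeholder model name
        input="Say hello in one short sentence.",
    )
    print(response.output_text)  # convenience accessor for the text output


asyncio.run(main())
```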

Screenshots or Test Results

[screenshot]

Checklist

  • 😊 If there are new features added in the PR, I have discussed them with the authors through issues/emails, etc.
  • 👀 My changes have been well-tested, and "Verification Steps" and "Screenshots" have been provided above.
  • 🤓 I have ensured that no new dependencies are introduced, OR if new dependencies are introduced, they have been added to the appropriate locations in requirements.txt and pyproject.toml.
  • 😮 My changes do not introduce malicious code.

Summary by Sourcery

Integrate OpenAI Responses API as a first-class provider and enhance context compression and tool-calling flows to leverage native compact capabilities while improving robustness of streaming and title generation.

New Features:

  • Add an OpenAI Responses provider adapter that supports the /responses API for chatting, streaming, tools, and native compact context handling.
  • Introduce a configuration flag and defaults to prefer provider native compact APIs for LLM-based context compression when available.
  • Register an OpenAI Responses chat provider template in the default configuration for easy setup.

Enhancements:

  • Update the context compressor to detect and prefer native compact APIs, with graceful fallback to LLM summary-based compression if compacting fails.
  • Improve tool-call result streaming and dashboard message handling so tool_call and tool_call_result events are correctly propagated and rendered in ongoing conversations.
  • Harden chat title generation and webchat streaming by adding error handling and ensuring non-stream tool events are marked as streaming when needed.
  • Extend provider manager and LLMResponse entity to recognize OpenAI Responses as a supported completion source.

Tests:

  • Add unit tests for the context manager to verify native compact is used when available and that summary compression is used as a fallback on failure.
  • Add unit tests for the OpenAI Responses provider adapter covering compact behavior, retryable upstream error detection, header handling, and tool strictness conversion.

@dosubot (bot) added the size:XXL label (This PR changes 1000+ lines, ignoring generated files) on Feb 11, 2026

@dosubot (bot) commented on Feb 11, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.



@sourcery-ai (bot) left a comment


Hey - I've found 4 issues and left some high-level feedback:

  • The new llm_compress_use_compact_api setting is plumbed into MainAgentBuildConfig and the internal pipeline stage, but the compressor logic always prefers native compact whenever supports_native_compact() is present; consider threading this flag through to the ContextCompressor and/or provider so users can explicitly disable native compact when desired.
  • The new active_provider parameter on _get_compress_provider is never used; either remove it or use it (for example as a fallback when llm_compress_provider_id is not set) to avoid confusion about its purpose.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The new `llm_compress_use_compact_api` setting is plumbed into `MainAgentBuildConfig` and the internal pipeline stage, but the compressor logic always prefers native compact whenever `supports_native_compact()` is present; consider threading this flag through to the `ContextCompressor` and/or provider so users can explicitly disable native compact when desired.
- The new `active_provider` parameter on `_get_compress_provider` is never used; either remove it or use it (for example as a fallback when `llm_compress_provider_id` is not set) to avoid confusion about its purpose.

## Individual Comments

### Comment 1
<location> `astrbot/core/agent/context/compressor.py:249-257` </location>
<code_context>
             return messages

-        # build payload
+        native_compact_supported = self._supports_native_compact()
+
+        if native_compact_supported:
+            compacted = await self._try_native_compact(
+                system_messages,
+                messages_to_summarize,
+                recent_messages,
+            )
+            if compacted is not None:
+                return compacted
         instruction_message = Message(role="user", content=self.instruction_text)
</code_context>

<issue_to_address>
**issue (bug_risk):** The new `llm_compress_use_compact_api` flag does not appear to be honored in the compressor logic.

The flag is threaded through config and initialization, but `__call__` always prefers native compact whenever the provider supports it. If users should be able to disable native compact, `__call__` (or `_supports_native_compact`) should consult `llm_compress_use_compact_api` before calling `_try_native_compact`; otherwise the flag is effectively a no-op.
</issue_to_address>
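
A minimal sketch of the gating being asked for here, assuming the compressor stores the flag as `self.llm_compress_use_compact_api` (the attribute name is an assumption; the surrounding names are from the diff):

```python
# Inside ContextCompressor.__call__ — consult the config flag before
# attempting the provider's native compact path.
use_compact = getattr(self, "llm_compress_use_compact_api", True)  # assumed attribute

if use_compact and self._supports_native_compact():
    compacted = await self._try_native_compact(
        system_messages,
        messages_to_summarize,
        recent_messages,
    )
    if compacted is not None:
        return compacted
# Fall through to the LLM summary-based compression below.
```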

### Comment 2
<location> `astrbot/core/astr_main_agent.py:816-819` </location>
<code_context>


 def _get_compress_provider(
-    config: MainAgentBuildConfig, plugin_context: Context
+    config: MainAgentBuildConfig,
+    plugin_context: Context,
+    active_provider: Provider | None,
 ) -> Provider | None:
-    if not config.llm_compress_provider_id:
</code_context>

<issue_to_address>
**nitpick:** `active_provider` parameter is unused in `_get_compress_provider`.

`_get_compress_provider` accepts `active_provider` but never uses it. Either add the intended fallback/use logic (e.g., when `llm_compress_provider_id` is empty) or remove the parameter to keep the API clear and avoid confusion about its purpose.
</issue_to_address>
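
A hedged sketch of the fallback behavior suggested above; the provider lookup on `plugin_context` is assumed, not taken from the codebase:

```python
def _get_compress_provider(
    config: MainAgentBuildConfig,
    plugin_context: Context,
    active_provider: Provider | None,
) -> Provider | None:
    if not config.llm_compress_provider_id:
        # Suggested fallback: reuse the conversation's active provider
        # instead of returning None when no dedicated provider is set.
        return active_provider
    return plugin_context.get_provider_by_id(  # assumed lookup API
        config.llm_compress_provider_id
    )
```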

### Comment 3
<location> `tests/test_openai_responses_source.py:28-37` </location>
<code_context>
+    return ProviderOpenAIResponses.__new__(ProviderOpenAIResponses)
+
+
+def test_is_retryable_upstream_error_with_5xx_status():
+    err = _DummyError("server error", status_code=502)
+
+    assert _provider()._is_retryable_upstream_error(err)
+
+
+def test_is_retryable_upstream_error_with_upstream_error_type():
+    err = _DummyError(
+        "bad gateway",
+        status_code=400,
+        body={"error": {"type": "upstream_error"}},
+    )
+
+    assert _provider()._is_retryable_upstream_error(err)
+
+
+def test_is_retryable_upstream_error_returns_false_for_non_retryable_error():
+    err = _DummyError(
+        "invalid request",
</code_context>

<issue_to_address>
**suggestion (testing):** Consider adding round-trip tests for input/output conversion helpers used by compact and responses parsing.

Specifically, it would be useful to cover:

- `_response_output_to_input_items` and `_output_content_to_input_content` when compact responses return `output` instead of `input`.
- `_response_input_to_messages` and `_messages_to_response_input`, ensuring messages (including function calls and tool outputs) serialize to Responses `input` and deserialize back to `Message` objects correctly.

For example, add tests that:
- Build a representative `output` payload (message + function_call + function_call_output), run it through `_response_output_to_input_items` then `_response_input_to_messages`, and assert the resulting `Message` objects have the expected `role`, `content`, `tool_calls`, and `tool_call_id`.
- Build a list of `Message` dicts (including assistant tool_calls and tool messages), run `_messages_to_response_input`, and assert the resulting structure matches the Responses API schema.

This would give stronger guarantees that both compact and regular Responses flows are consistent across these transformations.

Suggested implementation:

```python
def test_is_retryable_upstream_error_returns_false_for_non_retryable_error():
    err = _DummyError(
        "invalid request",
        status_code=400,
        body={"error": {"type": "invalid_request_error"}},
    )

    assert not _provider()._is_retryable_upstream_error(err)


def test_response_output_to_input_items_and_back_to_messages_round_trip():
    provider = _provider()

    # Representative compact `output` payload: assistant message with a tool call
    # followed by the tool's output.
    response_output = [
        {
            "type": "message",
            "message": {
                "role": "assistant",
                "content": [
                    {
                        "type": "output_text",
                        "text": "Calling tool now...",
                    },
                ],
                "tool_calls": [
                    {
                        "id": "call_1",
                        "type": "function",
                        "function": {
                            "name": "get_weather",
                            "arguments": '{"location": "Boston"}',
                        },
                    }
                ],
            },
        },
        {
            "type": "tool_output",
            "tool_output": {
                "tool_call_id": "call_1",
                "output": '{"temperature": 72}',
            },
        },
    ]

    # When compact Responses return `output`, we should be able to:
    #   output -> input items -> Message objects
    input_items = provider._response_output_to_input_items(response_output)
    messages = provider._response_input_to_messages(
        {
            "input": input_items,
        }
    )

    # Assert we got the expected assistant message and tool message back.
    assert len(messages) == 2

    assistant_msg = messages[0]
    tool_msg = messages[1]

    # Assistant message expectations
    # Depending on the implementation, messages may be dict-like or objects.
    # We assert using attribute access if available, otherwise dict access.
    assistant_role = getattr(assistant_msg, "role", assistant_msg["role"])
    assistant_content = getattr(assistant_msg, "content", assistant_msg["content"])
    assistant_tool_calls = getattr(
        assistant_msg, "tool_calls", assistant_msg.get("tool_calls")
    )

    assert assistant_role == "assistant"
    assert isinstance(assistant_content, list)
    assert any(
        (c.get("type") == "output_text" and "Calling tool now..." in c.get("text", ""))
        for c in assistant_content
    )
    assert assistant_tool_calls is not None
    assert len(assistant_tool_calls) == 1
    assistant_tool_call = assistant_tool_calls[0]
    assert assistant_tool_call["id"] == "call_1"
    assert assistant_tool_call["function"]["name"] == "get_weather"

    # Tool message expectations
    tool_role = getattr(tool_msg, "role", tool_msg["role"])
    tool_call_id = getattr(tool_msg, "tool_call_id", tool_msg.get("tool_call_id"))
    tool_content = getattr(tool_msg, "content", tool_msg["content"])

    assert tool_role == "tool"
    assert tool_call_id == "call_1"
    assert isinstance(tool_content, list)
    # We only assert that some text mentioning the temperature is present
    assert any("72" in c.get("text", "") for c in tool_content if c.get("type") == "output_text")


def test_output_content_to_input_content_round_trip():
    provider = _provider()

    # A compact `output` style content list: assistant text + tool call.
    output_content = [
        {
            "type": "output_text",
            "text": "Calling tool now...",
        },
        {
            "type": "output_tool_call",
            "id": "call_1",
            "tool_name": "get_weather",
            "arguments": '{"location": "Boston"}',
        },
    ]

    # Transform output-style content into input-style content and back to messages.
    input_content = provider._output_content_to_input_content(output_content)

    # Wrap the content in a minimal input item to feed through the generic helpers.
    input_items = [
        {
            "role": "assistant",
            "content": input_content,
        }
    ]

    messages = provider._response_input_to_messages({"input": input_items})

    assert len(messages) == 1
    msg = messages[0]

    role = getattr(msg, "role", msg["role"])
    content = getattr(msg, "content", msg["content"])
    tool_calls = getattr(msg, "tool_calls", msg.get("tool_calls"))

    assert role == "assistant"
    assert isinstance(content, list)
    # We should have preserved the output text in some form.
    assert any("Calling tool now..." in c.get("text", "") for c in content)
    # And we should still have a tool call associated with the assistant.
    assert tool_calls is not None
    assert len(tool_calls) == 1
    assert tool_calls[0]["id"] == "call_1"


def test_messages_to_response_input_schema_for_tool_calls_and_outputs():
    provider = _provider()

    # Representative message sequence:
    #   - user asks a question
    #   - assistant issues a tool call
    #   - tool returns an answer
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "input_text",
                    "text": "What's the weather in Boston?",
                }
            ],
        },
        {
            "role": "assistant",
            "content": [
                {
                    "type": "output_text",
                    "text": "Let me check the weather for you.",
                }
            ],
            "tool_calls": [
                {
                    "id": "call_1",
                    "type": "function",
                    "function": {
                        "name": "get_weather",
                        "arguments": '{"location": "Boston"}',
                    },
                }
            ],
        },
        {
            "role": "tool",
            "tool_call_id": "call_1",
            "content": [
                {
                    "type": "output_text",
                    "text": '{"temperature": 72}',
                }
            ],
        },
    ]

    response_input = provider._messages_to_response_input(messages)

    # The helper should return something compatible with the Responses `input` schema.
    assert isinstance(response_input, dict)
    assert "input" in response_input

    input_items = response_input["input"]
    assert isinstance(input_items, list)
    assert len(input_items) == len(messages)

    # Check that user / assistant / tool roles are preserved in the serialized input.
    roles = [item.get("role") for item in input_items]
    assert roles == ["user", "assistant", "tool"]

    # Assistant tool call should be present in the serialized input.
    assistant_item = input_items[1]
    assert "tool_calls" in assistant_item
    assert len(assistant_item["tool_calls"]) == 1
    assert assistant_item["tool_calls"][0]["id"] == "call_1"

    # Tool message should carry the tool_call_id through.
    tool_item = input_items[2]
    assert tool_item.get("tool_call_id") == "call_1"

    # Now run the serialized input back through `_response_input_to_messages`
    # and assert we get equivalent messages out.
    round_trip_messages = provider._response_input_to_messages(response_input)

    assert len(round_trip_messages) == len(messages)

    for original, rt in zip(messages, round_trip_messages):
        rt_role = getattr(rt, "role", rt["role"])
        assert rt_role == original["role"]

    # Assistant round-trip should retain the tool call id and function name.
    rt_assistant = round_trip_messages[1]
    rt_tool_calls = getattr(rt_assistant, "tool_calls", rt_assistant.get("tool_calls"))
    assert rt_tool_calls[0]["id"] == "call_1"
    assert rt_tool_calls[0]["function"]["name"] == "get_weather"

    # Tool round-trip should retain the tool_call_id.
    rt_tool = round_trip_messages[2]
    rt_tool_call_id = getattr(rt_tool, "tool_call_id", rt_tool.get("tool_call_id"))
    assert rt_tool_call_id == "call_1"


def test_build_responses_input_and_instructions_moves_system_messages():
    provider = _provider()
    provider.custom_headers = {}

```

The new tests assume the following about the existing implementation:

1. `_response_output_to_input_items` accepts a list of `output` items (each with a `"type"` field) and returns a list suitable for being used as the `"input"` value in the Responses payload.
2. `_response_input_to_messages` accepts a dict with an `"input"` key (matching the Responses schema) and returns an ordered sequence of message-like objects. Those objects either:
   - are dicts with keys like `"role"`, `"content"`, `"tool_calls"`, and `"tool_call_id"`, or
   - are objects exposing `.role`, `.content`, `.tool_calls`, and `.tool_call_id` attributes. The tests handle both by using `getattr(..., default)` first and falling back to dict access.
3. `_output_content_to_input_content` accepts a compact `output`-style content list and returns an `input`-style content list suitable for use inside an `"input"` item.
4. `_messages_to_response_input` accepts a list of message dicts matching what the provider already consumes elsewhere in tests and returns a dict containing an `"input"` list compatible with the Responses API.

If any of these signatures differ in your codebase, you should adjust:
- The shape of `response_output`, `output_content`, and `messages` in the tests to match your actual internal message format.
- The arguments passed into `_response_output_to_input_items`, `_output_content_to_input_content`, `_response_input_to_messages`, and `_messages_to_response_input` (for example, if `_response_input_to_messages` expects just the list of input items rather than a dict with `"input"`).

You may also want to:
- Import any concrete `Message` class and make the tests assert on its attributes directly instead of using the dict/attribute dual-access pattern, if your implementation guarantees a specific type.
</issue_to_address>

### Comment 4
<location> `astrbot/core/provider/sources/openai_responses_source.py:585` </location>
<code_context>
+        llm_response.usage = self._extract_response_usage(response.usage)
+        return llm_response
+
+    async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
+        request_payload = dict(payloads)
+        response_input, instructions = self._build_responses_input_and_instructions(
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting the shared request-building logic from `_query` and `_query_stream` into a dedicated helper to centralize behavior and avoid duplicated code paths.

You can significantly reduce duplication and make `_query` / `_query_stream` easier to maintain by factoring out the shared request construction into a single helper.

Right now these blocks are almost identical:

```python
request_payload = dict(payloads)
response_input, instructions = self._build_responses_input_and_instructions(
    request_payload.pop("messages", [])
)
request_payload["input"] = response_input
if instructions and not request_payload.get("instructions"):
    request_payload["instructions"] = instructions

if tools:
    model = request_payload.get("model", "").lower()
    omit_empty_param_field = "gemini" in model
    tool_list = tools.get_func_desc_openai_style(
        omit_empty_parameter_field=omit_empty_param_field,
    )
    response_tools = self._convert_tools_to_responses(tool_list)
    if response_tools:
        request_payload["tools"] = response_tools

extra_body: dict[str, Any] = {}
for key in list(request_payload.keys()):
    if key not in self.default_params:
        extra_body[key] = request_payload.pop(key)

custom_extra_body = self.provider_config.get("custom_extra_body", {})
if isinstance(custom_extra_body, dict):
    extra_body.update(custom_extra_body)

extra_headers = self._build_extra_headers()
```

You can extract this into a focused helper that keeps all behavior intact:

```python
def _build_responses_request(
    self,
    payloads: dict,
    tools: ToolSet | None,
) -> tuple[dict[str, Any], dict[str, Any], dict[str, str]]:
    request_payload = dict(payloads)

    response_input, instructions = self._build_responses_input_and_instructions(
        request_payload.pop("messages", [])
    )
    request_payload["input"] = response_input
    if instructions and not request_payload.get("instructions"):
        request_payload["instructions"] = instructions

    if tools:
        model = request_payload.get("model", "").lower()
        omit_empty_param_field = "gemini" in model
        tool_list = tools.get_func_desc_openai_style(
            omit_empty_parameter_field=omit_empty_param_field,
        )
        response_tools = self._convert_tools_to_responses(tool_list)
        if response_tools:
            request_payload["tools"] = response_tools

    extra_body: dict[str, Any] = {}
    for key in list(request_payload.keys()):
        if key not in self.default_params:
            extra_body[key] = request_payload.pop(key)

    custom_extra_body = self.provider_config.get("custom_extra_body", {})
    if isinstance(custom_extra_body, dict):
        extra_body.update(custom_extra_body)

    extra_headers = self._build_extra_headers()
    return request_payload, extra_body, extra_headers
```

Then `_query` and `_query_stream` become much shorter and share a single source of truth:

```python
async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
    request_payload, extra_body, extra_headers = self._build_responses_request(
        payloads, tools
    )
    completion = await self.client.responses.create(
        **request_payload,
        stream=False,
        extra_body=extra_body,
        extra_headers=extra_headers,
    )
    ...
```

```python
async def _query_stream(
    self,
    payloads: dict,
    tools: ToolSet | None,
) -> AsyncGenerator[LLMResponse, None]:
    request_payload, extra_body, extra_headers = self._build_responses_request(
        payloads, tools
    )

    response_id: str | None = None
    try:
        async with self.client.responses.stream(
            **request_payload,
            extra_body=extra_body,
            extra_headers=extra_headers,
        ) as stream:
            ...
```

This keeps all behavior (including `gemini` handling, tool conversion, `custom_extra_body`, and header building) but removes a large duplicated block of non-trivial logic, reducing the chance of future divergence between streaming and non-streaming paths.
</issue_to_address>


Comment on lines +249 to +257
```python
        native_compact_supported = self._supports_native_compact()

        if native_compact_supported:
            compacted = await self._try_native_compact(
                system_messages,
                messages_to_summarize,
                recent_messages,
            )
            if compacted is not None:
```

issue (bug_risk): The new llm_compress_use_compact_api flag does not appear to be honored in the compressor logic.

The flag is threaded through config and initialization, but __call__ always prefers native compact whenever the provider supports it. If users should be able to disable native compact, __call__ (or _supports_native_compact) should consult llm_compress_use_compact_api before calling _try_native_compact; otherwise the flag is effectively a no-op.

Comment on lines 816 to +819
```diff
 def _get_compress_provider(
-    config: MainAgentBuildConfig, plugin_context: Context
+    config: MainAgentBuildConfig,
+    plugin_context: Context,
+    active_provider: Provider | None,
```

nitpick: active_provider parameter is unused in _get_compress_provider.

_get_compress_provider accepts active_provider but never uses it. Either add the intended fallback/use logic (e.g., when llm_compress_provider_id is empty) or remove the parameter to keep the API clear and avoid confusion about its purpose.

Comment on lines +28 to +37
```python
def test_is_retryable_upstream_error_with_5xx_status():
    err = _DummyError("server error", status_code=502)

    assert _provider()._is_retryable_upstream_error(err)


def test_is_retryable_upstream_error_with_upstream_error_type():
    err = _DummyError(
        "bad gateway",
        status_code=400,
```
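
For orientation, a hedged sketch of the behavior these tests pin down; the attribute names come from the test's `_DummyError`, and the real adapter implementation may differ:

```python
def _is_retryable_upstream_error(self, err: Exception) -> bool:
    # Retry on any 5xx status code.
    status = getattr(err, "status_code", None)
    if isinstance(status, int) and status >= 500:
        return True
    # Also retry when the error body marks the failure as an upstream error.
    body = getattr(err, "body", None)
    if isinstance(body, dict):
        error = body.get("error")
        if isinstance(error, dict):
            return error.get("type") == "upstream_error"
    return False
```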

suggestion (testing): Consider adding round-trip tests for input/output conversion helpers used by compact and responses parsing.

Specifically, it would be useful to cover:

  • _response_output_to_input_items and _output_content_to_input_content when compact responses return output instead of input.
  • _response_input_to_messages and _messages_to_response_input, ensuring messages (including function calls and tool outputs) serialize to Responses input and deserialize back to Message objects correctly.

For example, add tests that:

  • Build a representative output payload (message + function_call + function_call_output), run it through _response_output_to_input_items then _response_input_to_messages, and assert the resulting Message objects have the expected role, content, tool_calls, and tool_call_id.
  • Build a list of Message dicts (including assistant tool_calls and tool messages), run _messages_to_response_input, and assert the resulting structure matches the Responses API schema.

This would give stronger guarantees that both compact and regular Responses flows are consistent across these transformations.

Suggested implementation:

def test_is_retryable_upstream_error_returns_false_for_non_retryable_error():
    err = _DummyError(
        "invalid request",
        status_code=400,
        body={"error": {"type": "invalid_request_error"}},
    )

    assert not _provider()._is_retryable_upstream_error(err)


def test_response_output_to_input_items_and_back_to_messages_round_trip():
    provider = _provider()

    # Representative compact `output` payload: assistant message with a tool call
    # followed by the tool's output.
    response_output = [
        {
            "type": "message",
            "message": {
                "role": "assistant",
                "content": [
                    {
                        "type": "output_text",
                        "text": "Calling tool now...",
                    },
                ],
                "tool_calls": [
                    {
                        "id": "call_1",
                        "type": "function",
                        "function": {
                            "name": "get_weather",
                            "arguments": '{"location": "Boston"}',
                        },
                    }
                ],
            },
        },
        {
            "type": "tool_output",
            "tool_output": {
                "tool_call_id": "call_1",
                "output": '{"temperature": 72}',
            },
        },
    ]

    # When compact Responses return `output`, we should be able to:
    #   output -> input items -> Message objects
    input_items = provider._response_output_to_input_items(response_output)
    messages = provider._response_input_to_messages(
        {
            "input": input_items,
        }
    )

    # Assert we got the expected assistant message and tool message back.
    assert len(messages) == 2

    assistant_msg = messages[0]
    tool_msg = messages[1]

    # Assistant message expectations.
    # Depending on the implementation, messages may be dict-like or objects;
    # _field() supports both without evaluating a dict lookup eagerly.
    assistant_role = _field(assistant_msg, "role")
    assistant_content = _field(assistant_msg, "content")
    assistant_tool_calls = _field(assistant_msg, "tool_calls")

    assert assistant_role == "assistant"
    assert isinstance(assistant_content, list)
    assert any(
        (c.get("type") == "output_text" and "Calling tool now..." in c.get("text", ""))
        for c in assistant_content
    )
    assert assistant_tool_calls is not None
    assert len(assistant_tool_calls) == 1
    assistant_tool_call = assistant_tool_calls[0]
    assert assistant_tool_call["id"] == "call_1"
    assert assistant_tool_call["function"]["name"] == "get_weather"

    # Tool message expectations
    tool_role = _field(tool_msg, "role")
    tool_call_id = _field(tool_msg, "tool_call_id")
    tool_content = _field(tool_msg, "content")

    assert tool_role == "tool"
    assert tool_call_id == "call_1"
    assert isinstance(tool_content, list)
    # We only assert that some text mentioning the temperature is present
    assert any("72" in c.get("text", "") for c in tool_content if c.get("type") == "output_text")


def test_output_content_to_input_content_round_trip():
    provider = _provider()

    # A compact `output` style content list: assistant text + tool call.
    output_content = [
        {
            "type": "output_text",
            "text": "Calling tool now...",
        },
        {
            "type": "output_tool_call",
            "id": "call_1",
            "tool_name": "get_weather",
            "arguments": '{"location": "Boston"}',
        },
    ]

    # Transform output-style content into input-style content and back to messages.
    input_content = provider._output_content_to_input_content(output_content)

    # Wrap the content in a minimal input item to feed through the generic helpers.
    input_items = [
        {
            "role": "assistant",
            "content": input_content,
        }
    ]

    messages = provider._response_input_to_messages({"input": input_items})

    assert len(messages) == 1
    msg = messages[0]

    role = _field(msg, "role")
    content = _field(msg, "content")
    tool_calls = _field(msg, "tool_calls")

    assert role == "assistant"
    assert isinstance(content, list)
    # We should have preserved the output text in some form.
    assert any("Calling tool now..." in c.get("text", "") for c in content)
    # And we should still have a tool call associated with the assistant.
    assert tool_calls is not None
    assert len(tool_calls) == 1
    assert tool_calls[0]["id"] == "call_1"


def test_messages_to_response_input_schema_for_tool_calls_and_outputs():
    provider = _provider()

    # Representative message sequence:
    #   - user asks a question
    #   - assistant issues a tool call
    #   - tool returns an answer
    messages = [
        {
            "role": "user",
            "content": [
                {
                    "type": "input_text",
                    "text": "What's the weather in Boston?",
                }
            ],
        },
        {
            "role": "assistant",
            "content": [
                {
                    "type": "output_text",
                    "text": "Let me check the weather for you.",
                }
            ],
            "tool_calls": [
                {
                    "id": "call_1",
                    "type": "function",
                    "function": {
                        "name": "get_weather",
                        "arguments": '{"location": "Boston"}',
                    },
                }
            ],
        },
        {
            "role": "tool",
            "tool_call_id": "call_1",
            "content": [
                {
                    "type": "output_text",
                    "text": '{"temperature": 72}',
                }
            ],
        },
    ]

    response_input = provider._messages_to_response_input(messages)

    # The helper should return something compatible with the Responses `input` schema.
    assert isinstance(response_input, dict)
    assert "input" in response_input

    input_items = response_input["input"]
    assert isinstance(input_items, list)
    assert len(input_items) == len(messages)

    # Check that user / assistant / tool roles are preserved in the serialized input.
    roles = [item.get("role") for item in input_items]
    assert roles == ["user", "assistant", "tool"]

    # Assistant tool call should be present in the serialized input.
    assistant_item = input_items[1]
    assert "tool_calls" in assistant_item
    assert len(assistant_item["tool_calls"]) == 1
    assert assistant_item["tool_calls"][0]["id"] == "call_1"

    # Tool message should carry the tool_call_id through.
    tool_item = input_items[2]
    assert tool_item.get("tool_call_id") == "call_1"

    # Now run the serialized input back through `_response_input_to_messages`
    # and assert we get equivalent messages out.
    round_trip_messages = provider._response_input_to_messages(response_input)

    assert len(round_trip_messages) == len(messages)

    for original, rt in zip(messages, round_trip_messages):
        rt_role = _field(rt, "role")
        assert rt_role == original["role"]

    # Assistant round-trip should retain the tool call id and function name.
    rt_assistant = round_trip_messages[1]
    rt_tool_calls = _field(rt_assistant, "tool_calls")
    assert rt_tool_calls[0]["id"] == "call_1"
    assert rt_tool_calls[0]["function"]["name"] == "get_weather"

    # Tool round-trip should retain the tool_call_id.
    rt_tool = round_trip_messages[2]
    rt_tool_call_id = _field(rt_tool, "tool_call_id")
    assert rt_tool_call_id == "call_1"


def test_build_responses_input_and_instructions_moves_system_messages():
    provider = _provider()
    provider.custom_headers = {}

The new tests assume the following about the existing implementation:

  1. _response_output_to_input_items accepts a list of output items (each with a "type" field) and returns a list suitable for being used as the "input" value in the Responses payload.
  2. _response_input_to_messages accepts a dict with an "input" key (matching the Responses schema) and returns an ordered sequence of message-like objects. Those objects either:
    • are dicts with keys like "role", "content", "tool_calls", and "tool_call_id", or
    • are objects exposing .role, .content, .tool_calls, and .tool_call_id attributes. The tests handle both through a small _field helper that branches between dict and attribute access.
  3. _output_content_to_input_content accepts a compact output-style content list and returns an input-style content list suitable for use inside an "input" item.
  4. _messages_to_response_input accepts a list of message dicts matching what the provider already consumes elsewhere in tests and returns a dict containing an "input" list compatible with the Responses API.

If any of these signatures differ in your codebase, you should adjust:

  • The shape of response_output, output_content, and messages in the tests to match your actual internal message format.
  • The arguments passed into _response_output_to_input_items, _output_content_to_input_content, _response_input_to_messages, and _messages_to_response_input (for example, if _response_input_to_messages expects just the list of input items rather than a dict with "input").

You may also want to:

  • Import any concrete Message class and make the tests assert on its attributes directly instead of using the dict/attribute dual-access pattern, if your implementation guarantees a specific type.

llm_response.usage = self._extract_response_usage(response.usage)
return llm_response

async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:

issue (complexity): Consider extracting the shared request-building logic from _query and _query_stream into a dedicated helper to centralize behavior and avoid duplicated code paths.

You can significantly reduce duplication and make _query / _query_stream easier to maintain by factoring out the shared request construction into a single helper.

Right now these two blocks are almost identical:

request_payload = dict(payloads)
response_input, instructions = self._build_responses_input_and_instructions(
    request_payload.pop("messages", [])
)
request_payload["input"] = response_input
if instructions and not request_payload.get("instructions"):
    request_payload["instructions"] = instructions

if tools:
    model = request_payload.get("model", "").lower()
    omit_empty_param_field = "gemini" in model
    tool_list = tools.get_func_desc_openai_style(
        omit_empty_parameter_field=omit_empty_param_field,
    )
    response_tools = self._convert_tools_to_responses(tool_list)
    if response_tools:
        request_payload["tools"] = response_tools

extra_body: dict[str, Any] = {}
for key in list(request_payload.keys()):
    if key not in self.default_params:
        extra_body[key] = request_payload.pop(key)

custom_extra_body = self.provider_config.get("custom_extra_body", {})
if isinstance(custom_extra_body, dict):
    extra_body.update(custom_extra_body)

extra_headers = self._build_extra_headers()

You can extract this into a focused helper that keeps all behavior intact:

def _build_responses_request(
    self,
    payloads: dict,
    tools: ToolSet | None,
) -> tuple[dict[str, Any], dict[str, Any], dict[str, str]]:
    request_payload = dict(payloads)

    response_input, instructions = self._build_responses_input_and_instructions(
        request_payload.pop("messages", [])
    )
    request_payload["input"] = response_input
    if instructions and not request_payload.get("instructions"):
        request_payload["instructions"] = instructions

    if tools:
        model = request_payload.get("model", "").lower()
        omit_empty_param_field = "gemini" in model
        tool_list = tools.get_func_desc_openai_style(
            omit_empty_parameter_field=omit_empty_param_field,
        )
        response_tools = self._convert_tools_to_responses(tool_list)
        if response_tools:
            request_payload["tools"] = response_tools

    extra_body: dict[str, Any] = {}
    for key in list(request_payload.keys()):
        if key not in self.default_params:
            extra_body[key] = request_payload.pop(key)

    custom_extra_body = self.provider_config.get("custom_extra_body", {})
    if isinstance(custom_extra_body, dict):
        extra_body.update(custom_extra_body)

    extra_headers = self._build_extra_headers()
    return request_payload, extra_body, extra_headers

Then _query and _query_stream become much shorter and share a single source of truth:

async def _query(self, payloads: dict, tools: ToolSet | None) -> LLMResponse:
    request_payload, extra_body, extra_headers = self._build_responses_request(
        payloads, tools
    )
    completion = await self.client.responses.create(
        **request_payload,
        stream=False,
        extra_body=extra_body,
        extra_headers=extra_headers,
    )
    ...

async def _query_stream(
    self,
    payloads: dict,
    tools: ToolSet | None,
) -> AsyncGenerator[LLMResponse, None]:
    request_payload, extra_body, extra_headers = self._build_responses_request(
        payloads, tools
    )

    response_id: str | None = None
    try:
        async with self.client.responses.stream(
            **request_payload,
            extra_body=extra_body,
            extra_headers=extra_headers,
        ) as stream:
            ...

This keeps all behavior (including the gemini handling, tool conversion, custom_extra_body, and header building) but removes a large duplicated block of non-trivial logic, reducing the chance of future divergence between the streaming and non-streaming paths.
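
Once extracted, the helper is also independently testable. A minimal sketch, assuming the _provider() fixture from the adapter unit tests and assuming reasoning_effort falls outside default_params so it is routed into extra_body (both assumptions are illustrative, not verified against this codebase):

def test_build_responses_request_splits_unknown_params_into_extra_body():
    provider = _provider()

    request_payload, extra_body, extra_headers = provider._build_responses_request(
        {
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "hi"}],
            "reasoning_effort": "low",  # assumed to be outside default_params
        },
        None,
    )

    # "messages" is consumed and replaced by a Responses-style "input".
    assert "messages" not in request_payload
    assert "input" in request_payload
    # Unknown keys are routed into extra_body rather than silently dropped.
    assert extra_body.get("reasoning_effort") == "low"
    assert isinstance(extra_headers, dict)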


@dosubot dosubot bot added the area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner. label Feb 11, 2026

Labels

area:provider The bug / feature is about AI Provider, Models, LLM Agent, LLM Agent Runner.
size:XXL This PR changes 1000+ lines, ignoring generated files.
