Skip to content

Conversation

@w31r4
Copy link

@w31r4 w31r4 commented Feb 11, 2026

Shipyard Neo 在 AstrBot 的接入已完成多轮提交,但在与 ship/gull 预置 Skills 共存时仍有同步策略问题:

  • 原同步逻辑会整目录覆盖 skills/,可能影响预置 skills 的可见性与可用性;
  • 本地上传 skill 与运行时预置 skill 的展示与元数据回填链路不够完整。

本 PR 基于既有提交继续增量完善,补齐“上传 -> 同步 -> 元数据回填 -> 统一展示”的闭环。

Modifications / 改动点

一、已有提交(本 PR 既有内容)

  • feat(computer): add shipyard_neo booter runtime and sandbox config
    • 新增 shipyard_neo booter 运行时接入与配置能力。
  • feat(skills): add neo lifecycle tools and stable sync manager
    • 增加 Neo skill 生命周期工具链与 stable 同步管理。
  • feat(dashboard): add neo skills APIs and management UI
    • 增加 Dashboard 端 Neo skills 相关 API 与管理能力。
  • fix: address neo skill review findings
    • 修复评审中发现的问题,完善可用性与稳定性。

二、本次增量修补(新增提交 40c7cf39

  • 调整沙箱 skills 同步策略:从“整目录清空覆盖”改为“托管覆盖”,保留 ship/gull 预置 skills。
  • 同步后主动读取沙箱 skills 元数据并回填本地缓存。
  • SkillManager.list_skills(runtime="sandbox") 中合并“本地上传 + 沙箱预置”skills 视图。
  • 本地 skills 上传/删除后触发 best-effort 同步到活跃沙箱。
  • 增加针对上述行为的测试用例(同步策略与缓存合并)。

三、核心变更文件(本次增量)

  • astrbot/core/computer/computer_client.py

  • astrbot/core/skills/skill_manager.py

  • astrbot/dashboard/routes/skills.py

  • tests/test_computer_skill_sync.py

  • tests/test_skill_manager_sandbox_cache.py

  • This is NOT a breaking change. / 这不是一个破坏性变更。

Screenshots or Test Results / 运行截图或测试结果

本次增量修补在本地完成以下验证:

  1. 代码规范检查
  • uv run ruff format .
  • uv run ruff check .
  1. 相关测试
  • uv run pytest -q tests/test_skill_manager_sandbox_cache.py tests/test_computer_skill_sync.py tests/test_neo_skill_sync.py
    • 结果:6 passed
  • uv run pytest -q tests/test_dashboard.py -k neo_skills_routes
    • 结果:1 passed

Checklist / 检查清单

  • 😊 如果 PR 中有新加入的功能,已经通过 Issue / 邮件等方式和作者讨论过。
  • 👀 我的更改经过了良好的测试,并已在上方提供了“验证步骤”和“运行截图”
  • 🤓 我确保没有引入新依赖库,或者引入了新依赖库的同时将其添加到了 requirements.txtpyproject.toml 文件相应位置。
  • 😮 我的更改没有引入恶意代码。

@dosubot dosubot bot added the size:XXL This PR changes 1000+ lines, ignoring generated files. label Feb 11, 2026
@dosubot
Copy link

dosubot bot commented Feb 11, 2026

Related Documentation

Checked 1 published document(s) in 1 knowledge base(s). No updates required.

How did I do? Any feedback?  Join Discord

@w31r4 w31r4 changed the title feat: integrate Shipyard Neo self-iterating skill loop and dashboard controls feat: 接入 Shipyard Neo 自迭代 Skill 闭环与管理能力 Feb 11, 2026
Copy link
Contributor

@sourcery-ai sourcery-ai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey - 我发现了 4 个问题,并给出了一些整体反馈:

  • 你在 pyproject.toml 中添加了对 shipyard-neo-sdk 的直接引用,但没有在 requirements.txt 中添加;如果你的部署或开发流程仍依赖 requirements.txt,建议也在其中添加相同依赖,以保持两个清单文件的一致性。
给 AI Agent 的提示
Please address the comments from this code review:

## Overall Comments
- You added `shipyard-neo-sdk` as a direct reference in `pyproject.toml` but not in `requirements.txt`; if your deployment or dev workflows still rely on `requirements.txt`, consider adding the same dependency there to keep the two manifests consistent.

## Individual Comments

### Comment 1
<location> `astrbot/dashboard/routes/skills.py:304-313` </location>
<code_context>
+            sync_json: dict[str, Any] | None = None
+            rollback_json: dict[str, Any] | None = None
+            if stage == "stable" and sync_to_local:
+                sync_mgr = NeoSkillSyncManager()
+                try:
+                    sync_result = await sync_mgr.sync_release(
</code_context>

<issue_to_address>
**suggestion (performance):** Stable promotion triggers skills sync twice (inside sync manager and again after), which is redundant.

When `stage == "stable" and sync_to_local`, `NeoSkillSyncManager().sync_release(...)` already triggers `sync_skills_to_active_sandboxes()`, and this route then calls `sync_skills_to_active_sandboxes()` again in the best-effort block. That means the same change is synced twice in a row. Consider relying on just one of these (either the manager’s call or the explicit call here) to avoid redundant work.

Suggested implementation:

```python
                sync_json = None
                did_sync_to_local = False
                if stage == "stable" and sync_to_local:
                    sync_mgr = NeoSkillSyncManager()
                    try:
                        sync_result = await sync_mgr.sync_release(
                            client,
                            release_id=str(release_json.get("id", "")),
                            require_stable=True,
                        )
                        did_sync_to_local = True
                        sync_json = {
                            "skill_key": sync_result.skill_key,
                            "local_skill_name": sync_result.local_skill_name,
                            "release_id": sync_result.release_id,
                            "candidate_id": sync_result.candidate_id,
                            "payload_ref": sync_result.payload_ref,

```

To fully avoid the double sync, you should also adjust the later "best-effort" sync call in this route:

1. Locate the block that currently calls `sync_skills_to_active_sandboxes()` (likely in a `try/except` "best-effort" section after the snippet you provided).
2. Wrap that call so it only runs when `did_sync_to_local` is `False`, for example:

   ```python
   if not did_sync_to_local:
       await sync_skills_to_active_sandboxes()
   ```

This way, when `stage == "stable" and sync_to_local` and `NeoSkillSyncManager().sync_release(...)` has already performed the sync, the route will skip the redundant second call.
</issue_to_address>

### Comment 2
<location> `astrbot/core/computer/booters/base.py:16-22` </location>
<code_context>
     @property
     def shell(self) -> ShellComponent: ...

+    @property
+    def browser(self) -> BrowserComponent:
+        raise NotImplementedError(
+            f"{self.__class__.__name__} does not support browser capability."
+        )
+
</code_context>

<issue_to_address>
**issue (bug_risk):** The base `browser` property raising `NotImplementedError` conflicts with `getattr(..., "browser", None)` checks in callers.

Because callers like `_get_browser_component` use `getattr(booter, "browser", None)` for capability detection, this property will raise instead of returning `None`, so the `if browser is None` path never executes. While the outer `try/except` masks the exception, it makes feature detection brittle and hides the clearer error message. Consider either returning `None` in the base implementation and overriding it in concrete booters, or updating callers to explicitly handle `NotImplementedError` when probing for `browser` support.
</issue_to_address>

### Comment 3
<location> `astrbot/dashboard/routes/skills.py:171` </location>
<code_context>
             logger.error(traceback.format_exc())
             return Response().error(str(e)).__dict__
+
+    async def get_neo_candidates(self):
+        try:
+            endpoint, access_token = self._get_neo_client_config()
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting shared helpers for Neo client usage and sync result formatting to simplify the SkillsRoute handlers and remove duplication.

You can keep all behavior but reduce complexity/duplication inside `SkillsRoute` by factoring out the repeated “get config → open BayClient → call API → shape response” pattern and the repeated sync-result shaping.

### 1. Centralize BayClient construction and error handling

The Neo endpoints all do:

- `_get_neo_client_config`
- import `BayClient`
- `async with BayClient(...) as client: ...`
- try/except with identical logging + error response

You can move this into a small helper to remove repetition and keep each handler focused on *what* it does rather than *how* it talks to Neo:

```python
from shipyard_neo import BayClient  # move import to top-level

class SkillsRoute(Route):
    ...

    async def _with_neo_client(self, func):
        try:
            endpoint, access_token = self._get_neo_client_config()
            async with BayClient(
                endpoint_url=endpoint,
                access_token=access_token,
            ) as client:
                return await func(client)
        except Exception as e:
            logger.error(traceback.format_exc())
            return Response().error(str(e)).__dict__
```

Usage example (applies to `get_neo_candidates`, `get_neo_releases`, `get_neo_payload`, `evaluate_neo_candidate`, `promote_neo_candidate`, `rollback_neo_release`, `sync_neo_release`):

```python
async def get_neo_candidates(self):
    status = request.args.get("status")
    skill_key = request.args.get("skill_key")
    limit = int(request.args.get("limit", 100))
    offset = int(request.args.get("offset", 0))

    async def _do(client):
        candidates = await client.skills.list_candidates(
            status=status,
            skill_key=skill_key,
            limit=limit,
            offset=offset,
        )
        return Response().ok(_to_jsonable(candidates)).__dict__

    return await self._with_neo_client(_do)
```

This keeps route methods short and removes boilerplate.

### 2. Factor out the sync result shaping

`promote_neo_candidate` and `sync_neo_release` both build the same dict from `NeoSkillSyncResult`:

```python
{
    "skill_key": result.skill_key,
    "local_skill_name": result.local_skill_name,
    "release_id": result.release_id,
    "candidate_id": result.candidate_id,
    "payload_ref": result.payload_ref,
    "map_path": result.map_path,
    "synced_at": result.synced_at,
}
```

You can extract a tiny helper to avoid drift and keep both endpoints consistent:

```python
def _sync_result_to_dict(self, result):
    return {
        "skill_key": result.skill_key,
        "local_skill_name": result.local_skill_name,
        "release_id": result.release_id,
        "candidate_id": result.candidate_id,
        "payload_ref": result.payload_ref,
        "map_path": result.map_path,
        "synced_at": result.synced_at,
    }
```

Then:

```python
# in promote_neo_candidate
sync_json = None
if stage == "stable" and sync_to_local:
    sync_result = await sync_mgr.sync_release(...)
    sync_json = self._sync_result_to_dict(sync_result)

# in sync_neo_release
result = await sync_mgr.sync_release(...)
return Response().ok(self._sync_result_to_dict(result)).__dict__
```

These two small extractions reduce duplication and make the route methods materially simpler without changing any behavior or moving to a full-blown service layer.
</issue_to_address>

### Comment 4
<location> `astrbot/core/computer/tools/neo_skills.py:51` </location>
<code_context>
+    return browser
+
+
+@dataclass
+class BrowserExecTool(FunctionTool):
+    name: str = "astrbot_execute_browser"
</code_context>

<issue_to_address>
**issue (complexity):** Consider introducing a shared Neo tool base class and moving promote/sync/rollback orchestration into NeoSkillSyncManager to eliminate repeated admin/client/error/JSON boilerplate and centralize lifecycle logic.

You can reduce the complexity and duplication meaningfully without changing behavior by introducing a shared base and centralizing the promotion/sync/rollback orchestration.

### 1. Factor out a common Neo tool base

All tools repeat the same pattern:

- ` _ensure_admin`
- `_get_neo_context`
- try/except → call SDKJSON-ify or format error

You can keep behavior identical but move the plumbing into a base class so each tool only defines its parameters and the actual SDK call.

```python
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

@dataclass
class NeoSkillToolBase(FunctionTool):
    error_prefix: str = "Error"

    async def _run(
        self,
        context: ContextWrapper[AstrAgentContext],
        neo_call: Callable[[Any, Any], Awaitable[Any]],
        error_action: str,
    ) -> ToolExecResult:
        if err := _ensure_admin(context):
            return err
        try:
            client, sandbox = await _get_neo_context(context)
            result = await neo_call(client, sandbox)
            return _to_json_text(result)
        except Exception as e:
            return f"{self.error_prefix} {error_action}: {str(e)}"
```

Then each tool becomes much smaller; for example, `ListSkillReleasesTool`:

```python
@dataclass
class ListSkillReleasesTool(NeoSkillToolBase):
    name: str = "astrbot_list_skill_releases"
    description: str = "List skill releases."
    parameters: dict = field(
        default_factory=lambda: {
            "type": "object",
            "properties": {
                "skill_key": {"type": "string"},
                "active_only": {"type": "boolean", "default": False},
                "stage": {"type": "string"},
                "limit": {"type": "integer", "default": 100},
                "offset": {"type": "integer", "default": 0},
            },
            "required": [],
        }
    )

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        skill_key: str | None = None,
        active_only: bool = False,
        stage: str | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> ToolExecResult:
        return await self._run(
            context,
            lambda client, _sandbox: client.skills.list_releases(
                skill_key=skill_key,
                active_only=active_only,
                stage=stage,
                limit=limit,
                offset=offset,
            ),
            error_action="listing skill releases",
        )
```

You can gradually migrate each tool to this pattern; it keeps the same signatures and behavior but removes repeated admin/client/error/JSON boilerplate.

### 2. Centralize promote/sync/rollback orchestration

`PromoteSkillCandidateTool` contains orchestration logic that’s already conceptually part of `NeoSkillSyncManager`/lifecycle concerns. You can move the flow into a helper function/service and keep the tool thin.

For example, add a method to `NeoSkillSyncManager` (or a small service next to it):

```python
class NeoSkillSyncManager:
    # existing methods...

    async def promote_with_optional_sync(
        self,
        client: Any,
        candidate_id: str,
        stage: str,
        sync_to_local: bool,
    ) -> dict[str, Any]:
        release = await client.skills.promote_candidate(candidate_id, stage=stage)
        release_json = _to_jsonable(release)

        sync_json: dict[str, Any] | None = None
        rollback_json: dict[str, Any] | None = None

        if stage == "stable" and sync_to_local:
            try:
                sync_result = await self.sync_release(
                    client,
                    release_id=str(release_json.get("id", "")),
                    require_stable=True,
                )
                sync_json = {
                    "skill_key": sync_result.skill_key,
                    "local_skill_name": sync_result.local_skill_name,
                    "release_id": sync_result.release_id,
                    "candidate_id": sync_result.candidate_id,
                    "payload_ref": sync_result.payload_ref,
                    "map_path": sync_result.map_path,
                    "synced_at": sync_result.synced_at,
                }
            except Exception as sync_err:
                try:
                    rollback = await client.skills.rollback_release(
                        str(release_json.get("id", ""))
                    )
                    rollback_json = _to_jsonable(rollback)
                except Exception as rollback_err:
                    raise RuntimeError(
                        "stable release sync failed and auto rollback also failed; "
                        f"sync_error={sync_err}; rollback_error={rollback_err}"
                    ) from rollback_err
                raise RuntimeError(
                    "stable release sync failed; auto rollback succeeded; "
                    f"sync_error={sync_err}; rollback={_to_json_text(rollback_json)}"
                ) from sync_err

        return {
            "release": release_json,
            "sync": sync_json,
            "rollback": rollback_json,
        }
```

Then `PromoteSkillCandidateTool.call` becomes:

```python
@dataclass
class PromoteSkillCandidateTool(NeoSkillToolBase):
    name: str = "astrbot_promote_skill_candidate"
    description: str = "Promote one candidate to release stage (canary/stable)."
    # parameters unchanged ...

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        candidate_id: str,
        stage: str = "canary",
        sync_to_local: bool = True,
    ) -> ToolExecResult:
        if stage not in {"canary", "stable"}:
            return "Error promoting skill candidate: stage must be canary or stable."

        if err := _ensure_admin(context):
            return err

        try:
            client, _sandbox = await _get_neo_context(context)
            sync_mgr = NeoSkillSyncManager()
            result = await sync_mgr.promote_with_optional_sync(
                client=client,
                candidate_id=candidate_id,
                stage=stage,
                sync_to_local=sync_to_local,
            )
            return _to_json_text(result)
        except Exception as e:
            return f"Error promoting skill candidate: {str(e)}"
```

This keeps all current behavior (including rollback semantics and error messages, if you preserve them in the manager) but:

- Removes duplicated orchestration from the tool.
- Gives you a single place (`NeoSkillSyncManager`) to align behavior between tools and HTTP routes.
</issue_to_address>

Sourcery 对开源项目免费 —— 如果你觉得我们的评审有帮助,欢迎分享 ✨
帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续的评审。
Original comment in English

Hey - I've found 4 issues, and left some high level feedback:

  • You added shipyard-neo-sdk as a direct reference in pyproject.toml but not in requirements.txt; if your deployment or dev workflows still rely on requirements.txt, consider adding the same dependency there to keep the two manifests consistent.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- You added `shipyard-neo-sdk` as a direct reference in `pyproject.toml` but not in `requirements.txt`; if your deployment or dev workflows still rely on `requirements.txt`, consider adding the same dependency there to keep the two manifests consistent.

## Individual Comments

### Comment 1
<location> `astrbot/dashboard/routes/skills.py:304-313` </location>
<code_context>
+            sync_json: dict[str, Any] | None = None
+            rollback_json: dict[str, Any] | None = None
+            if stage == "stable" and sync_to_local:
+                sync_mgr = NeoSkillSyncManager()
+                try:
+                    sync_result = await sync_mgr.sync_release(
</code_context>

<issue_to_address>
**suggestion (performance):** Stable promotion triggers skills sync twice (inside sync manager and again after), which is redundant.

When `stage == "stable" and sync_to_local`, `NeoSkillSyncManager().sync_release(...)` already triggers `sync_skills_to_active_sandboxes()`, and this route then calls `sync_skills_to_active_sandboxes()` again in the best-effort block. That means the same change is synced twice in a row. Consider relying on just one of these (either the manager’s call or the explicit call here) to avoid redundant work.

Suggested implementation:

```python
                sync_json = None
                did_sync_to_local = False
                if stage == "stable" and sync_to_local:
                    sync_mgr = NeoSkillSyncManager()
                    try:
                        sync_result = await sync_mgr.sync_release(
                            client,
                            release_id=str(release_json.get("id", "")),
                            require_stable=True,
                        )
                        did_sync_to_local = True
                        sync_json = {
                            "skill_key": sync_result.skill_key,
                            "local_skill_name": sync_result.local_skill_name,
                            "release_id": sync_result.release_id,
                            "candidate_id": sync_result.candidate_id,
                            "payload_ref": sync_result.payload_ref,

```

To fully avoid the double sync, you should also adjust the later "best-effort" sync call in this route:

1. Locate the block that currently calls `sync_skills_to_active_sandboxes()` (likely in a `try/except` "best-effort" section after the snippet you provided).
2. Wrap that call so it only runs when `did_sync_to_local` is `False`, for example:

   ```python
   if not did_sync_to_local:
       await sync_skills_to_active_sandboxes()
   ```

This way, when `stage == "stable" and sync_to_local` and `NeoSkillSyncManager().sync_release(...)` has already performed the sync, the route will skip the redundant second call.
</issue_to_address>

### Comment 2
<location> `astrbot/core/computer/booters/base.py:16-22` </location>
<code_context>
     @property
     def shell(self) -> ShellComponent: ...

+    @property
+    def browser(self) -> BrowserComponent:
+        raise NotImplementedError(
+            f"{self.__class__.__name__} does not support browser capability."
+        )
+
</code_context>

<issue_to_address>
**issue (bug_risk):** The base `browser` property raising `NotImplementedError` conflicts with `getattr(..., "browser", None)` checks in callers.

Because callers like `_get_browser_component` use `getattr(booter, "browser", None)` for capability detection, this property will raise instead of returning `None`, so the `if browser is None` path never executes. While the outer `try/except` masks the exception, it makes feature detection brittle and hides the clearer error message. Consider either returning `None` in the base implementation and overriding it in concrete booters, or updating callers to explicitly handle `NotImplementedError` when probing for `browser` support.
</issue_to_address>

### Comment 3
<location> `astrbot/dashboard/routes/skills.py:171` </location>
<code_context>
             logger.error(traceback.format_exc())
             return Response().error(str(e)).__dict__
+
+    async def get_neo_candidates(self):
+        try:
+            endpoint, access_token = self._get_neo_client_config()
</code_context>

<issue_to_address>
**issue (complexity):** Consider extracting shared helpers for Neo client usage and sync result formatting to simplify the SkillsRoute handlers and remove duplication.

You can keep all behavior but reduce complexity/duplication inside `SkillsRoute` by factoring out the repeated “get config → open BayClient → call API → shape response” pattern and the repeated sync-result shaping.

### 1. Centralize BayClient construction and error handling

The Neo endpoints all do:

- `_get_neo_client_config`
- import `BayClient`
- `async with BayClient(...) as client: ...`
- try/except with identical logging + error response

You can move this into a small helper to remove repetition and keep each handler focused on *what* it does rather than *how* it talks to Neo:

```python
from shipyard_neo import BayClient  # move import to top-level

class SkillsRoute(Route):
    ...

    async def _with_neo_client(self, func):
        try:
            endpoint, access_token = self._get_neo_client_config()
            async with BayClient(
                endpoint_url=endpoint,
                access_token=access_token,
            ) as client:
                return await func(client)
        except Exception as e:
            logger.error(traceback.format_exc())
            return Response().error(str(e)).__dict__
```

Usage example (applies to `get_neo_candidates`, `get_neo_releases`, `get_neo_payload`, `evaluate_neo_candidate`, `promote_neo_candidate`, `rollback_neo_release`, `sync_neo_release`):

```python
async def get_neo_candidates(self):
    status = request.args.get("status")
    skill_key = request.args.get("skill_key")
    limit = int(request.args.get("limit", 100))
    offset = int(request.args.get("offset", 0))

    async def _do(client):
        candidates = await client.skills.list_candidates(
            status=status,
            skill_key=skill_key,
            limit=limit,
            offset=offset,
        )
        return Response().ok(_to_jsonable(candidates)).__dict__

    return await self._with_neo_client(_do)
```

This keeps route methods short and removes boilerplate.

### 2. Factor out the sync result shaping

`promote_neo_candidate` and `sync_neo_release` both build the same dict from `NeoSkillSyncResult`:

```python
{
    "skill_key": result.skill_key,
    "local_skill_name": result.local_skill_name,
    "release_id": result.release_id,
    "candidate_id": result.candidate_id,
    "payload_ref": result.payload_ref,
    "map_path": result.map_path,
    "synced_at": result.synced_at,
}
```

You can extract a tiny helper to avoid drift and keep both endpoints consistent:

```python
def _sync_result_to_dict(self, result):
    return {
        "skill_key": result.skill_key,
        "local_skill_name": result.local_skill_name,
        "release_id": result.release_id,
        "candidate_id": result.candidate_id,
        "payload_ref": result.payload_ref,
        "map_path": result.map_path,
        "synced_at": result.synced_at,
    }
```

Then:

```python
# in promote_neo_candidate
sync_json = None
if stage == "stable" and sync_to_local:
    sync_result = await sync_mgr.sync_release(...)
    sync_json = self._sync_result_to_dict(sync_result)

# in sync_neo_release
result = await sync_mgr.sync_release(...)
return Response().ok(self._sync_result_to_dict(result)).__dict__
```

These two small extractions reduce duplication and make the route methods materially simpler without changing any behavior or moving to a full-blown service layer.
</issue_to_address>

### Comment 4
<location> `astrbot/core/computer/tools/neo_skills.py:51` </location>
<code_context>
+    return browser
+
+
+@dataclass
+class BrowserExecTool(FunctionTool):
+    name: str = "astrbot_execute_browser"
</code_context>

<issue_to_address>
**issue (complexity):** Consider introducing a shared Neo tool base class and moving promote/sync/rollback orchestration into NeoSkillSyncManager to eliminate repeated admin/client/error/JSON boilerplate and centralize lifecycle logic.

You can reduce the complexity and duplication meaningfully without changing behavior by introducing a shared base and centralizing the promotion/sync/rollback orchestration.

### 1. Factor out a common Neo tool base

All tools repeat the same pattern:

- ` _ensure_admin`
- `_get_neo_context`
- try/except → call SDKJSON-ify or format error

You can keep behavior identical but move the plumbing into a base class so each tool only defines its parameters and the actual SDK call.

```python
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

@dataclass
class NeoSkillToolBase(FunctionTool):
    error_prefix: str = "Error"

    async def _run(
        self,
        context: ContextWrapper[AstrAgentContext],
        neo_call: Callable[[Any, Any], Awaitable[Any]],
        error_action: str,
    ) -> ToolExecResult:
        if err := _ensure_admin(context):
            return err
        try:
            client, sandbox = await _get_neo_context(context)
            result = await neo_call(client, sandbox)
            return _to_json_text(result)
        except Exception as e:
            return f"{self.error_prefix} {error_action}: {str(e)}"
```

Then each tool becomes much smaller; for example, `ListSkillReleasesTool`:

```python
@dataclass
class ListSkillReleasesTool(NeoSkillToolBase):
    name: str = "astrbot_list_skill_releases"
    description: str = "List skill releases."
    parameters: dict = field(
        default_factory=lambda: {
            "type": "object",
            "properties": {
                "skill_key": {"type": "string"},
                "active_only": {"type": "boolean", "default": False},
                "stage": {"type": "string"},
                "limit": {"type": "integer", "default": 100},
                "offset": {"type": "integer", "default": 0},
            },
            "required": [],
        }
    )

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        skill_key: str | None = None,
        active_only: bool = False,
        stage: str | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> ToolExecResult:
        return await self._run(
            context,
            lambda client, _sandbox: client.skills.list_releases(
                skill_key=skill_key,
                active_only=active_only,
                stage=stage,
                limit=limit,
                offset=offset,
            ),
            error_action="listing skill releases",
        )
```

You can gradually migrate each tool to this pattern; it keeps the same signatures and behavior but removes repeated admin/client/error/JSON boilerplate.

### 2. Centralize promote/sync/rollback orchestration

`PromoteSkillCandidateTool` contains orchestration logic that’s already conceptually part of `NeoSkillSyncManager`/lifecycle concerns. You can move the flow into a helper function/service and keep the tool thin.

For example, add a method to `NeoSkillSyncManager` (or a small service next to it):

```python
class NeoSkillSyncManager:
    # existing methods...

    async def promote_with_optional_sync(
        self,
        client: Any,
        candidate_id: str,
        stage: str,
        sync_to_local: bool,
    ) -> dict[str, Any]:
        release = await client.skills.promote_candidate(candidate_id, stage=stage)
        release_json = _to_jsonable(release)

        sync_json: dict[str, Any] | None = None
        rollback_json: dict[str, Any] | None = None

        if stage == "stable" and sync_to_local:
            try:
                sync_result = await self.sync_release(
                    client,
                    release_id=str(release_json.get("id", "")),
                    require_stable=True,
                )
                sync_json = {
                    "skill_key": sync_result.skill_key,
                    "local_skill_name": sync_result.local_skill_name,
                    "release_id": sync_result.release_id,
                    "candidate_id": sync_result.candidate_id,
                    "payload_ref": sync_result.payload_ref,
                    "map_path": sync_result.map_path,
                    "synced_at": sync_result.synced_at,
                }
            except Exception as sync_err:
                try:
                    rollback = await client.skills.rollback_release(
                        str(release_json.get("id", ""))
                    )
                    rollback_json = _to_jsonable(rollback)
                except Exception as rollback_err:
                    raise RuntimeError(
                        "stable release sync failed and auto rollback also failed; "
                        f"sync_error={sync_err}; rollback_error={rollback_err}"
                    ) from rollback_err
                raise RuntimeError(
                    "stable release sync failed; auto rollback succeeded; "
                    f"sync_error={sync_err}; rollback={_to_json_text(rollback_json)}"
                ) from sync_err

        return {
            "release": release_json,
            "sync": sync_json,
            "rollback": rollback_json,
        }
```

Then `PromoteSkillCandidateTool.call` becomes:

```python
@dataclass
class PromoteSkillCandidateTool(NeoSkillToolBase):
    name: str = "astrbot_promote_skill_candidate"
    description: str = "Promote one candidate to release stage (canary/stable)."
    # parameters unchanged ...

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        candidate_id: str,
        stage: str = "canary",
        sync_to_local: bool = True,
    ) -> ToolExecResult:
        if stage not in {"canary", "stable"}:
            return "Error promoting skill candidate: stage must be canary or stable."

        if err := _ensure_admin(context):
            return err

        try:
            client, _sandbox = await _get_neo_context(context)
            sync_mgr = NeoSkillSyncManager()
            result = await sync_mgr.promote_with_optional_sync(
                client=client,
                candidate_id=candidate_id,
                stage=stage,
                sync_to_local=sync_to_local,
            )
            return _to_json_text(result)
        except Exception as e:
            return f"Error promoting skill candidate: {str(e)}"
```

This keeps all current behavior (including rollback semantics and error messages, if you preserve them in the manager) but:

- Removes duplicated orchestration from the tool.
- Gives you a single place (`NeoSkillSyncManager`) to align behavior between tools and HTTP routes.
</issue_to_address>

Sourcery is free for open source - if you like our reviews please consider sharing them ✨
Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.

Comment on lines +304 to +313
sync_mgr = NeoSkillSyncManager()
try:
sync_result = await sync_mgr.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion (performance): 稳定版发布会触发两次技能同步(一次在同步管理器内部,一次在后续逻辑中),这是重复且多余的。

stage == "stable" and sync_to_local 时,NeoSkillSyncManager().sync_release(...) 本身就会触发 sync_skills_to_active_sandboxes(),而这个路由稍后在「best-effort」的代码块中又会再次调用 sync_skills_to_active_sandboxes()。这意味着同一个变更会连续同步两次。建议只保留其中一种方式(要么依赖管理器内部的调用,要么保留这里的显式调用),以避免重复工作。

建议实现方式:

                sync_json = None
                did_sync_to_local = False
                if stage == "stable" and sync_to_local:
                    sync_mgr = NeoSkillSyncManager()
                    try:
                        sync_result = await sync_mgr.sync_release(
                            client,
                            release_id=str(release_json.get("id", "")),
                            require_stable=True,
                        )
                        did_sync_to_local = True
                        sync_json = {
                            "skill_key": sync_result.skill_key,
                            "local_skill_name": sync_result.local_skill_name,
                            "release_id": sync_result.release_id,
                            "candidate_id": sync_result.candidate_id,
                            "payload_ref": sync_result.payload_ref,

为了彻底避免双重同步,你还应该调整路由后面那个「best-effort」的同步调用:

  1. 找到当前调用 sync_skills_to_active_sandboxes() 的代码块(很可能是在你给出的片段之后的一个 try/except 「best-effort」部分)。

  2. 将该调用包一层,使其仅在 did_sync_to_localFalse 时执行,例如:

    if not did_sync_to_local:
        await sync_skills_to_active_sandboxes()

这样,当 stage == "stable" and sync_to_localNeoSkillSyncManager().sync_release(...) 已经完成同步时,这个路由就会跳过多余的第二次调用。

Original comment in English

suggestion (performance): Stable promotion triggers skills sync twice (inside sync manager and again after), which is redundant.

When stage == "stable" and sync_to_local, NeoSkillSyncManager().sync_release(...) already triggers sync_skills_to_active_sandboxes(), and this route then calls sync_skills_to_active_sandboxes() again in the best-effort block. That means the same change is synced twice in a row. Consider relying on just one of these (either the manager’s call or the explicit call here) to avoid redundant work.

Suggested implementation:

                sync_json = None
                did_sync_to_local = False
                if stage == "stable" and sync_to_local:
                    sync_mgr = NeoSkillSyncManager()
                    try:
                        sync_result = await sync_mgr.sync_release(
                            client,
                            release_id=str(release_json.get("id", "")),
                            require_stable=True,
                        )
                        did_sync_to_local = True
                        sync_json = {
                            "skill_key": sync_result.skill_key,
                            "local_skill_name": sync_result.local_skill_name,
                            "release_id": sync_result.release_id,
                            "candidate_id": sync_result.candidate_id,
                            "payload_ref": sync_result.payload_ref,

To fully avoid the double sync, you should also adjust the later "best-effort" sync call in this route:

  1. Locate the block that currently calls sync_skills_to_active_sandboxes() (likely in a try/except "best-effort" section after the snippet you provided).

  2. Wrap that call so it only runs when did_sync_to_local is False, for example:

    if not did_sync_to_local:
        await sync_skills_to_active_sandboxes()

This way, when stage == "stable" and sync_to_local and NeoSkillSyncManager().sync_release(...) has already performed the sync, the route will skip the redundant second call.

Comment on lines 16 to +22
@property
def shell(self) -> ShellComponent: ...

@property
def browser(self) -> BrowserComponent:
raise NotImplementedError(
f"{self.__class__.__name__} does not support browser capability."
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (bug_risk): 基类中的 browser 属性抛出 NotImplementedError,与调用方使用 getattr(..., "browser", None) 的能力探测方式相冲突。

因为像 _get_browser_component 这样的调用方会通过 getattr(booter, "browser", None) 来做能力检测,这个属性会直接抛异常而不是返回 None,导致 if browser is None 分支永远不会执行。虽然外层的 try/except 会吞掉这个异常,但这会让能力检测变得脆弱,也隐藏了更清晰的错误信息。建议要么在基类实现中返回 None,并在具体 booter 中重写,要么在探测 browser 支持时,让调用方显式处理 NotImplementedError

Original comment in English

issue (bug_risk): The base browser property raising NotImplementedError conflicts with getattr(..., "browser", None) checks in callers.

Because callers like _get_browser_component use getattr(booter, "browser", None) for capability detection, this property will raise instead of returning None, so the if browser is None path never executes. While the outer try/except masks the exception, it makes feature detection brittle and hides the clearer error message. Consider either returning None in the base implementation and overriding it in concrete booters, or updating callers to explicitly handle NotImplementedError when probing for browser support.

logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__

async def get_neo_candidates(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议为 Neo 客户端的使用和同步结果的格式化提取通用辅助函数,以简化 SkillsRoute 的处理器并去掉重复代码。

你可以在保持所有行为不变的前提下,通过抽取重复的「获取配置 → 创建 BayClient → 调用 API → 整形响应」模式,以及重复的同步结果构造逻辑,来降低 SkillsRoute 内部的复杂度和重复。

1. 集中管理 BayClient 的构造与错误处理

所有 Neo 相关的端点基本都做了以下事情:

  • 调用 _get_neo_client_config
  • 导入 BayClient
  • async with BayClient(...) as client: ...
  • 使用相同的 try/except 进行日志记录和错误响应

你可以把这些逻辑移到一个小的辅助方法中,以去除重复,让每个 handler 更专注于做什么,而不是如何与 Neo 通信:

from shipyard_neo import BayClient  # move import to top-level

class SkillsRoute(Route):
    ...

    async def _with_neo_client(self, func):
        try:
            endpoint, access_token = self._get_neo_client_config()
            async with BayClient(
                endpoint_url=endpoint,
                access_token=access_token,
            ) as client:
                return await func(client)
        except Exception as e:
            logger.error(traceback.format_exc())
            return Response().error(str(e)).__dict__

使用示例(适用于 get_neo_candidates, get_neo_releases, get_neo_payload, evaluate_neo_candidate, promote_neo_candidate, rollback_neo_release, sync_neo_release):

async def get_neo_candidates(self):
    status = request.args.get("status")
    skill_key = request.args.get("skill_key")
    limit = int(request.args.get("limit", 100))
    offset = int(request.args.get("offset", 0))

    async def _do(client):
        candidates = await client.skills.list_candidates(
            status=status,
            skill_key=skill_key,
            limit=limit,
            offset=offset,
        )
        return Response().ok(_to_jsonable(candidates)).__dict__

    return await self._with_neo_client(_do)

这样可以让路由方法保持简短,并去掉样板代码。

2. 抽取同步结果的构造逻辑

promote_neo_candidatesync_neo_release 都会从 NeoSkillSyncResult 构造同样的字典:

{
    "skill_key": result.skill_key,
    "local_skill_name": result.local_skill_name,
    "release_id": result.release_id,
    "candidate_id": result.candidate_id,
    "payload_ref": result.payload_ref,
    "map_path": result.map_path,
    "synced_at": result.synced_at,
}

你可以抽出一个小的辅助函数来避免偏差,并保证两个端点的行为一致:

def _sync_result_to_dict(self, result):
    return {
        "skill_key": result.skill_key,
        "local_skill_name": result.local_skill_name,
        "release_id": result.release_id,
        "candidate_id": result.candidate_id,
        "payload_ref": result.payload_ref,
        "map_path": result.map_path,
        "synced_at": result.synced_at,
    }

然后:

# in promote_neo_candidate
sync_json = None
if stage == "stable" and sync_to_local:
    sync_result = await sync_mgr.sync_release(...)
    sync_json = self._sync_result_to_dict(sync_result)

# in sync_neo_release
result = await sync_mgr.sync_release(...)
return Response().ok(self._sync_result_to_dict(result)).__dict__

这两个小抽象就能减少重复代码,在不改变任何行为、也不引入完整服务层的情况下,让路由方法明显更简洁。

Original comment in English

issue (complexity): Consider extracting shared helpers for Neo client usage and sync result formatting to simplify the SkillsRoute handlers and remove duplication.

You can keep all behavior but reduce complexity/duplication inside SkillsRoute by factoring out the repeated “get config → open BayClient → call API → shape response” pattern and the repeated sync-result shaping.

1. Centralize BayClient construction and error handling

The Neo endpoints all do:

  • _get_neo_client_config
  • import BayClient
  • async with BayClient(...) as client: ...
  • try/except with identical logging + error response

You can move this into a small helper to remove repetition and keep each handler focused on what it does rather than how it talks to Neo:

from shipyard_neo import BayClient  # move import to top-level

class SkillsRoute(Route):
    ...

    async def _with_neo_client(self, func):
        try:
            endpoint, access_token = self._get_neo_client_config()
            async with BayClient(
                endpoint_url=endpoint,
                access_token=access_token,
            ) as client:
                return await func(client)
        except Exception as e:
            logger.error(traceback.format_exc())
            return Response().error(str(e)).__dict__

Usage example (applies to get_neo_candidates, get_neo_releases, get_neo_payload, evaluate_neo_candidate, promote_neo_candidate, rollback_neo_release, sync_neo_release):

async def get_neo_candidates(self):
    status = request.args.get("status")
    skill_key = request.args.get("skill_key")
    limit = int(request.args.get("limit", 100))
    offset = int(request.args.get("offset", 0))

    async def _do(client):
        candidates = await client.skills.list_candidates(
            status=status,
            skill_key=skill_key,
            limit=limit,
            offset=offset,
        )
        return Response().ok(_to_jsonable(candidates)).__dict__

    return await self._with_neo_client(_do)

This keeps route methods short and removes boilerplate.

2. Factor out the sync result shaping

promote_neo_candidate and sync_neo_release both build the same dict from NeoSkillSyncResult:

{
    "skill_key": result.skill_key,
    "local_skill_name": result.local_skill_name,
    "release_id": result.release_id,
    "candidate_id": result.candidate_id,
    "payload_ref": result.payload_ref,
    "map_path": result.map_path,
    "synced_at": result.synced_at,
}

You can extract a tiny helper to avoid drift and keep both endpoints consistent:

def _sync_result_to_dict(self, result):
    return {
        "skill_key": result.skill_key,
        "local_skill_name": result.local_skill_name,
        "release_id": result.release_id,
        "candidate_id": result.candidate_id,
        "payload_ref": result.payload_ref,
        "map_path": result.map_path,
        "synced_at": result.synced_at,
    }

Then:

# in promote_neo_candidate
sync_json = None
if stage == "stable" and sync_to_local:
    sync_result = await sync_mgr.sync_release(...)
    sync_json = self._sync_result_to_dict(sync_result)

# in sync_neo_release
result = await sync_mgr.sync_release(...)
return Response().ok(self._sync_result_to_dict(result)).__dict__

These two small extractions reduce duplication and make the route methods materially simpler without changing any behavior or moving to a full-blown service layer.

return client, sandbox


@dataclass
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

issue (complexity): 建议引入一个共享的 Neo 工具基类,并将 promote/sync/rollback 的编排逻辑挪到 NeoSkillSyncManager 中,从而去掉重复的管理员检查 / 客户端创建 / 错误处理 / JSON 序列化样板代码,并集中管理生命周期逻辑。

在不改变行为的前提下,你可以通过引入一个共享基类、并集中处理 promote/sync/rollback 的编排,大幅减少复杂度和重复代码。

1. 抽取通用的 Neo 工具基类

目前所有工具都在重复相同的模式:

  • 调用 _ensure_admin
  • 调用 _get_neo_context
  • try/except → 调用 SDK → 转成 JSON 或格式化错误

你可以保持行为完全一致,只是把这些管道逻辑移到基类中,使每个具体工具只负责定义参数和实际的 SDK 调用:

from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

@dataclass
class NeoSkillToolBase(FunctionTool):
    error_prefix: str = "Error"

    async def _run(
        self,
        context: ContextWrapper[AstrAgentContext],
        neo_call: Callable[[Any, Any], Awaitable[Any]],
        error_action: str,
    ) -> ToolExecResult:
        if err := _ensure_admin(context):
            return err
        try:
            client, sandbox = await _get_neo_context(context)
            result = await neo_call(client, sandbox)
            return _to_json_text(result)
        except Exception as e:
            return f"{self.error_prefix} {error_action}: {str(e)}"

这样每个工具都可以大幅缩减;例如 ListSkillReleasesTool

@dataclass
class ListSkillReleasesTool(NeoSkillToolBase):
    name: str = "astrbot_list_skill_releases"
    description: str = "List skill releases."
    parameters: dict = field(
        default_factory=lambda: {
            "type": "object",
            "properties": {
                "skill_key": {"type": "string"},
                "active_only": {"type": "boolean", "default": False},
                "stage": {"type": "string"},
                "limit": {"type": "integer", "default": 100},
                "offset": {"type": "integer", "default": 0},
            },
            "required": [],
        }
    )

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        skill_key: str | None = None,
        active_only: bool = False,
        stage: str | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> ToolExecResult:
        return await self._run(
            context,
            lambda client, _sandbox: client.skills.list_releases(
                skill_key=skill_key,
                active_only=active_only,
                stage=stage,
                limit=limit,
                offset=offset,
            ),
            error_action="listing skill releases",
        )

你可以逐步将各个工具迁移到这种模式下;它保持相同的签名和行为,但去掉了重复的管理员检查 / 客户端创建 / 错误处理 / JSON 样板。

2. 集中管理 promote/sync/rollback 的编排

PromoteSkillCandidateTool 目前包含的编排逻辑,其实在概念上已经属于 NeoSkillSyncManager 或生命周期管理的范畴。你可以把这部分流程移动到一个辅助函数或服务中,让工具本身保持精简。

例如,在 NeoSkillSyncManager(或旁边的一个小服务)中新增一个方法:

class NeoSkillSyncManager:
    # existing methods...

    async def promote_with_optional_sync(
        self,
        client: Any,
        candidate_id: str,
        stage: str,
        sync_to_local: bool,
    ) -> dict[str, Any]:
        release = await client.skills.promote_candidate(candidate_id, stage=stage)
        release_json = _to_jsonable(release)

        sync_json: dict[str, Any] | None = None
        rollback_json: dict[str, Any] | None = None

        if stage == "stable" and sync_to_local:
            try:
                sync_result = await self.sync_release(
                    client,
                    release_id=str(release_json.get("id", "")),
                    require_stable=True,
                )
                sync_json = {
                    "skill_key": sync_result.skill_key,
                    "local_skill_name": sync_result.local_skill_name,
                    "release_id": sync_result.release_id,
                    "candidate_id": sync_result.candidate_id,
                    "payload_ref": sync_result.payload_ref,
                    "map_path": sync_result.map_path,
                    "synced_at": sync_result.synced_at,
                }
            except Exception as sync_err:
                try:
                    rollback = await client.skills.rollback_release(
                        str(release_json.get("id", ""))
                    )
                    rollback_json = _to_jsonable(rollback)
                except Exception as rollback_err:
                    raise RuntimeError(
                        "stable release sync failed and auto rollback also failed; "
                        f"sync_error={sync_err}; rollback_error={rollback_err}"
                    ) from rollback_err
                raise RuntimeError(
                    "stable release sync failed; auto rollback succeeded; "
                    f"sync_error={sync_err}; rollback={_to_json_text(rollback_json)}"
                ) from sync_err

        return {
            "release": release_json,
            "sync": sync_json,
            "rollback": rollback_json,
        }

然后 PromoteSkillCandidateTool.call 可以简化为:

@dataclass
class PromoteSkillCandidateTool(NeoSkillToolBase):
    name: str = "astrbot_promote_skill_candidate"
    description: str = "Promote one candidate to release stage (canary/stable)."
    # parameters unchanged ...

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        candidate_id: str,
        stage: str = "canary",
        sync_to_local: bool = True,
    ) -> ToolExecResult:
        if stage not in {"canary", "stable"}:
            return "Error promoting skill candidate: stage must be canary or stable."

        if err := _ensure_admin(context):
            return err

        try:
            client, _sandbox = await _get_neo_context(context)
            sync_mgr = NeoSkillSyncManager()
            result = await sync_mgr.promote_with_optional_sync(
                client=client,
                candidate_id=candidate_id,
                stage=stage,
                sync_to_local=sync_to_local,
            )
            return _to_json_text(result)
        except Exception as e:
            return f"Error promoting skill candidate: {str(e)}"

这样可以在保持当前所有行为不变的前提下(包括回滚语义和错误信息,只要你在管理器中保留它们):

  • 去掉工具内部重复的编排逻辑;
  • 提供单一位置(NeoSkillSyncManager)来统一工具和 HTTP 路由之间的行为。
Original comment in English

issue (complexity): Consider introducing a shared Neo tool base class and moving promote/sync/rollback orchestration into NeoSkillSyncManager to eliminate repeated admin/client/error/JSON boilerplate and centralize lifecycle logic.

You can reduce the complexity and duplication meaningfully without changing behavior by introducing a shared base and centralizing the promotion/sync/rollback orchestration.

1. Factor out a common Neo tool base

All tools repeat the same pattern:

  • _ensure_admin
  • _get_neo_context
  • try/except → call SDK → JSON-ify or format error

You can keep behavior identical but move the plumbing into a base class so each tool only defines its parameters and the actual SDK call.

from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable

@dataclass
class NeoSkillToolBase(FunctionTool):
    error_prefix: str = "Error"

    async def _run(
        self,
        context: ContextWrapper[AstrAgentContext],
        neo_call: Callable[[Any, Any], Awaitable[Any]],
        error_action: str,
    ) -> ToolExecResult:
        if err := _ensure_admin(context):
            return err
        try:
            client, sandbox = await _get_neo_context(context)
            result = await neo_call(client, sandbox)
            return _to_json_text(result)
        except Exception as e:
            return f"{self.error_prefix} {error_action}: {str(e)}"

Then each tool becomes much smaller; for example, ListSkillReleasesTool:

@dataclass
class ListSkillReleasesTool(NeoSkillToolBase):
    name: str = "astrbot_list_skill_releases"
    description: str = "List skill releases."
    parameters: dict = field(
        default_factory=lambda: {
            "type": "object",
            "properties": {
                "skill_key": {"type": "string"},
                "active_only": {"type": "boolean", "default": False},
                "stage": {"type": "string"},
                "limit": {"type": "integer", "default": 100},
                "offset": {"type": "integer", "default": 0},
            },
            "required": [],
        }
    )

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        skill_key: str | None = None,
        active_only: bool = False,
        stage: str | None = None,
        limit: int = 100,
        offset: int = 0,
    ) -> ToolExecResult:
        return await self._run(
            context,
            lambda client, _sandbox: client.skills.list_releases(
                skill_key=skill_key,
                active_only=active_only,
                stage=stage,
                limit=limit,
                offset=offset,
            ),
            error_action="listing skill releases",
        )

You can gradually migrate each tool to this pattern; it keeps the same signatures and behavior but removes repeated admin/client/error/JSON boilerplate.

2. Centralize promote/sync/rollback orchestration

PromoteSkillCandidateTool contains orchestration logic that’s already conceptually part of NeoSkillSyncManager/lifecycle concerns. You can move the flow into a helper function/service and keep the tool thin.

For example, add a method to NeoSkillSyncManager (or a small service next to it):

class NeoSkillSyncManager:
    # existing methods...

    async def promote_with_optional_sync(
        self,
        client: Any,
        candidate_id: str,
        stage: str,
        sync_to_local: bool,
    ) -> dict[str, Any]:
        release = await client.skills.promote_candidate(candidate_id, stage=stage)
        release_json = _to_jsonable(release)

        sync_json: dict[str, Any] | None = None
        rollback_json: dict[str, Any] | None = None

        if stage == "stable" and sync_to_local:
            try:
                sync_result = await self.sync_release(
                    client,
                    release_id=str(release_json.get("id", "")),
                    require_stable=True,
                )
                sync_json = {
                    "skill_key": sync_result.skill_key,
                    "local_skill_name": sync_result.local_skill_name,
                    "release_id": sync_result.release_id,
                    "candidate_id": sync_result.candidate_id,
                    "payload_ref": sync_result.payload_ref,
                    "map_path": sync_result.map_path,
                    "synced_at": sync_result.synced_at,
                }
            except Exception as sync_err:
                try:
                    rollback = await client.skills.rollback_release(
                        str(release_json.get("id", ""))
                    )
                    rollback_json = _to_jsonable(rollback)
                except Exception as rollback_err:
                    raise RuntimeError(
                        "stable release sync failed and auto rollback also failed; "
                        f"sync_error={sync_err}; rollback_error={rollback_err}"
                    ) from rollback_err
                raise RuntimeError(
                    "stable release sync failed; auto rollback succeeded; "
                    f"sync_error={sync_err}; rollback={_to_json_text(rollback_json)}"
                ) from sync_err

        return {
            "release": release_json,
            "sync": sync_json,
            "rollback": rollback_json,
        }

Then PromoteSkillCandidateTool.call becomes:

@dataclass
class PromoteSkillCandidateTool(NeoSkillToolBase):
    name: str = "astrbot_promote_skill_candidate"
    description: str = "Promote one candidate to release stage (canary/stable)."
    # parameters unchanged ...

    async def call(
        self,
        context: ContextWrapper[AstrAgentContext],
        candidate_id: str,
        stage: str = "canary",
        sync_to_local: bool = True,
    ) -> ToolExecResult:
        if stage not in {"canary", "stable"}:
            return "Error promoting skill candidate: stage must be canary or stable."

        if err := _ensure_admin(context):
            return err

        try:
            client, _sandbox = await _get_neo_context(context)
            sync_mgr = NeoSkillSyncManager()
            result = await sync_mgr.promote_with_optional_sync(
                client=client,
                candidate_id=candidate_id,
                stage=stage,
                sync_to_local=sync_to_local,
            )
            return _to_json_text(result)
        except Exception as e:
            return f"Error promoting skill candidate: {str(e)}"

This keeps all current behavior (including rollback semantics and error messages, if you preserve them in the manager) but:

  • Removes duplicated orchestration from the tool.
  • Gives you a single place (NeoSkillSyncManager) to align behavior between tools and HTTP routes.

@dosubot dosubot bot added area:core The bug / feature is about astrbot's core, backend area:webui The bug / feature is about webui(dashboard) of astrbot. labels Feb 11, 2026
@w31r4 w31r4 changed the title feat: 接入 Shipyard Neo 自迭代 Skill 闭环与管理能力 feat(skills): integrate Neo lifecycle and sandbox sync strategy Feb 12, 2026
@w31r4 w31r4 changed the title feat(skills): integrate Neo lifecycle and sandbox sync strategy feat: 接入 Shipyard Neo 自迭代 Skill 闭环与管理能力 Feb 12, 2026
@w31r4
Copy link
Author

w31r4 commented Feb 12, 2026

补充说明(仅针对最近一个提交):40c7cf39

本次修补主要解决“本地上传 skill 与 ship/gull 预置 skill 的共存与同步一致性”问题:

  1. 调整沙箱同步策略(astrbot/core/computer/computer_client.py
  • 从“整目录清空覆盖”改为“托管覆盖”。
  • 仅清理 AstrBot 自己托管的 skills,不再误删 ship/gull 预置 skills。
  • 同步后在沙箱侧扫描 skills/*/SKILL.md,回传技能元数据。
  1. 增加沙箱 skill 元数据缓存与合并展示(astrbot/core/skills/skill_manager.py
  • 新增 sandbox_skills_cache.json 缓存机制。
  • list_skills(runtime="sandbox") 合并“本地上传 skills + 沙箱预置 skills”,并保持本地同名优先。
  1. 本地 skill 变更后触发活跃沙箱同步(astrbot/dashboard/routes/skills.py
  • 上传/删除本地 skill 后,新增 best-effort sync_skills_to_active_sandboxes()
  1. 新增测试
  • tests/test_computer_skill_sync.py
  • tests/test_skill_manager_sandbox_cache.py

本提交已本地验证:

  • uv run ruff format .
  • uv run ruff check .
  • uv run pytest -q tests/test_skill_manager_sandbox_cache.py tests/test_computer_skill_sync.py tests/test_neo_skill_sync.py
  • uv run pytest -q tests/test_dashboard.py -k neo_skills_routes

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area:core The bug / feature is about astrbot's core, backend area:webui The bug / feature is about webui(dashboard) of astrbot. size:XXL This PR changes 1000+ lines, ignoring generated files.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant