-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
feat: 接入 Shipyard Neo 自迭代 Skill 闭环与管理能力 #5028
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey - 我发现了 4 个问题,并给出了一些整体反馈:
- 你在
pyproject.toml中添加了对shipyard-neo-sdk的直接引用,但没有在requirements.txt中添加;如果你的部署或开发流程仍依赖requirements.txt,建议也在其中添加相同依赖,以保持两个清单文件的一致性。
给 AI Agent 的提示
Please address the comments from this code review:
## Overall Comments
- You added `shipyard-neo-sdk` as a direct reference in `pyproject.toml` but not in `requirements.txt`; if your deployment or dev workflows still rely on `requirements.txt`, consider adding the same dependency there to keep the two manifests consistent.
## Individual Comments
### Comment 1
<location> `astrbot/dashboard/routes/skills.py:304-313` </location>
<code_context>
+ sync_json: dict[str, Any] | None = None
+ rollback_json: dict[str, Any] | None = None
+ if stage == "stable" and sync_to_local:
+ sync_mgr = NeoSkillSyncManager()
+ try:
+ sync_result = await sync_mgr.sync_release(
</code_context>
<issue_to_address>
**suggestion (performance):** Stable promotion triggers skills sync twice (inside sync manager and again after), which is redundant.
When `stage == "stable" and sync_to_local`, `NeoSkillSyncManager().sync_release(...)` already triggers `sync_skills_to_active_sandboxes()`, and this route then calls `sync_skills_to_active_sandboxes()` again in the best-effort block. That means the same change is synced twice in a row. Consider relying on just one of these (either the manager’s call or the explicit call here) to avoid redundant work.
Suggested implementation:
```python
sync_json = None
did_sync_to_local = False
if stage == "stable" and sync_to_local:
sync_mgr = NeoSkillSyncManager()
try:
sync_result = await sync_mgr.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
did_sync_to_local = True
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,
```
To fully avoid the double sync, you should also adjust the later "best-effort" sync call in this route:
1. Locate the block that currently calls `sync_skills_to_active_sandboxes()` (likely in a `try/except` "best-effort" section after the snippet you provided).
2. Wrap that call so it only runs when `did_sync_to_local` is `False`, for example:
```python
if not did_sync_to_local:
await sync_skills_to_active_sandboxes()
```
This way, when `stage == "stable" and sync_to_local` and `NeoSkillSyncManager().sync_release(...)` has already performed the sync, the route will skip the redundant second call.
</issue_to_address>
### Comment 2
<location> `astrbot/core/computer/booters/base.py:16-22` </location>
<code_context>
@property
def shell(self) -> ShellComponent: ...
+ @property
+ def browser(self) -> BrowserComponent:
+ raise NotImplementedError(
+ f"{self.__class__.__name__} does not support browser capability."
+ )
+
</code_context>
<issue_to_address>
**issue (bug_risk):** The base `browser` property raising `NotImplementedError` conflicts with `getattr(..., "browser", None)` checks in callers.
Because callers like `_get_browser_component` use `getattr(booter, "browser", None)` for capability detection, this property will raise instead of returning `None`, so the `if browser is None` path never executes. While the outer `try/except` masks the exception, it makes feature detection brittle and hides the clearer error message. Consider either returning `None` in the base implementation and overriding it in concrete booters, or updating callers to explicitly handle `NotImplementedError` when probing for `browser` support.
</issue_to_address>
### Comment 3
<location> `astrbot/dashboard/routes/skills.py:171` </location>
<code_context>
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
+
+ async def get_neo_candidates(self):
+ try:
+ endpoint, access_token = self._get_neo_client_config()
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared helpers for Neo client usage and sync result formatting to simplify the SkillsRoute handlers and remove duplication.
You can keep all behavior but reduce complexity/duplication inside `SkillsRoute` by factoring out the repeated “get config → open BayClient → call API → shape response” pattern and the repeated sync-result shaping.
### 1. Centralize BayClient construction and error handling
The Neo endpoints all do:
- `_get_neo_client_config`
- import `BayClient`
- `async with BayClient(...) as client: ...`
- try/except with identical logging + error response
You can move this into a small helper to remove repetition and keep each handler focused on *what* it does rather than *how* it talks to Neo:
```python
from shipyard_neo import BayClient # move import to top-level
class SkillsRoute(Route):
...
async def _with_neo_client(self, func):
try:
endpoint, access_token = self._get_neo_client_config()
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
return await func(client)
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
```
Usage example (applies to `get_neo_candidates`, `get_neo_releases`, `get_neo_payload`, `evaluate_neo_candidate`, `promote_neo_candidate`, `rollback_neo_release`, `sync_neo_release`):
```python
async def get_neo_candidates(self):
status = request.args.get("status")
skill_key = request.args.get("skill_key")
limit = int(request.args.get("limit", 100))
offset = int(request.args.get("offset", 0))
async def _do(client):
candidates = await client.skills.list_candidates(
status=status,
skill_key=skill_key,
limit=limit,
offset=offset,
)
return Response().ok(_to_jsonable(candidates)).__dict__
return await self._with_neo_client(_do)
```
This keeps route methods short and removes boilerplate.
### 2. Factor out the sync result shaping
`promote_neo_candidate` and `sync_neo_release` both build the same dict from `NeoSkillSyncResult`:
```python
{
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}
```
You can extract a tiny helper to avoid drift and keep both endpoints consistent:
```python
def _sync_result_to_dict(self, result):
return {
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}
```
Then:
```python
# in promote_neo_candidate
sync_json = None
if stage == "stable" and sync_to_local:
sync_result = await sync_mgr.sync_release(...)
sync_json = self._sync_result_to_dict(sync_result)
# in sync_neo_release
result = await sync_mgr.sync_release(...)
return Response().ok(self._sync_result_to_dict(result)).__dict__
```
These two small extractions reduce duplication and make the route methods materially simpler without changing any behavior or moving to a full-blown service layer.
</issue_to_address>
### Comment 4
<location> `astrbot/core/computer/tools/neo_skills.py:51` </location>
<code_context>
+ return browser
+
+
+@dataclass
+class BrowserExecTool(FunctionTool):
+ name: str = "astrbot_execute_browser"
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing a shared Neo tool base class and moving promote/sync/rollback orchestration into NeoSkillSyncManager to eliminate repeated admin/client/error/JSON boilerplate and centralize lifecycle logic.
You can reduce the complexity and duplication meaningfully without changing behavior by introducing a shared base and centralizing the promotion/sync/rollback orchestration.
### 1. Factor out a common Neo tool base
All tools repeat the same pattern:
- ` _ensure_admin`
- `_get_neo_context`
- try/except → call SDK → JSON-ify or format error
You can keep behavior identical but move the plumbing into a base class so each tool only defines its parameters and the actual SDK call.
```python
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
@dataclass
class NeoSkillToolBase(FunctionTool):
error_prefix: str = "Error"
async def _run(
self,
context: ContextWrapper[AstrAgentContext],
neo_call: Callable[[Any, Any], Awaitable[Any]],
error_action: str,
) -> ToolExecResult:
if err := _ensure_admin(context):
return err
try:
client, sandbox = await _get_neo_context(context)
result = await neo_call(client, sandbox)
return _to_json_text(result)
except Exception as e:
return f"{self.error_prefix} {error_action}: {str(e)}"
```
Then each tool becomes much smaller; for example, `ListSkillReleasesTool`:
```python
@dataclass
class ListSkillReleasesTool(NeoSkillToolBase):
name: str = "astrbot_list_skill_releases"
description: str = "List skill releases."
parameters: dict = field(
default_factory=lambda: {
"type": "object",
"properties": {
"skill_key": {"type": "string"},
"active_only": {"type": "boolean", "default": False},
"stage": {"type": "string"},
"limit": {"type": "integer", "default": 100},
"offset": {"type": "integer", "default": 0},
},
"required": [],
}
)
async def call(
self,
context: ContextWrapper[AstrAgentContext],
skill_key: str | None = None,
active_only: bool = False,
stage: str | None = None,
limit: int = 100,
offset: int = 0,
) -> ToolExecResult:
return await self._run(
context,
lambda client, _sandbox: client.skills.list_releases(
skill_key=skill_key,
active_only=active_only,
stage=stage,
limit=limit,
offset=offset,
),
error_action="listing skill releases",
)
```
You can gradually migrate each tool to this pattern; it keeps the same signatures and behavior but removes repeated admin/client/error/JSON boilerplate.
### 2. Centralize promote/sync/rollback orchestration
`PromoteSkillCandidateTool` contains orchestration logic that’s already conceptually part of `NeoSkillSyncManager`/lifecycle concerns. You can move the flow into a helper function/service and keep the tool thin.
For example, add a method to `NeoSkillSyncManager` (or a small service next to it):
```python
class NeoSkillSyncManager:
# existing methods...
async def promote_with_optional_sync(
self,
client: Any,
candidate_id: str,
stage: str,
sync_to_local: bool,
) -> dict[str, Any]:
release = await client.skills.promote_candidate(candidate_id, stage=stage)
release_json = _to_jsonable(release)
sync_json: dict[str, Any] | None = None
rollback_json: dict[str, Any] | None = None
if stage == "stable" and sync_to_local:
try:
sync_result = await self.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,
"map_path": sync_result.map_path,
"synced_at": sync_result.synced_at,
}
except Exception as sync_err:
try:
rollback = await client.skills.rollback_release(
str(release_json.get("id", ""))
)
rollback_json = _to_jsonable(rollback)
except Exception as rollback_err:
raise RuntimeError(
"stable release sync failed and auto rollback also failed; "
f"sync_error={sync_err}; rollback_error={rollback_err}"
) from rollback_err
raise RuntimeError(
"stable release sync failed; auto rollback succeeded; "
f"sync_error={sync_err}; rollback={_to_json_text(rollback_json)}"
) from sync_err
return {
"release": release_json,
"sync": sync_json,
"rollback": rollback_json,
}
```
Then `PromoteSkillCandidateTool.call` becomes:
```python
@dataclass
class PromoteSkillCandidateTool(NeoSkillToolBase):
name: str = "astrbot_promote_skill_candidate"
description: str = "Promote one candidate to release stage (canary/stable)."
# parameters unchanged ...
async def call(
self,
context: ContextWrapper[AstrAgentContext],
candidate_id: str,
stage: str = "canary",
sync_to_local: bool = True,
) -> ToolExecResult:
if stage not in {"canary", "stable"}:
return "Error promoting skill candidate: stage must be canary or stable."
if err := _ensure_admin(context):
return err
try:
client, _sandbox = await _get_neo_context(context)
sync_mgr = NeoSkillSyncManager()
result = await sync_mgr.promote_with_optional_sync(
client=client,
candidate_id=candidate_id,
stage=stage,
sync_to_local=sync_to_local,
)
return _to_json_text(result)
except Exception as e:
return f"Error promoting skill candidate: {str(e)}"
```
This keeps all current behavior (including rollback semantics and error messages, if you preserve them in the manager) but:
- Removes duplicated orchestration from the tool.
- Gives you a single place (`NeoSkillSyncManager`) to align behavior between tools and HTTP routes.
</issue_to_address>帮我变得更有用!请在每条评论上点 👍 或 👎,我会根据你的反馈改进后续的评审。
Original comment in English
Hey - I've found 4 issues, and left some high level feedback:
- You added
shipyard-neo-sdkas a direct reference inpyproject.tomlbut not inrequirements.txt; if your deployment or dev workflows still rely onrequirements.txt, consider adding the same dependency there to keep the two manifests consistent.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- You added `shipyard-neo-sdk` as a direct reference in `pyproject.toml` but not in `requirements.txt`; if your deployment or dev workflows still rely on `requirements.txt`, consider adding the same dependency there to keep the two manifests consistent.
## Individual Comments
### Comment 1
<location> `astrbot/dashboard/routes/skills.py:304-313` </location>
<code_context>
+ sync_json: dict[str, Any] | None = None
+ rollback_json: dict[str, Any] | None = None
+ if stage == "stable" and sync_to_local:
+ sync_mgr = NeoSkillSyncManager()
+ try:
+ sync_result = await sync_mgr.sync_release(
</code_context>
<issue_to_address>
**suggestion (performance):** Stable promotion triggers skills sync twice (inside sync manager and again after), which is redundant.
When `stage == "stable" and sync_to_local`, `NeoSkillSyncManager().sync_release(...)` already triggers `sync_skills_to_active_sandboxes()`, and this route then calls `sync_skills_to_active_sandboxes()` again in the best-effort block. That means the same change is synced twice in a row. Consider relying on just one of these (either the manager’s call or the explicit call here) to avoid redundant work.
Suggested implementation:
```python
sync_json = None
did_sync_to_local = False
if stage == "stable" and sync_to_local:
sync_mgr = NeoSkillSyncManager()
try:
sync_result = await sync_mgr.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
did_sync_to_local = True
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,
```
To fully avoid the double sync, you should also adjust the later "best-effort" sync call in this route:
1. Locate the block that currently calls `sync_skills_to_active_sandboxes()` (likely in a `try/except` "best-effort" section after the snippet you provided).
2. Wrap that call so it only runs when `did_sync_to_local` is `False`, for example:
```python
if not did_sync_to_local:
await sync_skills_to_active_sandboxes()
```
This way, when `stage == "stable" and sync_to_local` and `NeoSkillSyncManager().sync_release(...)` has already performed the sync, the route will skip the redundant second call.
</issue_to_address>
### Comment 2
<location> `astrbot/core/computer/booters/base.py:16-22` </location>
<code_context>
@property
def shell(self) -> ShellComponent: ...
+ @property
+ def browser(self) -> BrowserComponent:
+ raise NotImplementedError(
+ f"{self.__class__.__name__} does not support browser capability."
+ )
+
</code_context>
<issue_to_address>
**issue (bug_risk):** The base `browser` property raising `NotImplementedError` conflicts with `getattr(..., "browser", None)` checks in callers.
Because callers like `_get_browser_component` use `getattr(booter, "browser", None)` for capability detection, this property will raise instead of returning `None`, so the `if browser is None` path never executes. While the outer `try/except` masks the exception, it makes feature detection brittle and hides the clearer error message. Consider either returning `None` in the base implementation and overriding it in concrete booters, or updating callers to explicitly handle `NotImplementedError` when probing for `browser` support.
</issue_to_address>
### Comment 3
<location> `astrbot/dashboard/routes/skills.py:171` </location>
<code_context>
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
+
+ async def get_neo_candidates(self):
+ try:
+ endpoint, access_token = self._get_neo_client_config()
</code_context>
<issue_to_address>
**issue (complexity):** Consider extracting shared helpers for Neo client usage and sync result formatting to simplify the SkillsRoute handlers and remove duplication.
You can keep all behavior but reduce complexity/duplication inside `SkillsRoute` by factoring out the repeated “get config → open BayClient → call API → shape response” pattern and the repeated sync-result shaping.
### 1. Centralize BayClient construction and error handling
The Neo endpoints all do:
- `_get_neo_client_config`
- import `BayClient`
- `async with BayClient(...) as client: ...`
- try/except with identical logging + error response
You can move this into a small helper to remove repetition and keep each handler focused on *what* it does rather than *how* it talks to Neo:
```python
from shipyard_neo import BayClient # move import to top-level
class SkillsRoute(Route):
...
async def _with_neo_client(self, func):
try:
endpoint, access_token = self._get_neo_client_config()
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
return await func(client)
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
```
Usage example (applies to `get_neo_candidates`, `get_neo_releases`, `get_neo_payload`, `evaluate_neo_candidate`, `promote_neo_candidate`, `rollback_neo_release`, `sync_neo_release`):
```python
async def get_neo_candidates(self):
status = request.args.get("status")
skill_key = request.args.get("skill_key")
limit = int(request.args.get("limit", 100))
offset = int(request.args.get("offset", 0))
async def _do(client):
candidates = await client.skills.list_candidates(
status=status,
skill_key=skill_key,
limit=limit,
offset=offset,
)
return Response().ok(_to_jsonable(candidates)).__dict__
return await self._with_neo_client(_do)
```
This keeps route methods short and removes boilerplate.
### 2. Factor out the sync result shaping
`promote_neo_candidate` and `sync_neo_release` both build the same dict from `NeoSkillSyncResult`:
```python
{
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}
```
You can extract a tiny helper to avoid drift and keep both endpoints consistent:
```python
def _sync_result_to_dict(self, result):
return {
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}
```
Then:
```python
# in promote_neo_candidate
sync_json = None
if stage == "stable" and sync_to_local:
sync_result = await sync_mgr.sync_release(...)
sync_json = self._sync_result_to_dict(sync_result)
# in sync_neo_release
result = await sync_mgr.sync_release(...)
return Response().ok(self._sync_result_to_dict(result)).__dict__
```
These two small extractions reduce duplication and make the route methods materially simpler without changing any behavior or moving to a full-blown service layer.
</issue_to_address>
### Comment 4
<location> `astrbot/core/computer/tools/neo_skills.py:51` </location>
<code_context>
+ return browser
+
+
+@dataclass
+class BrowserExecTool(FunctionTool):
+ name: str = "astrbot_execute_browser"
</code_context>
<issue_to_address>
**issue (complexity):** Consider introducing a shared Neo tool base class and moving promote/sync/rollback orchestration into NeoSkillSyncManager to eliminate repeated admin/client/error/JSON boilerplate and centralize lifecycle logic.
You can reduce the complexity and duplication meaningfully without changing behavior by introducing a shared base and centralizing the promotion/sync/rollback orchestration.
### 1. Factor out a common Neo tool base
All tools repeat the same pattern:
- ` _ensure_admin`
- `_get_neo_context`
- try/except → call SDK → JSON-ify or format error
You can keep behavior identical but move the plumbing into a base class so each tool only defines its parameters and the actual SDK call.
```python
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
@dataclass
class NeoSkillToolBase(FunctionTool):
error_prefix: str = "Error"
async def _run(
self,
context: ContextWrapper[AstrAgentContext],
neo_call: Callable[[Any, Any], Awaitable[Any]],
error_action: str,
) -> ToolExecResult:
if err := _ensure_admin(context):
return err
try:
client, sandbox = await _get_neo_context(context)
result = await neo_call(client, sandbox)
return _to_json_text(result)
except Exception as e:
return f"{self.error_prefix} {error_action}: {str(e)}"
```
Then each tool becomes much smaller; for example, `ListSkillReleasesTool`:
```python
@dataclass
class ListSkillReleasesTool(NeoSkillToolBase):
name: str = "astrbot_list_skill_releases"
description: str = "List skill releases."
parameters: dict = field(
default_factory=lambda: {
"type": "object",
"properties": {
"skill_key": {"type": "string"},
"active_only": {"type": "boolean", "default": False},
"stage": {"type": "string"},
"limit": {"type": "integer", "default": 100},
"offset": {"type": "integer", "default": 0},
},
"required": [],
}
)
async def call(
self,
context: ContextWrapper[AstrAgentContext],
skill_key: str | None = None,
active_only: bool = False,
stage: str | None = None,
limit: int = 100,
offset: int = 0,
) -> ToolExecResult:
return await self._run(
context,
lambda client, _sandbox: client.skills.list_releases(
skill_key=skill_key,
active_only=active_only,
stage=stage,
limit=limit,
offset=offset,
),
error_action="listing skill releases",
)
```
You can gradually migrate each tool to this pattern; it keeps the same signatures and behavior but removes repeated admin/client/error/JSON boilerplate.
### 2. Centralize promote/sync/rollback orchestration
`PromoteSkillCandidateTool` contains orchestration logic that’s already conceptually part of `NeoSkillSyncManager`/lifecycle concerns. You can move the flow into a helper function/service and keep the tool thin.
For example, add a method to `NeoSkillSyncManager` (or a small service next to it):
```python
class NeoSkillSyncManager:
# existing methods...
async def promote_with_optional_sync(
self,
client: Any,
candidate_id: str,
stage: str,
sync_to_local: bool,
) -> dict[str, Any]:
release = await client.skills.promote_candidate(candidate_id, stage=stage)
release_json = _to_jsonable(release)
sync_json: dict[str, Any] | None = None
rollback_json: dict[str, Any] | None = None
if stage == "stable" and sync_to_local:
try:
sync_result = await self.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,
"map_path": sync_result.map_path,
"synced_at": sync_result.synced_at,
}
except Exception as sync_err:
try:
rollback = await client.skills.rollback_release(
str(release_json.get("id", ""))
)
rollback_json = _to_jsonable(rollback)
except Exception as rollback_err:
raise RuntimeError(
"stable release sync failed and auto rollback also failed; "
f"sync_error={sync_err}; rollback_error={rollback_err}"
) from rollback_err
raise RuntimeError(
"stable release sync failed; auto rollback succeeded; "
f"sync_error={sync_err}; rollback={_to_json_text(rollback_json)}"
) from sync_err
return {
"release": release_json,
"sync": sync_json,
"rollback": rollback_json,
}
```
Then `PromoteSkillCandidateTool.call` becomes:
```python
@dataclass
class PromoteSkillCandidateTool(NeoSkillToolBase):
name: str = "astrbot_promote_skill_candidate"
description: str = "Promote one candidate to release stage (canary/stable)."
# parameters unchanged ...
async def call(
self,
context: ContextWrapper[AstrAgentContext],
candidate_id: str,
stage: str = "canary",
sync_to_local: bool = True,
) -> ToolExecResult:
if stage not in {"canary", "stable"}:
return "Error promoting skill candidate: stage must be canary or stable."
if err := _ensure_admin(context):
return err
try:
client, _sandbox = await _get_neo_context(context)
sync_mgr = NeoSkillSyncManager()
result = await sync_mgr.promote_with_optional_sync(
client=client,
candidate_id=candidate_id,
stage=stage,
sync_to_local=sync_to_local,
)
return _to_json_text(result)
except Exception as e:
return f"Error promoting skill candidate: {str(e)}"
```
This keeps all current behavior (including rollback semantics and error messages, if you preserve them in the manager) but:
- Removes duplicated orchestration from the tool.
- Gives you a single place (`NeoSkillSyncManager`) to align behavior between tools and HTTP routes.
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| sync_mgr = NeoSkillSyncManager() | ||
| try: | ||
| sync_result = await sync_mgr.sync_release( | ||
| client, | ||
| release_id=str(release_json.get("id", "")), | ||
| require_stable=True, | ||
| ) | ||
| sync_json = { | ||
| "skill_key": sync_result.skill_key, | ||
| "local_skill_name": sync_result.local_skill_name, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion (performance): 稳定版发布会触发两次技能同步(一次在同步管理器内部,一次在后续逻辑中),这是重复且多余的。
当 stage == "stable" and sync_to_local 时,NeoSkillSyncManager().sync_release(...) 本身就会触发 sync_skills_to_active_sandboxes(),而这个路由稍后在「best-effort」的代码块中又会再次调用 sync_skills_to_active_sandboxes()。这意味着同一个变更会连续同步两次。建议只保留其中一种方式(要么依赖管理器内部的调用,要么保留这里的显式调用),以避免重复工作。
建议实现方式:
sync_json = None
did_sync_to_local = False
if stage == "stable" and sync_to_local:
sync_mgr = NeoSkillSyncManager()
try:
sync_result = await sync_mgr.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
did_sync_to_local = True
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,为了彻底避免双重同步,你还应该调整路由后面那个「best-effort」的同步调用:
-
找到当前调用
sync_skills_to_active_sandboxes()的代码块(很可能是在你给出的片段之后的一个try/except「best-effort」部分)。 -
将该调用包一层,使其仅在
did_sync_to_local为False时执行,例如:if not did_sync_to_local: await sync_skills_to_active_sandboxes()
这样,当 stage == "stable" and sync_to_local 且 NeoSkillSyncManager().sync_release(...) 已经完成同步时,这个路由就会跳过多余的第二次调用。
Original comment in English
suggestion (performance): Stable promotion triggers skills sync twice (inside sync manager and again after), which is redundant.
When stage == "stable" and sync_to_local, NeoSkillSyncManager().sync_release(...) already triggers sync_skills_to_active_sandboxes(), and this route then calls sync_skills_to_active_sandboxes() again in the best-effort block. That means the same change is synced twice in a row. Consider relying on just one of these (either the manager’s call or the explicit call here) to avoid redundant work.
Suggested implementation:
sync_json = None
did_sync_to_local = False
if stage == "stable" and sync_to_local:
sync_mgr = NeoSkillSyncManager()
try:
sync_result = await sync_mgr.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
did_sync_to_local = True
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,To fully avoid the double sync, you should also adjust the later "best-effort" sync call in this route:
-
Locate the block that currently calls
sync_skills_to_active_sandboxes()(likely in atry/except"best-effort" section after the snippet you provided). -
Wrap that call so it only runs when
did_sync_to_localisFalse, for example:if not did_sync_to_local: await sync_skills_to_active_sandboxes()
This way, when stage == "stable" and sync_to_local and NeoSkillSyncManager().sync_release(...) has already performed the sync, the route will skip the redundant second call.
| @property | ||
| def shell(self) -> ShellComponent: ... | ||
|
|
||
| @property | ||
| def browser(self) -> BrowserComponent: | ||
| raise NotImplementedError( | ||
| f"{self.__class__.__name__} does not support browser capability." |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (bug_risk): 基类中的 browser 属性抛出 NotImplementedError,与调用方使用 getattr(..., "browser", None) 的能力探测方式相冲突。
因为像 _get_browser_component 这样的调用方会通过 getattr(booter, "browser", None) 来做能力检测,这个属性会直接抛异常而不是返回 None,导致 if browser is None 分支永远不会执行。虽然外层的 try/except 会吞掉这个异常,但这会让能力检测变得脆弱,也隐藏了更清晰的错误信息。建议要么在基类实现中返回 None,并在具体 booter 中重写,要么在探测 browser 支持时,让调用方显式处理 NotImplementedError。
Original comment in English
issue (bug_risk): The base browser property raising NotImplementedError conflicts with getattr(..., "browser", None) checks in callers.
Because callers like _get_browser_component use getattr(booter, "browser", None) for capability detection, this property will raise instead of returning None, so the if browser is None path never executes. While the outer try/except masks the exception, it makes feature detection brittle and hides the clearer error message. Consider either returning None in the base implementation and overriding it in concrete booters, or updating callers to explicitly handle NotImplementedError when probing for browser support.
| logger.error(traceback.format_exc()) | ||
| return Response().error(str(e)).__dict__ | ||
|
|
||
| async def get_neo_candidates(self): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): 建议为 Neo 客户端的使用和同步结果的格式化提取通用辅助函数,以简化 SkillsRoute 的处理器并去掉重复代码。
你可以在保持所有行为不变的前提下,通过抽取重复的「获取配置 → 创建 BayClient → 调用 API → 整形响应」模式,以及重复的同步结果构造逻辑,来降低 SkillsRoute 内部的复杂度和重复。
1. 集中管理 BayClient 的构造与错误处理
所有 Neo 相关的端点基本都做了以下事情:
- 调用
_get_neo_client_config - 导入
BayClient async with BayClient(...) as client: ...- 使用相同的 try/except 进行日志记录和错误响应
你可以把这些逻辑移到一个小的辅助方法中,以去除重复,让每个 handler 更专注于做什么,而不是如何与 Neo 通信:
from shipyard_neo import BayClient # move import to top-level
class SkillsRoute(Route):
...
async def _with_neo_client(self, func):
try:
endpoint, access_token = self._get_neo_client_config()
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
return await func(client)
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__使用示例(适用于 get_neo_candidates, get_neo_releases, get_neo_payload, evaluate_neo_candidate, promote_neo_candidate, rollback_neo_release, sync_neo_release):
async def get_neo_candidates(self):
status = request.args.get("status")
skill_key = request.args.get("skill_key")
limit = int(request.args.get("limit", 100))
offset = int(request.args.get("offset", 0))
async def _do(client):
candidates = await client.skills.list_candidates(
status=status,
skill_key=skill_key,
limit=limit,
offset=offset,
)
return Response().ok(_to_jsonable(candidates)).__dict__
return await self._with_neo_client(_do)这样可以让路由方法保持简短,并去掉样板代码。
2. 抽取同步结果的构造逻辑
promote_neo_candidate 和 sync_neo_release 都会从 NeoSkillSyncResult 构造同样的字典:
{
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}你可以抽出一个小的辅助函数来避免偏差,并保证两个端点的行为一致:
def _sync_result_to_dict(self, result):
return {
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}然后:
# in promote_neo_candidate
sync_json = None
if stage == "stable" and sync_to_local:
sync_result = await sync_mgr.sync_release(...)
sync_json = self._sync_result_to_dict(sync_result)
# in sync_neo_release
result = await sync_mgr.sync_release(...)
return Response().ok(self._sync_result_to_dict(result)).__dict__这两个小抽象就能减少重复代码,在不改变任何行为、也不引入完整服务层的情况下,让路由方法明显更简洁。
Original comment in English
issue (complexity): Consider extracting shared helpers for Neo client usage and sync result formatting to simplify the SkillsRoute handlers and remove duplication.
You can keep all behavior but reduce complexity/duplication inside SkillsRoute by factoring out the repeated “get config → open BayClient → call API → shape response” pattern and the repeated sync-result shaping.
1. Centralize BayClient construction and error handling
The Neo endpoints all do:
_get_neo_client_config- import
BayClient async with BayClient(...) as client: ...- try/except with identical logging + error response
You can move this into a small helper to remove repetition and keep each handler focused on what it does rather than how it talks to Neo:
from shipyard_neo import BayClient # move import to top-level
class SkillsRoute(Route):
...
async def _with_neo_client(self, func):
try:
endpoint, access_token = self._get_neo_client_config()
async with BayClient(
endpoint_url=endpoint,
access_token=access_token,
) as client:
return await func(client)
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__Usage example (applies to get_neo_candidates, get_neo_releases, get_neo_payload, evaluate_neo_candidate, promote_neo_candidate, rollback_neo_release, sync_neo_release):
async def get_neo_candidates(self):
status = request.args.get("status")
skill_key = request.args.get("skill_key")
limit = int(request.args.get("limit", 100))
offset = int(request.args.get("offset", 0))
async def _do(client):
candidates = await client.skills.list_candidates(
status=status,
skill_key=skill_key,
limit=limit,
offset=offset,
)
return Response().ok(_to_jsonable(candidates)).__dict__
return await self._with_neo_client(_do)This keeps route methods short and removes boilerplate.
2. Factor out the sync result shaping
promote_neo_candidate and sync_neo_release both build the same dict from NeoSkillSyncResult:
{
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}You can extract a tiny helper to avoid drift and keep both endpoints consistent:
def _sync_result_to_dict(self, result):
return {
"skill_key": result.skill_key,
"local_skill_name": result.local_skill_name,
"release_id": result.release_id,
"candidate_id": result.candidate_id,
"payload_ref": result.payload_ref,
"map_path": result.map_path,
"synced_at": result.synced_at,
}Then:
# in promote_neo_candidate
sync_json = None
if stage == "stable" and sync_to_local:
sync_result = await sync_mgr.sync_release(...)
sync_json = self._sync_result_to_dict(sync_result)
# in sync_neo_release
result = await sync_mgr.sync_release(...)
return Response().ok(self._sync_result_to_dict(result)).__dict__These two small extractions reduce duplication and make the route methods materially simpler without changing any behavior or moving to a full-blown service layer.
| return client, sandbox | ||
|
|
||
|
|
||
| @dataclass |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
issue (complexity): 建议引入一个共享的 Neo 工具基类,并将 promote/sync/rollback 的编排逻辑挪到 NeoSkillSyncManager 中,从而去掉重复的管理员检查 / 客户端创建 / 错误处理 / JSON 序列化样板代码,并集中管理生命周期逻辑。
在不改变行为的前提下,你可以通过引入一个共享基类、并集中处理 promote/sync/rollback 的编排,大幅减少复杂度和重复代码。
1. 抽取通用的 Neo 工具基类
目前所有工具都在重复相同的模式:
- 调用
_ensure_admin - 调用
_get_neo_context - try/except → 调用 SDK → 转成 JSON 或格式化错误
你可以保持行为完全一致,只是把这些管道逻辑移到基类中,使每个具体工具只负责定义参数和实际的 SDK 调用:
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
@dataclass
class NeoSkillToolBase(FunctionTool):
error_prefix: str = "Error"
async def _run(
self,
context: ContextWrapper[AstrAgentContext],
neo_call: Callable[[Any, Any], Awaitable[Any]],
error_action: str,
) -> ToolExecResult:
if err := _ensure_admin(context):
return err
try:
client, sandbox = await _get_neo_context(context)
result = await neo_call(client, sandbox)
return _to_json_text(result)
except Exception as e:
return f"{self.error_prefix} {error_action}: {str(e)}"这样每个工具都可以大幅缩减;例如 ListSkillReleasesTool:
@dataclass
class ListSkillReleasesTool(NeoSkillToolBase):
name: str = "astrbot_list_skill_releases"
description: str = "List skill releases."
parameters: dict = field(
default_factory=lambda: {
"type": "object",
"properties": {
"skill_key": {"type": "string"},
"active_only": {"type": "boolean", "default": False},
"stage": {"type": "string"},
"limit": {"type": "integer", "default": 100},
"offset": {"type": "integer", "default": 0},
},
"required": [],
}
)
async def call(
self,
context: ContextWrapper[AstrAgentContext],
skill_key: str | None = None,
active_only: bool = False,
stage: str | None = None,
limit: int = 100,
offset: int = 0,
) -> ToolExecResult:
return await self._run(
context,
lambda client, _sandbox: client.skills.list_releases(
skill_key=skill_key,
active_only=active_only,
stage=stage,
limit=limit,
offset=offset,
),
error_action="listing skill releases",
)你可以逐步将各个工具迁移到这种模式下;它保持相同的签名和行为,但去掉了重复的管理员检查 / 客户端创建 / 错误处理 / JSON 样板。
2. 集中管理 promote/sync/rollback 的编排
PromoteSkillCandidateTool 目前包含的编排逻辑,其实在概念上已经属于 NeoSkillSyncManager 或生命周期管理的范畴。你可以把这部分流程移动到一个辅助函数或服务中,让工具本身保持精简。
例如,在 NeoSkillSyncManager(或旁边的一个小服务)中新增一个方法:
class NeoSkillSyncManager:
# existing methods...
async def promote_with_optional_sync(
self,
client: Any,
candidate_id: str,
stage: str,
sync_to_local: bool,
) -> dict[str, Any]:
release = await client.skills.promote_candidate(candidate_id, stage=stage)
release_json = _to_jsonable(release)
sync_json: dict[str, Any] | None = None
rollback_json: dict[str, Any] | None = None
if stage == "stable" and sync_to_local:
try:
sync_result = await self.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,
"map_path": sync_result.map_path,
"synced_at": sync_result.synced_at,
}
except Exception as sync_err:
try:
rollback = await client.skills.rollback_release(
str(release_json.get("id", ""))
)
rollback_json = _to_jsonable(rollback)
except Exception as rollback_err:
raise RuntimeError(
"stable release sync failed and auto rollback also failed; "
f"sync_error={sync_err}; rollback_error={rollback_err}"
) from rollback_err
raise RuntimeError(
"stable release sync failed; auto rollback succeeded; "
f"sync_error={sync_err}; rollback={_to_json_text(rollback_json)}"
) from sync_err
return {
"release": release_json,
"sync": sync_json,
"rollback": rollback_json,
}然后 PromoteSkillCandidateTool.call 可以简化为:
@dataclass
class PromoteSkillCandidateTool(NeoSkillToolBase):
name: str = "astrbot_promote_skill_candidate"
description: str = "Promote one candidate to release stage (canary/stable)."
# parameters unchanged ...
async def call(
self,
context: ContextWrapper[AstrAgentContext],
candidate_id: str,
stage: str = "canary",
sync_to_local: bool = True,
) -> ToolExecResult:
if stage not in {"canary", "stable"}:
return "Error promoting skill candidate: stage must be canary or stable."
if err := _ensure_admin(context):
return err
try:
client, _sandbox = await _get_neo_context(context)
sync_mgr = NeoSkillSyncManager()
result = await sync_mgr.promote_with_optional_sync(
client=client,
candidate_id=candidate_id,
stage=stage,
sync_to_local=sync_to_local,
)
return _to_json_text(result)
except Exception as e:
return f"Error promoting skill candidate: {str(e)}"这样可以在保持当前所有行为不变的前提下(包括回滚语义和错误信息,只要你在管理器中保留它们):
- 去掉工具内部重复的编排逻辑;
- 提供单一位置(
NeoSkillSyncManager)来统一工具和 HTTP 路由之间的行为。
Original comment in English
issue (complexity): Consider introducing a shared Neo tool base class and moving promote/sync/rollback orchestration into NeoSkillSyncManager to eliminate repeated admin/client/error/JSON boilerplate and centralize lifecycle logic.
You can reduce the complexity and duplication meaningfully without changing behavior by introducing a shared base and centralizing the promotion/sync/rollback orchestration.
1. Factor out a common Neo tool base
All tools repeat the same pattern:
_ensure_admin_get_neo_context- try/except → call SDK → JSON-ify or format error
You can keep behavior identical but move the plumbing into a base class so each tool only defines its parameters and the actual SDK call.
from dataclasses import dataclass, field
from typing import Any, Awaitable, Callable
@dataclass
class NeoSkillToolBase(FunctionTool):
error_prefix: str = "Error"
async def _run(
self,
context: ContextWrapper[AstrAgentContext],
neo_call: Callable[[Any, Any], Awaitable[Any]],
error_action: str,
) -> ToolExecResult:
if err := _ensure_admin(context):
return err
try:
client, sandbox = await _get_neo_context(context)
result = await neo_call(client, sandbox)
return _to_json_text(result)
except Exception as e:
return f"{self.error_prefix} {error_action}: {str(e)}"Then each tool becomes much smaller; for example, ListSkillReleasesTool:
@dataclass
class ListSkillReleasesTool(NeoSkillToolBase):
name: str = "astrbot_list_skill_releases"
description: str = "List skill releases."
parameters: dict = field(
default_factory=lambda: {
"type": "object",
"properties": {
"skill_key": {"type": "string"},
"active_only": {"type": "boolean", "default": False},
"stage": {"type": "string"},
"limit": {"type": "integer", "default": 100},
"offset": {"type": "integer", "default": 0},
},
"required": [],
}
)
async def call(
self,
context: ContextWrapper[AstrAgentContext],
skill_key: str | None = None,
active_only: bool = False,
stage: str | None = None,
limit: int = 100,
offset: int = 0,
) -> ToolExecResult:
return await self._run(
context,
lambda client, _sandbox: client.skills.list_releases(
skill_key=skill_key,
active_only=active_only,
stage=stage,
limit=limit,
offset=offset,
),
error_action="listing skill releases",
)You can gradually migrate each tool to this pattern; it keeps the same signatures and behavior but removes repeated admin/client/error/JSON boilerplate.
2. Centralize promote/sync/rollback orchestration
PromoteSkillCandidateTool contains orchestration logic that’s already conceptually part of NeoSkillSyncManager/lifecycle concerns. You can move the flow into a helper function/service and keep the tool thin.
For example, add a method to NeoSkillSyncManager (or a small service next to it):
class NeoSkillSyncManager:
# existing methods...
async def promote_with_optional_sync(
self,
client: Any,
candidate_id: str,
stage: str,
sync_to_local: bool,
) -> dict[str, Any]:
release = await client.skills.promote_candidate(candidate_id, stage=stage)
release_json = _to_jsonable(release)
sync_json: dict[str, Any] | None = None
rollback_json: dict[str, Any] | None = None
if stage == "stable" and sync_to_local:
try:
sync_result = await self.sync_release(
client,
release_id=str(release_json.get("id", "")),
require_stable=True,
)
sync_json = {
"skill_key": sync_result.skill_key,
"local_skill_name": sync_result.local_skill_name,
"release_id": sync_result.release_id,
"candidate_id": sync_result.candidate_id,
"payload_ref": sync_result.payload_ref,
"map_path": sync_result.map_path,
"synced_at": sync_result.synced_at,
}
except Exception as sync_err:
try:
rollback = await client.skills.rollback_release(
str(release_json.get("id", ""))
)
rollback_json = _to_jsonable(rollback)
except Exception as rollback_err:
raise RuntimeError(
"stable release sync failed and auto rollback also failed; "
f"sync_error={sync_err}; rollback_error={rollback_err}"
) from rollback_err
raise RuntimeError(
"stable release sync failed; auto rollback succeeded; "
f"sync_error={sync_err}; rollback={_to_json_text(rollback_json)}"
) from sync_err
return {
"release": release_json,
"sync": sync_json,
"rollback": rollback_json,
}Then PromoteSkillCandidateTool.call becomes:
@dataclass
class PromoteSkillCandidateTool(NeoSkillToolBase):
name: str = "astrbot_promote_skill_candidate"
description: str = "Promote one candidate to release stage (canary/stable)."
# parameters unchanged ...
async def call(
self,
context: ContextWrapper[AstrAgentContext],
candidate_id: str,
stage: str = "canary",
sync_to_local: bool = True,
) -> ToolExecResult:
if stage not in {"canary", "stable"}:
return "Error promoting skill candidate: stage must be canary or stable."
if err := _ensure_admin(context):
return err
try:
client, _sandbox = await _get_neo_context(context)
sync_mgr = NeoSkillSyncManager()
result = await sync_mgr.promote_with_optional_sync(
client=client,
candidate_id=candidate_id,
stage=stage,
sync_to_local=sync_to_local,
)
return _to_json_text(result)
except Exception as e:
return f"Error promoting skill candidate: {str(e)}"This keeps all current behavior (including rollback semantics and error messages, if you preserve them in the manager) but:
- Removes duplicated orchestration from the tool.
- Gives you a single place (
NeoSkillSyncManager) to align behavior between tools and HTTP routes.
|
补充说明(仅针对最近一个提交): 本次修补主要解决“本地上传 skill 与 ship/gull 预置 skill 的共存与同步一致性”问题:
本提交已本地验证:
|
Shipyard Neo 在 AstrBot 的接入已完成多轮提交,但在与 ship/gull 预置 Skills 共存时仍有同步策略问题:
skills/,可能影响预置 skills 的可见性与可用性;本 PR 基于既有提交继续增量完善,补齐“上传 -> 同步 -> 元数据回填 -> 统一展示”的闭环。
Modifications / 改动点
一、已有提交(本 PR 既有内容)
feat(computer): add shipyard_neo booter runtime and sandbox configfeat(skills): add neo lifecycle tools and stable sync managerfeat(dashboard): add neo skills APIs and management UIfix: address neo skill review findings二、本次增量修补(新增提交
40c7cf39)skills元数据并回填本地缓存。SkillManager.list_skills(runtime="sandbox")中合并“本地上传 + 沙箱预置”skills 视图。三、核心变更文件(本次增量)
astrbot/core/computer/computer_client.pyastrbot/core/skills/skill_manager.pyastrbot/dashboard/routes/skills.pytests/test_computer_skill_sync.pytests/test_skill_manager_sandbox_cache.pyThis is NOT a breaking change. / 这不是一个破坏性变更。
Screenshots or Test Results / 运行截图或测试结果
本次增量修补在本地完成以下验证:
uv run ruff format .uv run ruff check .uv run pytest -q tests/test_skill_manager_sandbox_cache.py tests/test_computer_skill_sync.py tests/test_neo_skill_sync.py6 passeduv run pytest -q tests/test_dashboard.py -k neo_skills_routes1 passedChecklist / 检查清单
requirements.txt和pyproject.toml文件相应位置。