From f5802f33629eff38b38606ee1cc435d04d734e69 Mon Sep 17 00:00:00 2001 From: zhangPinkdolphin <2308717915@qq.com> Date: Sat, 30 May 2026 15:15:08 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20Auto-Summary=20=E4=BC=9A=E8=AF=9D?= =?UTF-8?q?=E6=97=A5=E5=BF=97=E8=87=AA=E5=8A=A8=E6=91=98=E8=A6=81=E5=B7=A5?= =?UTF-8?q?=E5=85=B7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 在 Agent 工作流中自动检测关键决策点(方案选择、确认执行、阶段完成), 将摘要写入 discussion_log.md。 - auto_summary.py: 核心模块,支持 online(钩子模式)和 offline(批量扫描) - plugins/auto_summary_plugin.py: 通过 hooks 系统注册 turn_after 事件 - tests/test_auto_summary.py: 48 个单元测试覆盖所有功能路径 --- auto_summary.py | 543 +++++++++++++++++++++++++++++ plugins/auto_summary_plugin.py | 67 ++++ tests/test_auto_summary.py | 600 +++++++++++++++++++++++++++++++++ 3 files changed, 1210 insertions(+) create mode 100644 auto_summary.py create mode 100644 plugins/auto_summary_plugin.py create mode 100644 tests/test_auto_summary.py diff --git a/auto_summary.py b/auto_summary.py new file mode 100644 index 000000000..e6dc785a7 --- /dev/null +++ b/auto_summary.py @@ -0,0 +1,543 @@ +#!/usr/bin/env python3 +""" +Auto-Summary: 会话日志自动摘要工具 + +在 Agent 工作流中自动检测关键决策点,将摘要写入 discussion_log.md。 + +两种模式: + 1. online(data_dict) — 作为钩子被 agent_loop 调用,传入当前轮次的上下文 + 2. offline(log_path) — 对已有的 model_responses_*.txt 批量扫描 + +触发条件(任一条命中即触发): + - 用户给出明确方案选择("选X"、"方案X"、"用X") + - 用户确认决策("执行"、"确认"、"同意"、"可以") + - 完成一个阶段("完成"、"结束"、"下一") + - 出现重要事实/结论/教训 + - 出现总结性内容("总结"、"综上"、"所以") + +零依赖(仅标准库)。 +""" + +import re +import json +import os +import glob +from datetime import datetime + +# ── 路径配置 ── +_script_dir = os.path.dirname(os.path.abspath(__file__)) +# 当 auto_summary.py 在代码根目录时,日志在 temp/model_responses/ +DEFAULT_LOG_DIR = _script_dir # 代码根 = GenericAgent3/ +DISCUSSION_LOG = os.path.join(DEFAULT_LOG_DIR, 'discussion_log.md') + + +# ── 触发关键词 ── + +_DECISION_PATTERNS = [ + r'(选|选择|采用|用|取)(\s*方案\s*)?[ABCD一二三四]', + r'方案\s*[ABCD一二三四]', + r'(选|选择)\s*方案\s*\d', + r'(就|就按)\s*(方案|这个|这个方案|你说的)', + r'走\s*(方案|路线|方向)\s*[ABCD一二三四]', +] + +_CONFIRM_PATTERNS = [ + r'^(好|行|可以|同意|确认|执行|开始|就这么办|没问题|OK|ok)', + r'^(确认|同意|批准)\s*(执行|开始)', + r'(可以|同意)\s*(执行|开始|实施)', + r'就这么\s*(定|办|决定)', +] + +_COMPLETION_PATTERNS = [ + r'(完成|结束|收工|搞定|完毕|通过|交付)', + r'下一(步|个|阶段|章节|部分)', + r'阶段\s*\d\s*(完成|结束)', + r'总结|综上|总而言之|总的来说', +] + +_FACT_PATTERNS = [ + r'(重要|关键|核心)\s*(发现|结论|事实|教训|启示)', + r'(记一下|记住|注意|别忘了|重要的)', + r'(教训|经验|学到)', + r'(原理|本质|原因)是', + r'这是因为|原因在于|根源是', +] + +# 应过滤掉的低价值状态更新模式 +_FILTER_PATTERNS = [ + r'Subagent.*?(工作|Turn|到).*?完成', + r'正在.*?执行.*?步骤', + r'子任务.*?完成', + r'Tool.*?returned', + r'观察.*?结果', # 状态观察 + r'再等一会儿|等待.*?完成|继续观察', + r'已读取完毕', + r'etc\.\.\.', # 当 summary 里全是 `...` + r'^\s*$', # 空行 +] + + +def _compile(*patterns): + return [re.compile(p) for p in patterns] + + +# ── 辅助函数 ── + +def _extract_user_text(prompt_block: str) -> str: + """从 Prompt JSON 块中提取用户的纯文本消息(过滤掉 tool_result)。""" + try: + data = json.loads(prompt_block) + if not isinstance(data, dict): + return '' + content = data.get('content', []) + texts = [] + for item in content: + if isinstance(item, dict) and item.get('type') == 'text': + text = item.get('text', '') + if text and not text.startswith('\n### [WORKING MEMORY]') and not text.startswith('\n[SYSTEM'): + texts.append(text) + return '\n'.join(texts) + except (json.JSONDecodeError, Exception): + return '' + + +def _extract_agent_text(response_block: str) -> str: + """从 Response Python repr 中提取 Agent 的 text 内容。""" + try: + data = json.loads(response_block) + except (json.JSONDecodeError, Exception): + try: + data = eval(response_block, {'__builtins__': {}}, {}) + except Exception: + return '' + if isinstance(data, dict): + data = [data] + if not isinstance(data, list): + return '' + texts = [] + for item in data: + if isinstance(item, dict) and item.get('type') == 'text': + t = item.get('text', '') + if t: + texts.append(t) + return '\n'.join(texts) + + +def _extract_agent_thinking(response_block: str) -> str: + """从 Response 中提取 thinking 内容(用于话题识别)。""" + try: + data = json.loads(response_block) + except Exception: + try: + data = eval(response_block, {'__builtins__': {}}, {}) + except Exception: + return '' + if isinstance(data, dict): + data = [data] + if not isinstance(data, list): + return '' + thoughts = [] + for item in data: + if isinstance(item, dict) and item.get('type') == 'thinking': + t = item.get('thinking', '') + if t: + thoughts.append(t) + return '\n'.join(thoughts) + + +def _extract_text_from_response(response) -> str: + """从 LLM response 对象中提取纯文本(用于 hook 模式)。 + + 兼容: + - Anthropic: response.content = [TextBlock(text='...'), ...] + - OpenAI: response.content = '...' + """ + if isinstance(response, str): + return response + if hasattr(response, 'content'): + content = response.content + if isinstance(content, str): + return content + if isinstance(content, list): + texts = [] + for block in content: + if isinstance(block, dict) and block.get('type') == 'text': + texts.append(block.get('text', '')) + elif hasattr(block, 'type') and block.type == 'text': + texts.append(getattr(block, 'text', '')) + return '\n'.join(texts) + # 兼容 dict 格式的 response(如 {'content': '...'}) + if isinstance(response, dict): + return response.get('content', str(response)) + return str(response) + + +def _is_low_value(text: str) -> bool: + """判断是否为低价值的自动状态更新。""" + if not text or len(text) < 10: + return True + for pat in _FILTER_PATTERNS: + if re.search(pat, text, re.IGNORECASE): + return True + return False + + +def _detect_triggers(text: str) -> list: + """检测文本命中哪些触发条件,返回标签列表。""" + tags = [] + if any(p.search(text) for p in _compile(*_DECISION_PATTERNS)): + tags.append('决策:方案选择') + if any(p.search(text) for p in _compile(*_CONFIRM_PATTERNS)): + tags.append('决策:确认') + if any(p.search(text) for p in _compile(*_COMPLETION_PATTERNS)): + tags.append('阶段完成') + if any(p.search(text) for p in _compile(*_FACT_PATTERNS)): + tags.append('重要事实') + return tags + + +def _extract_topic(user_text: str, agent_text: str, thinking: str = '') -> str: + """从对话中提取话题名称(首句或关键句)。""" + for text in [user_text, agent_text, thinking]: + if not text: + continue + lines = text.strip().split('\n') + for line in lines: + line = line.strip() + if not line: + continue + line = re.sub(r'|', '', line).strip() + if not line: + continue + if len(line) >= 8: + return line[:80] + return '(未能提取话题)' + + +def _extract_need(user_text: str) -> str: + """从用户消息中提取需求描述(前两句话)。""" + lines = [l.strip() for l in user_text.split('\n') if l.strip()] + need_lines = [] + for line in lines: + if line.startswith('{') or line.startswith('['): + continue + if line.startswith('') or line.startswith('###'): + continue + need_lines.append(line) + if len(need_lines) >= 2: + break + return ' '.join(need_lines)[:120] if need_lines else '(未能提取需求)' + + +def _extract_decision(user_text: str, agent_text: str) -> str: + """尝试从对话中提取明确的决策内容。""" + combined = f"{user_text}\n{agent_text}" + # 先清理 HTML/标签噪音 + cleaned = re.sub(r'|', '', combined) + + for pattern in _DECISION_PATTERNS + _CONFIRM_PATTERNS: + m = re.search(pattern, cleaned, re.MULTILINE) + if m: + start = max(0, m.start() - 40) + end = min(len(cleaned), m.end() + 40) + context = cleaned[start:end].replace('\n', ' ') + context = context.strip() + # 过滤掉含有文件路径的片段 + if re.search(r'[/\\][\w.-]+\.[\w]+', context): + continue + return context[:100] + + for sentence in re.split(r'[。!?\n]', cleaned): + # 要求句子不包含文件路径 + if re.search(r'[/\\][\w.-]+\.[\w]+', sentence): + continue + # 关键词检查(不含"用",因其太常见如"用户""使用") + if any(kw in sentence for kw in ['选', '决定', '确认', '同意']): + if len(sentence) > 5: + return sentence.strip()[:100] + + return '' + + +def _generate_tags(user_text: str, agent_text: str, trigger_tags: list) -> list: + """自动生成标签。""" + tags = set(trigger_tags) + combined = (user_text + ' ' + agent_text).lower() + keyword_tags = { + '写作': '#写作', '代码': '#代码', 'bug': '#bug', + '部署': '#部署', '调试': '#调试', '测试': '#测试', + '设计': '#设计', '方案': '#方案', 'pr': '#PR', + 'github': '#GitHub', '文档': '#文档', '配置': '#配置', + '浏览器': '#浏览器', '搜索': '#搜索', '蛋白质': '#蛋白质', + '模型': '#模型', '数据': '#数据', '复盘': '#复盘', + '计划': '#计划', '决策': '#决策', '讨论': '#讨论', + '交付': '#交付', '邮件': '#邮件', + } + for kw, tag in keyword_tags.items(): + if kw in combined: + tags.add(tag) + return sorted(tags) + + +def _format_entry(timestamp: str, topic: str, need: str, + discussion: str, decision: str, tags: list) -> str: + """格式化为 Markdown 条目。""" + parts = ['---', f'{timestamp}'] + if topic: + parts.append(f' 话题: {topic}') + if need: + parts.append(f' 用户需求: {need}') + if discussion: + if len(discussion) > 200: + discussion = discussion[:200] + '...' + parts.append(f' 讨论内容: {discussion}') + if decision: + parts.append(f' 决策: {decision}') + if tags: + parts.append(f' 标签: {" ".join(tags)}') + parts.append('') + return '\n'.join(parts) + + +def _last_entry_hash(log_path: str) -> str: + """读取最后一个条目的粗略 hash(去重用)。""" + if not os.path.isfile(log_path): + return '' + try: + with open(log_path, 'r', encoding='utf-8') as f: + content = f.read() + entries = content.strip().split('\n---\n') + if entries and entries[-1].strip(): + # 取最后条目的前 80 个字符作为 hash + return entries[-1].strip()[:80] + return '' + except Exception: + return '' + + +# ═══════════════════════════════════════════════ +# 管道模式(Online):单轮次摘要 +# ═══════════════════════════════════════════════ + +def online(user_message: str = '', agent_response: str = '', + turn: int = 0, timestamp: str = '', + log_path: str = None, + response_obj=None) -> dict: + """ + 在线模式:传入当前轮次的用户消息和 Agent 回复,检测是否需写摘要。 + + 参数: + user_message: 当前轮次的用户消息(纯文本) + agent_response: 当前轮次的 Agent 回复(纯文本) + turn: 轮次数 + timestamp: 时间戳字符串(如 '2026-05-30 11:45') + log_path: discussion_log.md 路径 + response_obj: 原始 LLM response 对象(用于自动提取 agent_response) + + 返回: + {'written': bool, 'tags': list, 'entry': str} + """ + if log_path is None: + log_path = DISCUSSION_LOG + + if not timestamp: + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M') + + # 如果传入了 response_obj,自动提取 agent_response + if response_obj and not agent_response: + agent_response = _extract_text_from_response(response_obj) or '' + + # 如果 user_message 和 agent_response 都为空,跳过 + if not user_message and not agent_response: + return {'written': False, 'tags': [], 'entry': ''} + + # 检测是否为低价值状态更新 + combined = f"{user_message}\n{agent_response}" + if _is_low_value(combined): + return {'written': False, 'tags': [], 'entry': ''} + + # 检测触发 + trigger_tags = _detect_triggers(combined) + + if not trigger_tags: + return {'written': False, 'tags': [], 'entry': ''} + + # 提取信息 + topic = _extract_topic(user_message, agent_response) + need = _extract_need(user_message) if user_message else '' + discussion = agent_response[:200] if agent_response else '' + decision = _extract_decision(user_message, agent_response) + tags = _generate_tags(user_message, agent_response, trigger_tags) + + # 格式化为 Markdown 条目 + entry = _format_entry(timestamp, topic, need, discussion, decision, tags) + + # 去重检查:与最后一条比较 + last_hash = _last_entry_hash(log_path) + if last_hash and entry.strip()[:80] == last_hash: + return {'written': False, 'tags': tags, 'entry': entry.strip(), 'dedup': True} + + # 写入日志 + log_path = os.path.abspath(log_path) + os.makedirs(os.path.dirname(log_path), exist_ok=True) + with open(log_path, 'a', encoding='utf-8') as f: + f.write(entry + '\n') + + return {'written': True, 'tags': tags, 'entry': entry.strip()} + + +# ═══════════════════════════════════════════════ +# 离线模式:扫描已有日志文件 +# ═══════════════════════════════════════════════ + +def _parse_log_file(filepath: str) -> list: + """ + 解析 model_responses_*.txt,返回轮次列表。 + 每轮: {'timestamp': str, 'user_msg': str, 'agent_msg': str, 'thinking': str} + """ + turns = [] + with open(filepath, 'r', encoding='utf-8', errors='replace') as f: + content = f.read() + + blocks = re.split(r'^=== (Prompt|Response) === (\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2})\n', + content, flags=re.MULTILINE) + + current_prompt_ts = None + current_prompt_body = None + + i = 0 + while i < len(blocks): + if blocks[i] in ('Prompt', 'Response'): + label = blocks[i] + ts = blocks[i+1] + body = blocks[i+2] if i+2 < len(blocks) else '' + i += 3 + + if label == 'Prompt': + current_prompt_ts = ts + current_prompt_body = body + elif label == 'Response': + if current_prompt_ts: + user_text = _extract_user_text(current_prompt_body or '') + agent_text = _extract_agent_text(body) + thinking = _extract_agent_thinking(body) + turns.append({ + 'timestamp': current_prompt_ts, + 'user_msg': user_text, + 'agent_msg': agent_text, + 'thinking': thinking, + }) + current_prompt_ts = None + current_prompt_body = None + else: + i += 1 + + return turns + + +def offline(log_path: str = None, output_path: str = None, + all_files: bool = True, max_files: int = 0) -> dict: + """ + 离线模式:扫描已有日志文件,批量生成摘要。 + + 参数: + log_path: 单个日志文件路径(如果指定,则只扫这个文件) + output_path: discussion_log.md 路径(默认为 temp/discussion_log.md) + all_files: 是否扫描 model_responses/ 下所有文件 + max_files: 扫描文件数上限(0=不限,仅当 all_files=True 时生效) + + 返回: + {'entries_written': int, 'files_scanned': int, 'turns_analyzed': int} + """ + if output_path is None: + output_path = DISCUSSION_LOG + + if log_path: + files = [log_path] + else: + responses_dir = os.path.join(DEFAULT_LOG_DIR, 'temp', 'model_responses') + if not os.path.isdir(responses_dir): + # 回退:从 temp/ 外的代码根找 + responses_dir = os.path.join(DEFAULT_LOG_DIR, 'model_responses') + files = sorted(glob.glob(os.path.join(responses_dir, 'model_responses_*.txt'))) + if max_files > 0 and len(files) > max_files: + files = files[-max_files:] + + total_entries = 0 + total_turns = 0 + + for fpath in files: + if not os.path.isfile(fpath): + continue + try: + turns = _parse_log_file(fpath) + except Exception as e: + print(f" ⚠ 解析失败: {fpath} — {e}") + continue + + total_turns += len(turns) + + for turn_data in turns: + result = online( + user_message=turn_data['user_msg'], + agent_response=turn_data['agent_msg'], + timestamp=turn_data['timestamp'][:16], + log_path=output_path, + ) + if result.get('written'): + total_entries += 1 + + return { + 'entries_written': total_entries, + 'files_scanned': len(files), + 'turns_analyzed': total_turns, + } + + +# ═══════════════════════════════════════════════ +# 命令行入口 +# ═══════════════════════════════════════════════ + +def main(): + import argparse + + parser = argparse.ArgumentParser( + description='Auto-Summary: 会话日志自动摘要工具', + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +示例: + # 扫描所有日志生成摘要 + python auto_summary.py + + # 扫描单个日志文件 + python auto_summary.py -f model_responses_967045.txt + + # 指定输出路径 + python auto_summary.py -o ~/discussion_log.md + """ + ) + parser.add_argument('-f', '--file', help='指定单个日志文件路径') + parser.add_argument('-o', '--output', help='discussion_log.md 输出路径') + parser.add_argument('-n', '--max-files', type=int, default=0, + help='扫描文件数上限(0=全部)') + + args = parser.parse_args() + + print("Auto-Summary 开始扫描...") + print(f" 输出目标: {args.output or DISCUSSION_LOG}") + + result = offline( + log_path=args.file, + output_path=args.output, + all_files=(args.max_files == 0), + max_files=args.max_files, + ) + + print(f" 扫描文件: {result['files_scanned']}") + print(f" 分析轮次: {result['turns_analyzed']}") + print(f" 写入条目: {result['entries_written']}") + print("完成。") + + +if __name__ == '__main__': + main() diff --git a/plugins/auto_summary_plugin.py b/plugins/auto_summary_plugin.py new file mode 100644 index 000000000..acdaade10 --- /dev/null +++ b/plugins/auto_summary_plugin.py @@ -0,0 +1,67 @@ +""" +Auto-Summary Plugin: 通过 hook 系统自动记录关键决策/阶段完成到 discussion_log.md。 + +安装: + - 确保 auto_summary.py 在代码根目录 + - 无需修改 agent_loop.py,插件通过 import 自动注册 hook + - 可通过移除环境变量 AUTO_SUMMARY_DISABLE=1 禁用 + +工作原理: + 钩在 'turn_after' 事件上,提取用户消息和 Agent 回复, + 调用 auto_summary.online() 检测触发条件并写入 discussion_log.md。 +""" + +import os +import sys + +# 将代码根加入 path(这样能 import auto_summary) +_SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__)) +_CODE_ROOT = os.path.abspath(os.path.join(_SCRIPT_DIR, '..')) +if _CODE_ROOT not in sys.path: + sys.path.insert(0, _CODE_ROOT) + +# 如果设置了禁用环境变量,跳过 +if os.environ.get('AUTO_SUMMARY_DISABLE'): + # 静默跳过 + pass +else: + import plugins.hooks as hooks + import auto_summary + + @hooks.register('turn_after') + def _auto_summary_on_turn_end(ctx): + """在每次 Agent 轮次结束时检查是否需记录摘要。""" + try: + # 提取用户消息 + user_msg = '' + # 优先用原始的 user_input + if ctx.get('user_input'): + user_msg = ctx['user_input'] + # 如果有 next_prompts,用最新的 + next_prompts = ctx.get('next_prompts') or [] + if next_prompts and next_prompts[-1]: + user_msg = next_prompts[-1] + + # 提取 Agent 回复 + response = ctx.get('response') + + # 获取轮次和时间 + turn = ctx.get('turn', 0) + from datetime import datetime + timestamp = datetime.now().strftime('%Y-%m-%d %H:%M') + + # 调用 auto_summary.online() + result = auto_summary.online( + user_message=user_msg, + response_obj=response, + turn=turn, + timestamp=timestamp, + ) + + if result.get('written'): + tags = ' '.join(result.get('tags', [])) + print(f"[Auto-Summary] ✓ 记录摘要 ({tags})") + + except Exception as e: + # 插件不中断主流程 + print(f"[Auto-Summary] ⚠ 插件异常: {e}") diff --git a/tests/test_auto_summary.py b/tests/test_auto_summary.py new file mode 100644 index 000000000..3835829bf --- /dev/null +++ b/tests/test_auto_summary.py @@ -0,0 +1,600 @@ +#!/usr/bin/env python3 +""" +Auto-Summary 单元测试。 +""" + +import os +import sys +import json +import tempfile +import unittest + +# 把代码根加入 path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..')) + +from auto_summary import ( + _extract_user_text, + _extract_agent_text, + _extract_text_from_response, + _is_low_value, + _detect_triggers, + _extract_topic, + _extract_need, + _extract_decision, + _generate_tags, + _format_entry, + _last_entry_hash, + online, + offline, + _parse_log_file, +) + + +class TestExtractUserText(unittest.TestCase): + """测试从 Prompt JSON 块提取用户文本。""" + + def test_extract_simple(self): + """简单文本提取。""" + block = json.dumps({ + 'content': [ + {'type': 'text', 'text': '帮我写一个 Python 脚本'}, + ] + }) + result = _extract_user_text(block) + self.assertEqual(result, '帮我写一个 Python 脚本') + + def test_extract_multiple_texts(self): + """多个文本块。""" + block = json.dumps({ + 'content': [ + {'type': 'text', 'text': '第一步'}, + {'type': 'text', 'text': '第二步'}, + ] + }) + result = _extract_user_text(block) + self.assertIn('第一步', result) + self.assertIn('第二步', result) + + def test_ignore_tool_result(self): + """忽略 tool_result 或其他类型。""" + block = json.dumps({ + 'content': [ + {'type': 'tool_result', 'content': '{}'}, + {'type': 'text', 'text': '用户消息'}, + ] + }) + result = _extract_user_text(block) + self.assertEqual(result, '用户消息') + + def test_ignore_system_blocks(self): + """忽略带 WORKING MEMORY 和 SYSTEM 的文本块。""" + block = json.dumps({ + 'content': [ + {'type': 'text', 'text': '\n### [WORKING MEMORY]\n...'}, + {'type': 'text', 'text': '实际用户消息'}, + ] + }) + result = _extract_user_text(block) + self.assertEqual(result, '实际用户消息') + + def test_empty_block(self): + """空块返回空字符串。""" + self.assertEqual(_extract_user_text('{}'), '') + self.assertEqual(_extract_user_text(''), '') + self.assertEqual(_extract_user_text('not json'), '') + + +class TestExtractAgentText(unittest.TestCase): + """测试从 Response 块提取 Agent 文本。""" + + def test_extract_simple(self): + """简单列表格式。""" + block = json.dumps([ + {'type': 'text', 'text': '这是回复内容。'} + ]) + result = _extract_agent_text(block) + self.assertEqual(result, '这是回复内容。') + + def test_extract_with_thinking(self): + """混合 thinking 和 text。""" + block = json.dumps([ + {'type': 'thinking', 'thinking': '内部思考...'}, + {'type': 'text', 'text': '最终回复。'}, + ]) + result = _extract_agent_text(block) + self.assertEqual(result, '最终回复。') + + def test_extract_multiple_texts(self): + """多个 text 块。""" + block = json.dumps([ + {'type': 'text', 'text': '第一段。'}, + {'type': 'text', 'text': '第二段。'}, + ]) + result = _extract_agent_text(block) + self.assertEqual(result, '第一段。\n第二段。') + + def test_extract_no_text(self): + """没有 text 块。""" + block = json.dumps([ + {'type': 'thinking', 'thinking': '思考中'} + ]) + self.assertEqual(_extract_agent_text(block), '') + + +class TestExtractTextFromResponse(unittest.TestCase): + """测试从 LLM response 对象提取文本。""" + + def test_string_response(self): + """字符串直接返回。""" + self.assertEqual(_extract_text_from_response('hello'), 'hello') + + def test_dict_response(self): + """带 content 的 dict 格式。""" + response = type('Response', (), {'content': 'text content'})() + self.assertEqual(_extract_text_from_response(response), 'text content') + + def test_list_response(self): + """带 content list 的格式(类 Anthropic)。""" + class TextBlock: + def __init__(self, text): + self.type = 'text' + self.text = text + response = type('Response', (), {'content': [ + TextBlock('hello'), + TextBlock('world'), + ]})() + self.assertEqual(_extract_text_from_response(response), 'hello\nworld') + + def test_dict_list_response(self): + """content list 中是 dict 格式。""" + response = type('Response', (), {'content': [ + {'type': 'text', 'text': 'hello'}, + {'type': 'text', 'text': 'world'}, + ]})() + self.assertEqual(_extract_text_from_response(response), 'hello\nworld') + + +class TestIsLowValue(unittest.TestCase): + """测试低价值状态更新过滤。""" + + def test_short_message(self): + """短消息被认为是低价值。""" + self.assertTrue(_is_low_value('short')) + self.assertTrue(_is_low_value('')) + + def test_subagent_status(self): + """Subagent 状态更新被过滤。""" + texts = [ + 'Subagent 正在工作(Turn 1 已完成环境探测)', + 'Subagent 已到 Turn 4,继续观察完成状态', + 'Subagent 在工作完成中', + ] + for t in texts: + self.assertTrue(_is_low_value(t), f"未过滤: {t}") + + def test_wait_patterns(self): + """等待/观察模式被过滤。""" + texts = [ + '接近完成,再等一会儿收结果', + '等待子任务完成', + '继续观察完成状态', + ] + for t in texts: + self.assertTrue(_is_low_value(t), f"未过滤: {t}") + + def test_read_complete(self): + """已读取完毕被过滤。""" + self.assertTrue(_is_low_value('已读取完毕。以下是内容...')) + + def test_meaningful_text_not_filtered(self): + """有意义的文本不应被过滤。""" + texts = [ + '确认执行方案B,开始实现代码吧', # 15 chars, >= 10 threshold + '我决定采用Plan Mode来规划任务', + '重要发现:模型在10轮后开始过拟合', + '总结一下:这个方案有三个优点', + ] + for t in texts: + self.assertFalse(_is_low_value(t), f"被误过滤: {t}") + + +class TestDetectTriggers(unittest.TestCase): + """测试触发条件检测。""" + + def test_detect_decision(self): + """检测方案选择。""" + texts = [ + '我选方案A', + '采用方案B来处理', + '用方案一二三', + '就按你说的方案来', + '走方案C路线', + ] + for t in texts: + tags = _detect_triggers(t) + self.assertIn('决策:方案选择', tags, f"未检测到决策: {t}") + + def test_detect_confirm(self): + """检测确认。""" + texts = [ + '可以,开始执行', + '确认执行计划', + '同意,开始实施', + '就这么办', + '好,开始吧', + ] + for t in texts: + tags = _detect_triggers(t) + self.assertIn('决策:确认', tags, f"未检测到确认: {t}") + + def test_detect_completion(self): + """检测阶段完成。""" + texts = [ + '任务完成', + '第一阶段结束', + '收工', + '搞定', + '总结一下方案', + ] + for t in texts: + tags = _detect_triggers(t) + self.assertIn('阶段完成', tags, f"未检测到完成: {t}") + + def test_detect_fact(self): + """检测重要事实。""" + texts = [ + '重要发现:性能提升了50%', + '关键结论是这个方案可行', + '教训是不要过早优化', + '记一下这个参数很重要', + ] + for t in texts: + tags = _detect_triggers(t) + self.assertIn('重要事实', tags, f"未检测到事实: {t}") + + def test_no_trigger(self): + """普通文本不应触发。""" + self.assertEqual(_detect_triggers('今天天气不错'), []) + self.assertEqual(_detect_triggers('让我先查一下文档'), []) + self.assertEqual(_detect_triggers('今天天气挺不错的'), []) + + +class TestExtractTopic(unittest.TestCase): + """测试话题提取。""" + + def test_from_user_text(self): + """从用户文本提取话题。""" + topic = _extract_topic( + '帮我写一个 Python 脚本处理数据', + '好的,我来写这个脚本。' + ) + self.assertIn('Python', topic) + + def test_from_agent_text(self): + """从 Agent 文本提取话题。""" + topic = _extract_topic( + '', + '已经完成数据分析。' + ) + self.assertEqual(topic, '已经完成数据分析。') + + def test_fallback(self): + """无有效话题时返回占位。""" + topic = _extract_topic('', '', '') + self.assertEqual(topic, '(未能提取话题)') + + +class TestExtractNeed(unittest.TestCase): + """测试用户需求提取。""" + + def test_simple_need(self): + """提取前两句。""" + need = _extract_need('帮我写个脚本。需要处理 CSV 文件。') + self.assertIn('帮我写个脚本', need) + + def test_ignore_blocks(self): + """忽略 JSON 块和摘要标签。""" + need = _extract_need('{}\n[1,2,3]\n实际需求\n补充说明') + self.assertIn('实际需求', need) + self.assertIn('补充说明', need) + self.assertNotIn('{', need) + + +class TestExtractDecision(unittest.TestCase): + """测试决策内容提取。""" + + def test_extract_decision_pattern(self): + """从方案选择模式提取。""" + decision = _extract_decision( + '我选方案B。', + '好的,开始执行方案B。' + ) + self.assertTrue(len(decision) > 0) + + def test_extract_confirm(self): + """从确认模式提取。""" + decision = _extract_decision( + '确认执行。', + '收到,开始执行计划。' + ) + self.assertIn('确认', decision) + + def test_no_filepath_noise(self): + """不提取含文件路径的决策。""" + decision = _extract_decision( + '读一下 projects/session_index_pr/HANDOFF.md', + '用户要求读取一个文件。' + ) + # "用户"中的"用"不应触发决策提取 + self.assertEqual(decision, '') + + def test_no_false_positive(self): + """普通对话不应提取出虚假决策。""" + decision = _extract_decision( + '继续推进项目', + '了解,我来看一下具体方案。' + ) + self.assertEqual(decision, '') + + +class TestGenerateTags(unittest.TestCase): + """测试标签生成。""" + + def test_keyword_tags(self): + """关键词标签自动追加。""" + tags = _generate_tags( + '帮我写一个 Python 脚本处理数据', + '好的,使用pandas处理', + ['阶段完成'] + ) + self.assertIn('#数据', tags) + + def test_pr_tag(self): + """PR 关键词触发 #PR 标签。""" + tags = _generate_tags( + '提交一个 PR', + '', + [] + ) + self.assertIn('#PR', tags) + + +class TestFormatEntry(unittest.TestCase): + """测试 Markdown 格式化。""" + + def test_format_simple(self): + """基本格式检查。""" + entry = _format_entry( + '2026-05-30 12:00', + '话题', + '需求', + '讨论内容', + '决策内容', + ['阶段完成', '#代码'] + ) + self.assertIn('2026-05-30 12:00', entry) + self.assertIn('话题: 话题', entry) + self.assertIn('用户需求: 需求', entry) + self.assertIn('讨论内容: 讨论内容', entry) + self.assertIn('决策: 决策内容', entry) + self.assertIn('标签: 阶段完成 #代码', entry) + + def test_truncated_discussion(self): + """讨论内容超长截断。""" + long_text = 'A' * 250 + entry = _format_entry( + '2026-05-30 12:00', '话题', '', long_text, '', [] + ) + self.assertIn('...', entry) + self.assertLess(len(entry), 400) + + +class TestLastEntryHash(unittest.TestCase): + """测试去重哈希。""" + + def test_empty_file(self): + """空文件返回空。""" + with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: + f.write('') + path = f.name + try: + self.assertEqual(_last_entry_hash(path), '') + finally: + os.unlink(path) + + def test_last_entry(self): + """返回最后一条内容的前 80 字符。""" + content = '---\n2026-05-30 12:00\n 话题: test\n 标签: done\n\n' + with tempfile.NamedTemporaryFile(mode='w', suffix='.md', delete=False) as f: + f.write(content) + path = f.name + try: + h = _last_entry_hash(path) + self.assertTrue(h.startswith('---'), msg=f"Hash starts with: {h[:20]!r}") + finally: + os.unlink(path) + + +class TestOnline(unittest.TestCase): + """测试在线模式。""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.log_path = os.path.join(self.tmpdir, 'discussion_log.md') + + def tearDown(self): + if os.path.isfile(self.log_path): + os.unlink(self.log_path) + os.rmdir(self.tmpdir) + + def test_decision_triggers_write(self): + """方案选择触发写入。""" + result = online( + user_message='我选方案B,开始实现吧', + agent_response='好的,开始执行方案B。', + log_path=self.log_path, + ) + self.assertTrue(result['written']) + self.assertIn('决策:方案选择', result['tags']) + + def test_confirm_triggers_write(self): + """确认触发写入。""" + result = online( + user_message='确认执行', + agent_response='收到,开始执行。', + log_path=self.log_path, + ) + self.assertTrue(result['written']) + self.assertIn('决策:确认', result['tags']) + + def test_completion_triggers_write(self): + """阶段完成触发写入。""" + result = online( + user_message='第一阶段完成', + agent_response='总结一下成果。', + log_path=self.log_path, + ) + self.assertTrue(result['written']) + self.assertIn('阶段完成', result['tags']) + + def test_low_value_skipped(self): + """低价值状态更新跳过。""" + result = online( + user_message='', + agent_response='Subagent 正在工作(Turn 1 已完成环境探测)', + log_path=self.log_path, + ) + self.assertFalse(result['written']) + + def test_no_trigger_skipped(self): + """无触发条件时跳过。""" + result = online( + user_message='今天天气不错', + agent_response='是的,适合户外活动。', + log_path=self.log_path, + ) + self.assertFalse(result['written']) + + def test_dedup(self): + """连续相同条目去重。""" + result1 = online( + user_message='确认执行方案', + agent_response='收到。', + log_path=self.log_path, + ) + if result1['written']: + result2 = online( + user_message='确认执行方案', + agent_response='收到。', + log_path=self.log_path, + ) + self.assertFalse(result2.get('written', False), + msg="应检测到重复条目") + + +class TestParseLogFile(unittest.TestCase): + """测试日志文件解析。""" + + def _make_log(self, lines: list) -> str: + """Helper: 创建临时日志文件。""" + fd, path = tempfile.mkstemp(suffix='.txt', prefix='model_responses_') + with os.fdopen(fd, 'w') as f: + f.write('\n'.join(lines)) + return path + + def test_simple_turn(self): + """解析一个完整的轮次。""" + prompt = json.dumps({ + 'content': [{'type': 'text', 'text': '用户消息'}] + }) + response = json.dumps([ + {'type': 'text', 'text': 'Agent回复'} + ]) + log_lines = [ + f'=== Prompt === 2026-05-30 12:00:00', + prompt, + '', + f'=== Response === 2026-05-30 12:00:05', + response, + ] + path = self._make_log(log_lines) + try: + turns = _parse_log_file(path) + self.assertEqual(len(turns), 1) + self.assertEqual(turns[0]['user_msg'], '用户消息') + self.assertEqual(turns[0]['agent_msg'], 'Agent回复') + finally: + os.unlink(path) + + def test_multiple_turns(self): + """解析多个轮次。""" + prompt1 = json.dumps({'content': [{'type': 'text', 'text': '第一轮'}]}) + resp1 = json.dumps([{'type': 'text', 'text': '第一轮回复'}]) + prompt2 = json.dumps({'content': [{'type': 'text', 'text': '第二轮'}]}) + resp2 = json.dumps([{'type': 'text', 'text': '第二轮回复'}]) + log_lines = [ + f'=== Prompt === 2026-05-30 12:00:00', + prompt1, '', + f'=== Response === 2026-05-30 12:00:05', + resp1, '', + f'=== Prompt === 2026-05-30 12:01:00', + prompt2, '', + f'=== Response === 2026-05-30 12:01:05', + resp2, + ] + path = self._make_log(log_lines) + try: + turns = _parse_log_file(path) + self.assertEqual(len(turns), 2) + finally: + os.unlink(path) + + +class TestOfflineIntegration(unittest.TestCase): + """集成测试:离线模式处理真实日志。""" + + def setUp(self): + self.tmpdir = tempfile.mkdtemp() + self.output = os.path.join(self.tmpdir, 'discussion_log.md') + + def tearDown(self): + if os.path.isfile(self.output): + os.unlink(self.output) + os.rmdir(self.tmpdir) + + def test_process_real_log(self): + """处理真实日志文件。""" + # 使用项目目录下的真实日志 + log_dir = os.path.join( + os.path.dirname(__file__), '..', 'temp', 'model_responses' + ) + logs = sorted([ + os.path.join(log_dir, f) + for f in os.listdir(log_dir) + if f.startswith('model_responses_') and f.endswith('.txt') + ]) + if not logs: + self.skipTest("无真实日志文件可用") + + result = offline( + log_path=logs[0], + output_path=self.output, + ) + self.assertGreaterEqual(result['turns_analyzed'], 0) + self.assertGreaterEqual(result['files_scanned'], 1) + # 可能是0,如果日志中没有触发条件 + self.assertGreaterEqual(result['entries_written'], 0) + + def test_no_logs(self): + """日志目录不存在时优雅降级。""" + result = offline( + log_path='/tmp/nonexistent_log.txt', + output_path=self.output, + ) + self.assertEqual(result['files_scanned'], 1) + self.assertEqual(result['turns_analyzed'], 0) + self.assertEqual(result['entries_written'], 0) + + +if __name__ == '__main__': + unittest.main()