From f115625c8af6f18f0837a3bd592830fd44ac4d27 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Apr 2026 11:57:11 +0000 Subject: [PATCH 1/3] Initial plan From dacaf9546dda955165f75c5e4fb00f53a8ee1da4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Apr 2026 12:04:35 +0000 Subject: [PATCH 2/3] Add list_resources() method and --list-resources CLI option Agent-Logs-Url: https://github.com/GitHubSecurityLab/seclab-taskflow-agent/sessions/705a4065-a077-4a16-80b0-112de4f0dc52 Co-authored-by: p- <176818+p-@users.noreply.github.com> --- src/seclab_taskflow_agent/available_tools.py | 91 +++++++++++++++++++- src/seclab_taskflow_agent/cli.py | 24 +++++- tests/test_yaml_parser.py | 69 ++++++++++++++- 3 files changed, 178 insertions(+), 6 deletions(-) diff --git a/src/seclab_taskflow_agent/available_tools.py b/src/seclab_taskflow_agent/available_tools.py index 577ae06..4b35cb8 100644 --- a/src/seclab_taskflow_agent/available_tools.py +++ b/src/seclab_taskflow_agent/available_tools.py @@ -12,6 +12,8 @@ __all__ = ["AvailableTools"] import importlib.resources +import os +import sys from enum import Enum from typing import Union @@ -48,6 +50,16 @@ class AvailableToolType(Enum): ModelConfig = "model_config" +# Maps each AvailableToolType to the conventional subdirectory name used in packages +_SUBDIR_MAP: dict[AvailableToolType, str] = { + AvailableToolType.Taskflow: "taskflows", + AvailableToolType.Personality: "personalities", + AvailableToolType.Toolbox: "toolboxes", + AvailableToolType.Prompt: "prompts", + AvailableToolType.ModelConfig: "model_configs", +} + + # Union of all document model types returned by AvailableTools DocumentModel = Union[ TaskflowDocument, PersonalityDocument, ToolboxDocument, @@ -86,6 +98,70 @@ def get_tool(self, tooltype: AvailableToolType, toolname: str) -> DocumentModel: """Generic loader — prefer the typed ``get_*()`` methods.""" return self._load(tooltype, toolname) + def list_resources( + self, tooltype: AvailableToolType | None = None + ) -> dict[AvailableToolType, list[str]]: + """Discover all available YAML resources across the Python path. + + Scans every directory in ``sys.path`` (including the current working + directory when the empty-string entry is present) for Python packages + that contain a conventional resource subdirectory (e.g. ``taskflows/``, + ``personalities/``, …). Each ``.yaml`` file found in such a + subdirectory is returned as a fully-qualified dotted resource name of + the form ``..``. + + Args: + tooltype: When provided, only resources of that type are returned. + When ``None`` (default) all types are returned. + + Returns: + A mapping from :class:`AvailableToolType` to a sorted list of + dotted resource names. + """ + types_to_scan = [tooltype] if tooltype is not None else list(AvailableToolType) + result: dict[AvailableToolType, list[str]] = {t: [] for t in types_to_scan} + + seen_dirs: set[str] = set() + for path_entry in sys.path: + actual_path = path_entry if path_entry else os.getcwd() + actual_path = os.path.abspath(actual_path) + if actual_path in seen_dirs or not os.path.isdir(actual_path): + continue + seen_dirs.add(actual_path) + + try: + top_level_entries = os.listdir(actual_path) + except PermissionError: + continue + + for pkg_name in top_level_entries: + pkg_path = os.path.join(actual_path, pkg_name) + if not os.path.isdir(pkg_path) or pkg_name.startswith("."): + continue + + for tt in types_to_scan: + subdir = _SUBDIR_MAP[tt] + subdir_path = os.path.join(pkg_path, subdir) + if not os.path.isdir(subdir_path): + continue + try: + yaml_files = sorted( + f[:-5] + for f in os.listdir(subdir_path) + if f.endswith(".yaml") + ) + except PermissionError: + continue + for stem in yaml_files: + resource_name = f"{pkg_name}.{subdir}.{stem}" + if resource_name not in result[tt]: + result[tt].append(resource_name) + + for tt in types_to_scan: + result[tt].sort() + + return result + def _load(self, tooltype: AvailableToolType, toolname: str) -> DocumentModel: """Load, validate, and cache a YAML grammar file. @@ -158,10 +234,23 @@ def _load(self, tooltype: AvailableToolType, toolname: str) -> DocumentModel: return doc except ModuleNotFoundError as exc: - raise BadToolNameError(f"Cannot load {toolname}: {exc}") from exc + raise BadToolNameError( + f"Cannot load {toolname}: {exc}" + + self._available_hint(tooltype) + ) from exc except FileNotFoundError: raise BadToolNameError( f"Cannot load {toolname} because {filepath} is not a valid file." + + self._available_hint(tooltype) ) except ValueError as exc: raise BadToolNameError(f"Cannot load {toolname}: {exc}") from exc + + def _available_hint(self, tooltype: AvailableToolType) -> str: + """Return a human-readable hint listing available resources of *tooltype*.""" + available = self.list_resources(tooltype).get(tooltype, []) + if not available: + return "" + items = "\n ".join(available) + label = _SUBDIR_MAP[tooltype] + return f"\n\nAvailable {label}:\n {items}" diff --git a/src/seclab_taskflow_agent/cli.py b/src/seclab_taskflow_agent/cli.py index 7569431..129d596 100644 --- a/src/seclab_taskflow_agent/cli.py +++ b/src/seclab_taskflow_agent/cli.py @@ -91,6 +91,10 @@ def main( bool, typer.Option("-l", "--list-models", help="List available tool-call models and exit."), ] = False, + list_resources: Annotated[ + bool, + typer.Option("-L", "--list-resources", help="List all available resources (taskflows, personalities, etc.) and exit."), + ] = False, globals_: Annotated[ list[str] | None, typer.Option("-g", "--global", help="Global variable as KEY=VALUE. Repeatable."), @@ -113,13 +117,13 @@ def main( debug = debug or os.getenv("TASK_AGENT_DEBUG", "").strip().lower() in ("1", "true", "yes") # Validate mutual exclusivity (resume is standalone) - if resume and (personality or taskflow or list_models): - typer.echo("Error: --resume cannot be combined with -p, -t, or -l.", err=True) + if resume and (personality or taskflow or list_models or list_resources): + typer.echo("Error: --resume cannot be combined with -p, -t, -l, or -L.", err=True) raise typer.Exit(code=1) - specified = sum(bool(x) for x in [personality, taskflow, list_models]) + specified = sum(bool(x) for x in [personality, taskflow, list_models, list_resources]) if specified > 1: - typer.echo("Error: -p, -t, and -l are mutually exclusive.", err=True) + typer.echo("Error: -p, -t, -l, and -L are mutually exclusive.", err=True) raise typer.Exit(code=1) _setup_logging() @@ -133,6 +137,18 @@ def main( typer.echo(model) raise typer.Exit() + # List resources mode + if list_resources: + from .available_tools import AvailableToolType, _SUBDIR_MAP + resources = available_tools.list_resources() + for tooltype in AvailableToolType: + names = resources.get(tooltype, []) + if names: + typer.echo(f"{_SUBDIR_MAP[tooltype]}:") + for name in names: + typer.echo(f" {name}") + raise typer.Exit() + if personality is None and taskflow is None and resume is None: typer.echo("Error: one of -p, -t, or --resume is required.", err=True) raise typer.Exit(code=1) diff --git a/tests/test_yaml_parser.py b/tests/test_yaml_parser.py index e3bef4d..828cff8 100644 --- a/tests/test_yaml_parser.py +++ b/tests/test_yaml_parser.py @@ -9,7 +9,7 @@ import pytest -from seclab_taskflow_agent.available_tools import AvailableTools +from seclab_taskflow_agent.available_tools import AvailableTools, AvailableToolType, BadToolNameError class TestYamlParser: @@ -63,5 +63,72 @@ def test_parse_example_taskflows(self): assert example.taskflow[0].task.max_steps == 20 +class TestListResources: + """Tests for AvailableTools.list_resources().""" + + def test_list_resources_returns_all_types(self): + """list_resources() returns a mapping with all AvailableToolType keys.""" + at = AvailableTools() + result = at.list_resources() + assert set(result.keys()) == set(AvailableToolType) + + def test_list_resources_taskflows_not_empty(self): + """list_resources() finds at least the example taskflows.""" + at = AvailableTools() + result = at.list_resources() + taskflows = result[AvailableToolType.Taskflow] + assert len(taskflows) > 0 + assert "examples.taskflows.example" in taskflows + + def test_list_resources_personalities_not_empty(self): + """list_resources() finds at least the built-in personalities.""" + at = AvailableTools() + result = at.list_resources() + personalities = result[AvailableToolType.Personality] + assert len(personalities) > 0 + assert "seclab_taskflow_agent.personalities.assistant" in personalities + + def test_list_resources_single_type(self): + """list_resources(tooltype=…) returns only that type.""" + at = AvailableTools() + result = at.list_resources(AvailableToolType.Taskflow) + assert set(result.keys()) == {AvailableToolType.Taskflow} + assert len(result[AvailableToolType.Taskflow]) > 0 + + def test_list_resources_sorted(self): + """list_resources() returns resource names in sorted order.""" + at = AvailableTools() + result = at.list_resources() + for tt, names in result.items(): + assert names == sorted(names), f"{tt} resources are not sorted" + + def test_list_resources_dotted_name_format(self): + """All returned resource names follow the package.subdir.stem format.""" + at = AvailableTools() + result = at.list_resources() + for names in result.values(): + for name in names: + parts = name.split(".") + assert len(parts) >= 3, f"Expected at least 3 parts in {name!r}" + + def test_not_found_error_includes_available_hint(self): + """BadToolNameError for missing taskflow includes available taskflow list.""" + at = AvailableTools() + with pytest.raises(BadToolNameError) as exc_info: + at.get_taskflow("examples.taskflows.this_does_not_exist") + message = str(exc_info.value) + assert "Available taskflows" in message + assert "examples.taskflows.example" in message + + def test_not_found_personality_includes_available_hint(self): + """BadToolNameError for missing personality includes available personality list.""" + at = AvailableTools() + with pytest.raises(BadToolNameError) as exc_info: + at.get_personality("seclab_taskflow_agent.personalities.no_such_personality") + message = str(exc_info.value) + assert "Available personalities" in message + assert "seclab_taskflow_agent.personalities.assistant" in message + + if __name__ == "__main__": pytest.main([__file__, "-v"]) From dc186329961d374e5778b2e629723b8477e9d2f7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 2 Apr 2026 12:06:56 +0000 Subject: [PATCH 3/3] Address code review feedback: use set for dedup, add comment, improve test assertion Agent-Logs-Url: https://github.com/GitHubSecurityLab/seclab-taskflow-agent/sessions/705a4065-a077-4a16-80b0-112de4f0dc52 Co-authored-by: p- <176818+p-@users.noreply.github.com> --- src/seclab_taskflow_agent/available_tools.py | 13 ++++++------- tests/test_yaml_parser.py | 1 + 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/src/seclab_taskflow_agent/available_tools.py b/src/seclab_taskflow_agent/available_tools.py index 4b35cb8..fb7db87 100644 --- a/src/seclab_taskflow_agent/available_tools.py +++ b/src/seclab_taskflow_agent/available_tools.py @@ -119,10 +119,13 @@ def list_resources( dotted resource names. """ types_to_scan = [tooltype] if tooltype is not None else list(AvailableToolType) - result: dict[AvailableToolType, list[str]] = {t: [] for t in types_to_scan} + # Use sets for intermediate storage to avoid O(n) duplicate checks + seen: dict[AvailableToolType, set[str]] = {t: set() for t in types_to_scan} seen_dirs: set[str] = set() for path_entry in sys.path: + # An empty string in sys.path represents the current working directory + # per Python convention (https://docs.python.org/3/library/sys.html#sys.path). actual_path = path_entry if path_entry else os.getcwd() actual_path = os.path.abspath(actual_path) if actual_path in seen_dirs or not os.path.isdir(actual_path): @@ -154,13 +157,9 @@ def list_resources( continue for stem in yaml_files: resource_name = f"{pkg_name}.{subdir}.{stem}" - if resource_name not in result[tt]: - result[tt].append(resource_name) + seen[tt].add(resource_name) - for tt in types_to_scan: - result[tt].sort() - - return result + return {tt: sorted(seen[tt]) for tt in types_to_scan} def _load(self, tooltype: AvailableToolType, toolname: str) -> DocumentModel: """Load, validate, and cache a YAML grammar file. diff --git a/tests/test_yaml_parser.py b/tests/test_yaml_parser.py index 828cff8..8dca663 100644 --- a/tests/test_yaml_parser.py +++ b/tests/test_yaml_parser.py @@ -110,6 +110,7 @@ def test_list_resources_dotted_name_format(self): for name in names: parts = name.split(".") assert len(parts) >= 3, f"Expected at least 3 parts in {name!r}" + assert all(p for p in parts), f"All parts must be non-empty in {name!r}" def test_not_found_error_includes_available_hint(self): """BadToolNameError for missing taskflow includes available taskflow list."""