diff --git a/graphcore/tools/memory.py b/graphcore/tools/memory.py index 51b1dc2..ad388d5 100644 --- a/graphcore/tools/memory.py +++ b/graphcore/tools/memory.py @@ -106,8 +106,8 @@ def do_rename_pure(self, old_path: str, new_path: str) -> Generator[P, Row, str] ... def _validate(self, path: str) -> str: - r = pathlib.Path(path).resolve() - if not r.is_relative_to("/memories"): + r = pathlib.PurePosixPath(path) + if not r.is_absolute() or not r.is_relative_to("/memories"): raise MemoryBackendError(f"{str(r)} is not rooted in /memories") return str(r) @@ -1087,13 +1087,6 @@ class FileSystemMemoryBackend(FileSystemBackendCommon, MemoryBackend[Never, Neve def __init__(self, storage_folder: pathlib.Path, init_from: pathlib.Path | None = None): super().__init__(storage_folder, init_from) - def _run_all[T](self, d: Generator[Never, Never, T]) -> T: - try: - next(d) - assert False - except StopIteration as e: - return e.value - @override def _run_multi[T](self, d: Generator[Never, Never, T]) -> T: return self._run_all(d) @@ -1121,6 +1114,18 @@ async def _run_row[T](self, d: Generator[Never, Never, T]) -> T: @override async def _run_update[T](self, d: Generator[Never, Never, T]) -> T: return self._run_all(d) + +class InvalidPathError(RuntimeError): + def __init__(self, msg: str): + super().__init__(msg) + self.msg = msg + +def _validate_path(s: str): + p = pathlib.PurePosixPath(s) + if not p.is_absolute(): + raise InvalidPathError(f"Memory path: {s} is not an absolute path") + if not p.is_relative_to("/memories"): + raise InvalidPathError(f"Memory path: {s} is not rooted at /memories") def _memory_tool_impl[R]( backend: MemoryToolImpl[R], @@ -1133,6 +1138,7 @@ def _memory_tool_impl[R]( return missing_required("path") elif args.file_text is None: return missing_required("file_text") + _validate_path(args.path) return backend.create(args.path, args.file_text) case "delete": @@ -1147,12 +1153,15 @@ def _memory_tool_impl[R]( return missing_required("insert_line") elif args.insert_text is None: return missing_required("insert_text") + _validate_path(args.path) return backend.insert(args.path, args.insert_line, args.insert_text) case "rename": if args.old_path is None: return missing_required("old_path") elif args.new_path is None: return missing_required("new_path") + _validate_path(args.old_path) + _validate_path(args.new_path) return backend.rename(args.old_path, args.new_path) case "str_replace": @@ -1162,11 +1171,13 @@ def _memory_tool_impl[R]( return missing_required("old_str") elif args.new_str is None: return missing_required("new_str") + _validate_path(args.path) return backend.str_replace(args.path, args.old_str, args.new_str) case "view": if args.path is None: return missing_required("path") + _validate_path(args.path) range : tuple[int, int] | None = None if args.view_range is not None and len(args.view_range) >= 2: range = (args.view_range[0], args.view_range[1]) @@ -1182,9 +1193,12 @@ class MemoryTool(WithAsyncImplementation[str], UnifiedMemorySchema): """ @override async def run(self) -> str: - return await _memory_tool_impl( - backend, self, missing_required - ) + try: + return await _memory_tool_impl( + backend, self, missing_required + ) + except InvalidPathError as e: + return f"Error: {e.msg}" return MemoryTool.as_tool("memory") def memory_tool(backend: MemoryToolImpl[str]) -> BaseTool: @@ -1200,7 +1214,10 @@ class MemoryTool(WithImplementation[str], UnifiedMemorySchema): """ @override def run(self) -> str: - return _memory_tool_impl( - backend, self, missing_required - ) + try: + return _memory_tool_impl( + backend, self, missing_required + ) + except InvalidPathError as e: + return f"Error: {e.msg}" return MemoryTool.as_tool("memory") diff --git a/graphcore/tools/schemas.py b/graphcore/tools/schemas.py index 9232ade..0845beb 100644 --- a/graphcore/tools/schemas.py +++ b/graphcore/tools/schemas.py @@ -12,7 +12,9 @@ ST = TypeVar("ST") -T_RES = TypeVar("T_RES", bound=str | Command) +type BareResult = str | dict + +T_RES = TypeVar("T_RES", bound=BareResult | list[BareResult] | Command) class WithInjectedState(BaseModel, Generic[ST]): state: Annotated[ST, InjectedState] diff --git a/graphcore/tools/vfs.py b/graphcore/tools/vfs.py index 296cb2e..0a5a3f2 100644 --- a/graphcore/tools/vfs.py +++ b/graphcore/tools/vfs.py @@ -14,8 +14,11 @@ # along with this program. If not, see . from typing_extensions import TypedDict -from typing import NotRequired, TypeVar, Any, Annotated, Type, Callable, Sequence, ContextManager, Protocol, Iterator, Generic +from typing import NotRequired, TypeVar, Any, Annotated, Type, Callable, Iterable, Sequence, ContextManager, Protocol, Iterator, Generic +import asyncio import re +import shutil +from dataclasses import dataclass, field from functools import cache import pathlib import contextlib @@ -48,6 +51,52 @@ def _make_checker(patt: str | None) -> Callable[[str], bool]: match = re.compile(patt) return lambda f_name: match.fullmatch(f_name) is None + +# Always-on exclude floor. ``.git`` is never useful for any consumer +# (VFS tools, materialization, audit) — including it is pure waste at +# best and actively misleading at worst (every workflow would re-snapshot +# megabytes of history). User-supplied ``global_exclude`` patterns and +# predicates union on top of this floor. + + +def _floor_include(path: str) -> bool: + """``.git`` directory and contents are always excluded. True if the + path passes the floor (i.e. is NOT under .git anywhere in its path + parts; covers root .git, nested .git from submodules, etc.).""" + return ".git" not in pathlib.PurePosixPath(path).parts + + +# Type alias for the user-facing ``global_exclude`` config. Both forms +# return True to *exclude* the path. The callable form (preferred) is +# more natural when the predicate cares about Path semantics like +# ``suffix`` or ``parts``; the regex form is retained for symmetry with +# ``forbidden_read``/``forbidden_write`` but is clunky for path logic +# (you end up writing ``(.*/)?node_modules(/.*)?`` for what's just +# ``"node_modules" in p.parts``). +type GlobalExcludeArg = str | Callable[[pathlib.PurePath], bool] | None + + +def _make_global_include_pred(arg: GlobalExcludeArg) -> Callable[[str], bool]: + """Returns an *include* predicate (True = okay to access) that + composes the always-on ``.git`` floor with the user-supplied + exclusion (if any). + + Polarity matches ``_make_checker``: True means the path is + accessible. The user-facing argument is named ``global_exclude`` + because the user thinks in terms of "what to exclude"; the + polarity flip happens here at the boundary so internal predicate + composition stays consistent. + """ + if arg is None: + return _floor_include + if isinstance(arg, str): + rx = re.compile(arg) + # fullmatch for symmetry with forbidden_read/forbidden_write. + return lambda p: _floor_include(p) and rx.fullmatch(p) is None + # Callable form: caller's predicate returns True to exclude. We flip. + user_excludes = arg + return lambda p: _floor_include(p) and not user_excludes(pathlib.PurePosixPath(p)) + class FileRange(BaseModel): start_line: int = Field(description="The line to start reading from; lines are numbered starting from 1.") end_line: int = Field(description="The line to read until EXCLUSIVE.") @@ -139,6 +188,11 @@ class _ListFileSchemaBase(BaseModel): class _PutFileSchemaBase(BaseModel): """ Put file contents onto the virtual file system used by this workflow. + + For incremental changes to existing files, prefer ``edit_file`` — it lets + you replace a substring without re-emitting the whole file. Reach for + ``put_file`` when creating new files, doing a wholesale rewrite, or + creating several files in one call. """ files: dict[str, str] = \ Field(description="A dictionary associating RELATIVE pathnames to the contents to store at those path names. Do NOT include a leading `./` it is always implied. " @@ -146,6 +200,30 @@ class _PutFileSchemaBase(BaseModel): "Any files contents with the same path named stored in previous tool calls are overwritten.") +class _EditFileSchemaBase(BaseModel): + """ + Replace an exact substring in an existing file on the VFS. Use this for + incremental changes — adding a few lines, fixing a bug, renaming an + identifier — instead of rewriting the file via ``put_file``. + + ``old_string`` must match the file contents exactly, including whitespace + and newlines. If unsure, read the relevant lines via ``get_file`` first. + + By default ``old_string`` must appear exactly once; the call errors on zero + or multiple matches so you can disambiguate by widening ``old_string`` with + surrounding context. Set ``replace_all=True`` to replace every occurrence. + + Use ``put_file`` for files that do not yet exist or for wholesale rewrites. + """ + path: str = Field(description="The relative path of the file on the VFS. Do NOT include a leading `./` (it is implied).") + old_string: str = Field(description="The exact substring to replace. Must match the file's contents byte-for-byte, including whitespace and newlines.") + new_string: str = Field(description="The replacement substring.") + replace_all: bool = Field( + description="If true, replace every occurrence of ``old_string``. If false (default), the call errors when ``old_string`` does not appear exactly once.", + default=False, + ) + + class _GrepFileSchemaBase(BaseModel): """ Search for a specific string in the files on the VFS. The output depends on the @@ -211,12 +289,15 @@ def debugging_tmp_directory() -> Iterator[str]: def _materialize( provider: TempDirectoryProvider, fs_layer: str | None, - state: VFSState + state: VFSState, + include_path: Callable[[str], bool], ) -> Iterator[str]: with provider() as dir: files = state["vfs"] target = pathlib.Path(dir) for (k, v) in files.items(): + if not include_path(k): + continue tgt = target / k tgt.parent.mkdir(exist_ok=True, parents=True) tgt.write_text(v) @@ -227,6 +308,8 @@ def _materialize( if not p.is_file(): continue rel_path = p.relative_to(mounted_path) + if not include_path(str(rel_path)): + continue copy_path = target / rel_path if copy_path.exists(): continue @@ -241,6 +324,13 @@ class VFSToolConfig(TypedDict): forbidden_read: NotRequired[str] forbidden_write: NotRequired[str] + # Paths invisible to every consumer (tools, materialization, audit). + # Either a fullmatch regex (BC; clunky for path logic) or a + # ``Callable[[PurePath], bool]`` returning True to exclude + # (preferred). ``.git`` is always excluded; this composes on top. + # See ``fs_tools_layered`` for the contract. + global_exclude: NotRequired[GlobalExcludeArg] + put_doc_extra: NotRequired[str] get_doc_extra: NotRequired[str] @@ -274,16 +364,31 @@ def wrapper(*args: PS.args, **kwargs: PS.kwargs) -> TOOL_RET: class _VFSAccess(Generic[InputType]): - def __init__(self, conf: VFSToolConfig): + def __init__( + self, + conf: VFSToolConfig, + global_include: Callable[[str], bool], + ): self.conf = conf + # Single global-include predicate (True = okay to access), + # consistent in polarity with ``_make_checker``. Composed by + # the parent factory and shared with the tool closures so we + # don't rebuild the regex per call AND there's no chance of + # one site forgetting to consult it. + self._global_include = global_include def materialize(self, state: InputType, debug: bool = False) -> ContextManager[str]: manager : TempDirectoryProvider = tempfile.TemporaryDirectory if debug: manager = debugging_tmp_directory - return _materialize(manager, self.conf.get("fs_layer"), state) + return _materialize( + manager, self.conf.get("fs_layer"), state, + include_path=self._global_include, + ) def get(self, state: InputType, file: str) -> bytes | None: + if not self._global_include(file): + return None if file in state["vfs"]: return state["vfs"][file].encode("utf-8") fs_layer = self.conf.get("fs_layer", None) @@ -298,6 +403,8 @@ def get(self, state: InputType, file: str) -> bytes | None: def iterate(self, state: InputType) -> Iterator[tuple[str, bytes]]: d = state["vfs"] for (p, v) in d.items(): + if not self._global_include(p): + continue yield (p, v.encode("utf-8")) if (fs_layer := self.conf.get("fs_layer", None)) is not None: @@ -306,9 +413,12 @@ def iterate(self, state: InputType) -> Iterator[tuple[str, bytes]]: if not child.is_file(): continue rel_path = child.relative_to(root) - if str(rel_path) in d: + rel_str = str(rel_path) + if not self._global_include(rel_str): + continue + if rel_str in d: continue - yield (str(rel_path), child.read_bytes()) + yield (rel_str, child.read_bytes()) def vfs_tools(conf: VFSToolConfig, ty: Type[InputType]) -> tuple[list[BaseTool], VFSAccessor[InputType]]: @@ -334,8 +444,20 @@ class PutFileSchema(_PutFileSchemaBase): __doc__ = pf_doc tool_call_id: Annotated[str, InjectedToolCallId] - put_filter = _make_checker(conf.get("forbidden_write")) - get_filter = _make_checker(conf.get("forbidden_read")) + # Single global-include predicate (True = okay to access) compiled + # once and folded into the read/write filters below. Sharing avoids + # rebuilding the regex per call AND removes the "remember to also + # check the global exclude" foot-gun — every site only consults + # ``can_read`` / ``can_write``. + global_include = _make_global_include_pred(conf.get("global_exclude")) + forbidden_write_chk = _make_checker(conf.get("forbidden_write")) + forbidden_read_chk = _make_checker(conf.get("forbidden_read")) + can_write: Callable[[str], bool] = ( + lambda p: forbidden_write_chk(p) and global_include(p) + ) + can_read: Callable[[str], bool] = ( + lambda p: forbidden_read_chk(p) and global_include(p) + ) @tool(args_schema=PutFileSchema) @handle_path_errors @@ -346,7 +468,7 @@ def put_file( to_update = {} for (k, upd) in files.items(): norm_path = _normalize_and_validate(k) - if not put_filter(norm_path): + if not can_write(norm_path): return f"Illegal put operation: cannot write {k} on the VFS" to_update[norm_path] = upd return tool_output( @@ -355,8 +477,64 @@ def put_file( "vfs": to_update } ) - + + @inject() + class EditFileSchema(_EditFileSchemaBase): + __doc__ = _EditFileSchemaBase.__doc__ + tool_call_id: Annotated[str, InjectedToolCallId] + + @tool(args_schema=EditFileSchema) + @handle_path_errors + def edit_file( + path: str, + old_string: str, + new_string: str, + state: Annotated[InputType, InjectedState], + tool_call_id: Annotated[str, InjectedToolCallId], + replace_all: bool = False, + ) -> str | Command: + norm_path = _normalize_and_validate(path) + if not can_write(norm_path): + return f"Illegal edit operation: cannot write {path} on the VFS" + cont = _get_content(state, norm_path) + if cont is None: + return ( + f"File not found at {path}. edit_file only modifies existing files; " + f"use put_file to create new ones." + ) + occurrences = cont.count(old_string) + if occurrences == 0: + return ( + f"old_string does not appear in {path}. Read the file via get_file " + f"to verify exact contents (whitespace and newlines matter) before " + f"retrying." + ) + if occurrences > 1 and not replace_all: + return ( + f"old_string appears {occurrences} times in {path}. Either widen " + f"old_string with surrounding context to make it unique, or pass " + f"replace_all=true to replace every occurrence." + ) + if replace_all: + new_content = cont.replace(old_string, new_string) + else: + new_content = cont.replace(old_string, new_string, 1) + return tool_output( + tool_call_id=tool_call_id, + res={ + "vfs": {norm_path: new_content}, + }, + ) + def _get_content_raw(s: InputType, path: str) -> str | None: + # Note: ``can_read`` is enforced upstream in ``_get_content`` and + # ``_list_files``; this helper is the raw underlay reader. + # ``global_include`` is still checked here because callers + # (e.g. ``grep_files`` via ``_get_content_raw``) bypass + # ``can_read``'s ``forbidden_read`` filter but must NEVER see + # globally-excluded paths. + if not global_include(path): + return None vfs = s["vfs"] if path not in vfs: layer = conf.get("fs_layer", None) @@ -371,9 +549,9 @@ def _get_content_raw(s: InputType, path: str) -> str | None: return None else: return vfs[path] - + def _get_content(s: InputType, path: str) -> str | None: - if not get_filter(path): + if not can_read(path): return None return _get_content_raw(s, path) @@ -410,11 +588,11 @@ def _list_files( state: InputType ) -> Iterator[str]: for (k, _) in state["vfs"].items(): - if not get_filter(k): + if not can_read(k): continue yield k for f_name in list_underlying(): - if not get_filter(f_name) or f_name in state["vfs"]: + if not can_read(f_name) or f_name in state["vfs"]: continue yield f_name @@ -449,39 +627,241 @@ def file_contents() -> Iterator[tuple[str, str]]: tools: list[BaseTool] = [get_file, list_files, grep_files] if not conf["immutable"]: tools.append(put_file) + tools.append(edit_file) - materializer = _VFSAccess[InputType](conf=conf) + materializer = _VFSAccess[InputType](conf=conf, global_include=global_include) return (tools, materializer) -def fs_tools(fs_layer: str, forbidden_read: str | None = None, *, cache_listing: bool = True) -> list[BaseTool]: +class FSBackend(Protocol): + """A read-only filesystem-like source. + + ``get`` returns the text contents at ``path`` or ``None`` if the path is + not present in this backend. ``list`` enumerates every path this backend + can serve. ``dump_to`` writes every file the backend serves into + ``target`` so downstream tools (e.g. solc) can read them from disk. + + ``dump_to`` accepts an optional ``include_path`` predicate; when set, + only paths for which ``include_path(path)`` returns ``True`` are + written to ``target``. ``None`` (the default) writes every file. + Backends should honor the predicate efficiently when possible (e.g. + ``DirBackend`` passes it through to ``shutil.copytree``'s ``ignore`` + argument so excluded directory subtrees are short-circuited at the + root rather than recursed into and filtered file-by-file). + + The predicate is the implementation primitive used by callers like + ``_LayeredMaterializer`` to enforce ``global_exclude`` patterns + uniformly across every backend in a stack — but the filter itself + is general and not tied to that one use case. """ - Create stateless file system tools that operate directly on a directory. - Unlike vfs_tools, these tools don't use langgraph state - they simply - read from the provided filesystem path. Useful for immutable file access - where no VFS overlay is needed. + def get(self, path: str) -> str | None: + ... - Args: - fs_layer: Path to the directory to expose - forbidden_read: Optional regex pattern for paths that cannot be read - cache_listing: If True (default), cache the directory listing after first call. - Set to False if the agent needs to react to filesystem changes. + def list(self) -> Iterable[str]: + ... - Returns: - List of tools: [get_file, list_files, grep_files] + async def dump_to( + self, + target: pathlib.Path, + include_path: Callable[[str], bool] | None = None, + ) -> None: + ... + + +class Materializer(Protocol): + """Writes a composite filesystem view into a real on-disk directory. + + Typically the caller creates a tmpdir and passes it as ``target``. The + materializer is responsible for honoring layering order: higher-priority + backends overwrite lower-priority ones so the disk layout matches what + ``fs_tools_layered`` reads would see. + + ``get`` is a single-file query against the same composite view used by + ``dump_to``; returns the file's content (first-hit across layers in + priority order) or ``None`` if no layer serves the path. This is the + unfiltered view — ``forbidden_read`` only affects the tool surface, not + materialization or existence queries. """ - base_path = pathlib.Path(fs_layer) - check_allowed = _make_checker(forbidden_read) - def _list_all_files() -> Sequence[str]: - return [str(f.relative_to(base_path)) for f in base_path.rglob("*") if f.is_file()] + async def dump_to(self, target: pathlib.Path) -> None: + ... + + def get(self, path: str) -> str | None: + ... + - if cache_listing: - _list_all_files = cache(_list_all_files) +@dataclass +class DirBackend: + """``FSBackend`` adapter over a real on-disk directory.""" - list_all_files = _list_all_files + root: pathlib.Path + cache_listing: bool = True + _cached: list[str] | None = field(default=None, init=False, repr=False) + + def __post_init__(self) -> None: + self.root = pathlib.Path(self.root) + + def _enumerate(self) -> list[str]: + return [ + str(f.relative_to(self.root)) + for f in self.root.rglob("*") + if f.is_file() + ] + + def get(self, path: str) -> str | None: + child = self.root / path + if not child.is_file(): + return None + try: + return child.read_text() + except Exception: + return None + + def list(self) -> Iterable[str]: + if not self.cache_listing: + return self._enumerate() + if self._cached is None: + self._cached = self._enumerate() + return self._cached + + async def dump_to( + self, + target: pathlib.Path, + include_path: Callable[[str], bool] | None = None, + ) -> None: + root = self.root + if include_path is None: + ignore = None + else: + # Adapt the predicate to ``shutil.copytree``'s ``ignore`` + # callback. The callback is invoked once per directory; we + # relativize ``src`` back to ``self.root`` so the predicate + # sees the same project-relative paths it does in ``list`` + # and ``get``. Returning a name in the ignore list short- + # circuits the entire subtree at that point. + def _ignore(src: str, names: list[str]) -> list[str]: + src_rel = pathlib.Path(src).relative_to(root) + prefix = "" if str(src_rel) == "." else f"{src_rel}/" + return [n for n in names if not include_path(f"{prefix}{n}")] + ignore = _ignore + + def _copy() -> None: + target.mkdir(parents=True, exist_ok=True) + shutil.copytree(root, target, dirs_exist_ok=True, ignore=ignore) + await asyncio.to_thread(_copy) + + +class _LayeredMaterializer: + """Materializer that dumps backends in reverse priority order. + + Lowest-priority (last in the list) dumps first, so higher-priority + (earlier in the list) backends' writes overwrite on collision. This + preserves the first-hit read semantics of ``fs_tools_layered`` at + materialization time. + + ``global_exclude`` (regex) defines paths that are invisible to every + consumer — VFS tools, materialization, and (transitively) any + downstream the materializer feeds (audit DB, prover, typecheck). + Distinct from ``forbidden_read``, which only filters the agent's + tool surface and lets materialization through. ``.git`` is hardcoded + into the exclude floor; the user pattern unions on top. + """ + + def __init__( + self, + backends: Sequence[FSBackend], + global_exclude: GlobalExcludeArg = None, + ) -> None: + self._backends = list(backends) + # Single include predicate (True = okay to access). Polarity + # matches ``_make_checker`` and ``FSBackend.dump_to``'s + # ``include_path`` parameter, so backends and tool closures + # consume the same shape. + self._include: Callable[[str], bool] = _make_global_include_pred(global_exclude) + + async def dump_to(self, target: pathlib.Path) -> None: + for backend in reversed(self._backends): + await backend.dump_to(target, include_path=self._include) + + def get(self, path: str) -> str | None: + if not self._include(path): + return None + for backend in self._backends: + content = backend.get(path) + if content is not None: + return content + return None + + +def fs_tools_layered( + backends: Sequence[FSBackend], + forbidden_read: str | None = None, + global_exclude: GlobalExcludeArg = None, +) -> tuple[list[BaseTool], Materializer]: + """Create stateless read-only filesystem tools over a layered backend stack. + + ``backends`` are consulted in priority order (first wins) for ``get``, + and unioned (with de-duplication by path) for ``list``. + + ``forbidden_read`` (fullmatch regex) filters the agent-facing tool + surface only — ``get_file``/``list_files``/``grep_files``. The + materializer still dumps every non-globally-excluded file so + downstream consumers (e.g. solc) see a complete tree. + + ``global_exclude`` filters every consumer — the tool surface, the + materializer, and (transitively) anyone the materializer feeds + (audit DB, prover). Accepts either a fullmatch regex (BC; clunky + for path logic) or a ``Callable[[PurePath], bool]`` returning True + to exclude (preferred). The ``.git`` directory is always excluded; + the user pattern/predicate composes on top. + + Returns ``(tools, materializer)``. ``tools`` is the usual + ``[get_file, list_files, grep_files]``; ``materializer`` dumps the + composite view into a caller-provided directory, honoring layer + priority and the global exclude. + """ + # Single fold: tool-surface readability is "forbidden_read allows + # AND not globally excluded". One predicate = no foot-gun about + # remembering to also check the global exclude at every call site. + forbidden_chk = _make_checker(forbidden_read) + global_include = _make_global_include_pred(global_exclude) + can_read: Callable[[str], bool] = lambda p: forbidden_chk(p) and global_include(p) + backend_list = list(backends) + + def _lookup_unfiltered(path: str) -> str | None: + # ``forbidden_read`` is tool-only; the materializer's existence + # check and similar paths use this unfiltered hook. We still + # short-circuit on ``global_include`` because globally excluded + # paths must be invisible to *every* consumer including this one. + if not global_include(path): + return None + for backend in backend_list: + content = backend.get(path) + if content is not None: + return content + return None + + def _lookup(path: str) -> str | None: + if not can_read(path): + return None + for backend in backend_list: + content = backend.get(path) + if content is not None: + return content + return None + + def _enumerate() -> Iterator[str]: + seen: set[str] = set() + for backend in backend_list: + for path in backend.list(): + if path in seen: + continue + seen.add(path) + if not can_read(path): + continue + yield path @_copy_base_doc class GetFileSchema(_GetFileSchemaBase): @@ -491,15 +871,7 @@ class GetFileSchema(_GetFileSchemaBase): @handle_path_errors def get_file(path: str, range: FileRange | None = None) -> str: norm_path = _normalize_and_validate(path) - if not check_allowed(norm_path): - return "File not found" - child = base_path / norm_path - if child.is_file(): - try: - return _get_file(child.read_text(), range) - except Exception: - return "File not found" - return "File not found" + return _get_file(_lookup(norm_path), range) @_copy_base_doc class ListFileSchema(_ListFileSchemaBase): @@ -507,7 +879,7 @@ class ListFileSchema(_ListFileSchemaBase): @tool(args_schema=ListFileSchema) def list_files() -> str: - return "\n".join(f for f in list_all_files() if check_allowed(f)) + return "\n".join(_enumerate()) @_copy_base_doc class GrepFileSchema(_GrepFileSchemaBase): @@ -517,24 +889,57 @@ class GrepFileSchema(_GrepFileSchemaBase): def grep_files( search_string: str, matching_lines: bool, - match_in: list[str] | None = None + match_in: list[str] | None = None, ) -> str: def file_contents() -> Iterator[tuple[str, str]]: - for f in base_path.rglob("*"): - if not f.is_file(): + for path in _enumerate(): + content = _lookup_unfiltered(path) + if content is None: continue - rel_name = str(f.relative_to(base_path)) - if not check_allowed(rel_name): - continue - try: - yield (rel_name, f.read_text()) - except Exception: - continue - def read_file(p: str) -> str | None: - try: - return (base_path / p).read_text() - except: - return None - return _grep_impl(search_string, matching_lines, file_contents(), read_file, match_in) + yield (path, content) + return _grep_impl( + search_string, + matching_lines, + file_contents(), + _lookup_unfiltered, + match_in, + ) + + tools: list[BaseTool] = [get_file, list_files, grep_files] + return tools, _LayeredMaterializer(backend_list, global_exclude=global_exclude) - return [get_file, list_files, grep_files] + +def fs_tools( + fs_layer: str, + forbidden_read: str | None = None, + *, + cache_listing: bool = True, + global_exclude: GlobalExcludeArg = None, +) -> list[BaseTool]: + """ + Create stateless file system tools that operate directly on a directory. + + Unlike vfs_tools, these tools don't use langgraph state - they simply + read from the provided filesystem path. Useful for immutable file access + where no VFS overlay is needed. + + Args: + fs_layer: Path to the directory to expose + forbidden_read: Optional regex pattern for paths that cannot be read + cache_listing: If True (default), cache the directory listing after first call. + Set to False if the agent needs to react to filesystem changes. + global_exclude: Optional fullmatch regex for paths invisible to *every* + consumer (tools, materialization, audit). ``.git`` is always + excluded; this pattern unions on top. See + ``fs_tools_layered`` for the full contract. + + Returns: + List of tools: [get_file, list_files, grep_files] + """ + backend = DirBackend(pathlib.Path(fs_layer), cache_listing=cache_listing) + tools, _ = fs_tools_layered( + [backend], + forbidden_read=forbidden_read, + global_exclude=global_exclude, + ) + return tools diff --git a/tests/test_fs_layered.py b/tests/test_fs_layered.py new file mode 100644 index 0000000..26a05fb --- /dev/null +++ b/tests/test_fs_layered.py @@ -0,0 +1,617 @@ +"""Tests for the layered FS surface in ``graphcore.tools.vfs``. + +Covers: + +* ``DirBackend`` — reads / list / caching / ``dump_to`` behaviour. +* ``_LayeredMaterializer`` — priority-ordered dump so higher-priority layers + overwrite lower ones on collision. +* ``fs_tools_layered`` — first-hit read semantics, union-deduplicated listing, + ``forbidden_read`` filtering, and grep behaviour across layers. +""" + +import pathlib +from dataclasses import dataclass, field +from typing import Callable, Iterable + +import pytest + +from graphcore.tools.vfs import ( + DirBackend, + _LayeredMaterializer, + _make_global_include_pred, + fs_tools_layered, +) + +# Async / sync test classes opt in to ``pytest.mark.asyncio`` individually +# rather than via a module-level mark — synchronous predicate tests in +# ``TestGlobalExcludePredicate`` don't want the asyncio handling and were +# producing "marked asyncio but not async" warnings under the module-wide +# mark. + + +# --------------------------------------------------------------------------- +# Test helpers +# --------------------------------------------------------------------------- + + +@dataclass +class InMemoryBackend: + """Minimal in-memory ``FSBackend`` implementation for layering tests. + + Declared test-local rather than imported, since the runtime counterpart + in ``composer/spec/natspec/pipeline.py`` has extra responsibilities we + don't want to drag into the graphcore test suite. + """ + + files: dict[str, str] = field(default_factory=dict) + + def get(self, path: str) -> str | None: + return self.files.get(path) + + def list(self) -> Iterable[str]: + return list(self.files) + + async def dump_to( + self, + target: pathlib.Path, + include_path: Callable[[str], bool] | None = None, + ) -> None: + for rel, content in self.files.items(): + if include_path is not None and not include_path(rel): + continue + dst = target / rel + dst.parent.mkdir(parents=True, exist_ok=True) + dst.write_text(content) + + +def _tools_by_name(backends, **kw): + tools, mat = fs_tools_layered(backends, **kw) + return {t.name: t for t in tools}, mat + + +def _write_tree(root: pathlib.Path, files: dict[str, str]) -> None: + for rel, content in files.items(): + p = root / rel + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(content) + + +# --------------------------------------------------------------------------- +# DirBackend +# --------------------------------------------------------------------------- + + +class TestDirBackend: + pytestmark = pytest.mark.asyncio + + async def test_get_returns_content(self, tmp_path: pathlib.Path): + _write_tree(tmp_path, {"a/b.sol": "hello"}) + b = DirBackend(tmp_path) + assert b.get("a/b.sol") == "hello" + + async def test_get_missing_returns_none(self, tmp_path: pathlib.Path): + b = DirBackend(tmp_path) + assert b.get("missing.sol") is None + + async def test_get_on_directory_returns_none(self, tmp_path: pathlib.Path): + (tmp_path / "subdir").mkdir() + b = DirBackend(tmp_path) + assert b.get("subdir") is None + + async def test_list_enumerates_nested_files(self, tmp_path: pathlib.Path): + _write_tree(tmp_path, { + "a/b.sol": "x", + "c.sol": "y", + "a/d/e.sol": "z", + }) + b = DirBackend(tmp_path) + assert set(b.list()) == {"a/b.sol", "c.sol", "a/d/e.sol"} + + async def test_list_caches_by_default(self, tmp_path: pathlib.Path): + _write_tree(tmp_path, {"a.sol": "v1"}) + b = DirBackend(tmp_path) + list(b.list()) # prime the cache + (tmp_path / "b.sol").write_text("added after cache") + # Cache hit: b.sol should NOT appear. + assert set(b.list()) == {"a.sol"} + + async def test_list_live_when_cache_disabled(self, tmp_path: pathlib.Path): + _write_tree(tmp_path, {"a.sol": "v1"}) + b = DirBackend(tmp_path, cache_listing=False) + list(b.list()) + (tmp_path / "b.sol").write_text("added") + assert set(b.list()) == {"a.sol", "b.sol"} + + async def test_dump_to_copies_tree(self, tmp_path: pathlib.Path): + src = tmp_path / "src" + dst = tmp_path / "dst" + _write_tree(src, {"a/b.sol": "x", "c.sol": "y"}) + b = DirBackend(src) + await b.dump_to(dst) + assert (dst / "a/b.sol").read_text() == "x" + assert (dst / "c.sol").read_text() == "y" + + async def test_dump_to_merges_into_existing_dir(self, tmp_path: pathlib.Path): + src = tmp_path / "src" + dst = tmp_path / "dst" + dst.mkdir() + (dst / "existing.sol").write_text("keep") + _write_tree(src, {"new.sol": "new"}) + b = DirBackend(src) + await b.dump_to(dst) + assert (dst / "existing.sol").read_text() == "keep" + assert (dst / "new.sol").read_text() == "new" + + +# --------------------------------------------------------------------------- +# _LayeredMaterializer +# --------------------------------------------------------------------------- + + +class TestLayeredMaterializer: + pytestmark = pytest.mark.asyncio + + async def test_empty_backends_is_noop(self, tmp_path: pathlib.Path): + mat = _LayeredMaterializer([]) + await mat.dump_to(tmp_path) + assert list(tmp_path.iterdir()) == [] + + async def test_single_backend_dumps(self, tmp_path: pathlib.Path): + b = InMemoryBackend({"x.sol": "content"}) + mat = _LayeredMaterializer([b]) + await mat.dump_to(tmp_path) + assert (tmp_path / "x.sol").read_text() == "content" + + async def test_collision_highest_priority_wins(self, tmp_path: pathlib.Path): + """First element in the backend list is the highest-priority layer. + + The materializer dumps in reverse order, so the highest-priority + backend writes last and survives on disk. This preserves the + first-hit read semantics of ``fs_tools_layered``. + """ + high = InMemoryBackend({"shared.sol": "from_high"}) + low = InMemoryBackend({"shared.sol": "from_low"}) + mat = _LayeredMaterializer([high, low]) + await mat.dump_to(tmp_path) + assert (tmp_path / "shared.sol").read_text() == "from_high" + + async def test_three_layer_priority(self, tmp_path: pathlib.Path): + high = InMemoryBackend({"shared.sol": "H"}) + mid = InMemoryBackend({"shared.sol": "M"}) + low = InMemoryBackend({"shared.sol": "L"}) + mat = _LayeredMaterializer([high, mid, low]) + await mat.dump_to(tmp_path) + assert (tmp_path / "shared.sol").read_text() == "H" + + async def test_unique_paths_union(self, tmp_path: pathlib.Path): + b1 = InMemoryBackend({"a.sol": "a"}) + b2 = InMemoryBackend({"b.sol": "b"}) + mat = _LayeredMaterializer([b1, b2]) + await mat.dump_to(tmp_path) + assert (tmp_path / "a.sol").read_text() == "a" + assert (tmp_path / "b.sol").read_text() == "b" + + +# --------------------------------------------------------------------------- +# fs_tools_layered — read operations across layers +# --------------------------------------------------------------------------- + + +class TestLayeredReads: + pytestmark = pytest.mark.asyncio + + async def test_get_first_hit_wins(self): + high = InMemoryBackend({"shared.sol": "from_high"}) + low = InMemoryBackend({"shared.sol": "from_low"}) + tools, _ = _tools_by_name([high, low]) + assert tools["get_file"].invoke({"path": "shared.sol"}) == "from_high" + + async def test_get_falls_through_to_lower_backend(self): + high = InMemoryBackend({"a.sol": "a"}) + low = InMemoryBackend({"b.sol": "b"}) + tools, _ = _tools_by_name([high, low]) + assert tools["get_file"].invoke({"path": "b.sol"}) == "b" + + async def test_get_missing_returns_not_found(self): + tools, _ = _tools_by_name([InMemoryBackend({})]) + resp = tools["get_file"].invoke({"path": "nope.sol"}) + assert "not found" in resp.lower() + + async def test_list_unions_and_dedupes(self): + b1 = InMemoryBackend({"a.sol": "a", "shared.sol": "v1"}) + b2 = InMemoryBackend({"b.sol": "b", "shared.sol": "v2"}) + tools, _ = _tools_by_name([b1, b2]) + listing = set(tools["list_files"].invoke({}).splitlines()) + assert listing == {"a.sol", "b.sol", "shared.sol"} + + async def test_list_empty_when_no_backends(self): + tools, _ = _tools_by_name([]) + assert tools["list_files"].invoke({}) == "" + + async def test_grep_finds_across_layers(self): + b1 = InMemoryBackend({"a.sol": "contract Alpha {}"}) + b2 = InMemoryBackend({"b.sol": "contract Beta {}"}) + tools, _ = _tools_by_name([b1, b2]) + resp = tools["grep_files"].invoke({ + "search_string": "contract", + "matching_lines": False, + }) + assert "a.sol" in resp + assert "b.sol" in resp + + async def test_grep_uses_first_hit_content(self): + """When a path appears in multiple layers, grep should search the + content the ``get_file`` tool would return — i.e. the highest + layer's copy.""" + high = InMemoryBackend({"shared.sol": "MATCH"}) + low = InMemoryBackend({"shared.sol": "no_hit"}) + tools, _ = _tools_by_name([high, low]) + resp = tools["grep_files"].invoke({ + "search_string": "MATCH", + "matching_lines": False, + }) + assert "shared.sol" in resp + + async def test_grep_match_in_filters(self): + b = InMemoryBackend({ + "a.sol": "contract A {}", + "b.sol": "contract B {}", + }) + tools, _ = _tools_by_name([b]) + resp = tools["grep_files"].invoke({ + "search_string": "contract", + "matching_lines": False, + "match_in": ["a.sol"], + }) + assert "a.sol" in resp + assert "b.sol" not in resp + + async def test_grep_matching_lines_includes_line_number(self): + b = InMemoryBackend({"x.sol": "line1\ncontract X {}\nline3"}) + tools, _ = _tools_by_name([b]) + resp = tools["grep_files"].invoke({ + "search_string": "contract", + "matching_lines": True, + }) + assert "x.sol:2:" in resp + + +# --------------------------------------------------------------------------- +# fs_tools_layered — forbidden_read filtering +# --------------------------------------------------------------------------- + + +class TestForbiddenRead: + pytestmark = pytest.mark.asyncio + + async def test_forbidden_read_blocks_get(self): + b = InMemoryBackend({"secrets/key.txt": "supersecret"}) + tools, _ = _tools_by_name([b], forbidden_read=r"^secrets/.*") + resp = tools["get_file"].invoke({"path": "secrets/key.txt"}) + assert "supersecret" not in resp + assert "not found" in resp.lower() + + async def test_forbidden_read_filters_list(self): + b = InMemoryBackend({ + "a.sol": "ok", + "secrets/key.txt": "supersecret", + }) + tools, _ = _tools_by_name([b], forbidden_read=r"^secrets/.*") + listing = tools["list_files"].invoke({}).splitlines() + assert "a.sol" in listing + assert "secrets/key.txt" not in listing + + async def test_forbidden_read_filters_grep(self): + b = InMemoryBackend({ + "a.sol": "hello", + "secrets/key.txt": "hello supersecret", + }) + tools, _ = _tools_by_name([b], forbidden_read=r"^secrets/.*") + resp = tools["grep_files"].invoke({ + "search_string": "hello", + "matching_lines": False, + }) + assert "a.sol" in resp + assert "secrets" not in resp + + async def test_forbidden_read_does_not_affect_materializer(self, tmp_path: pathlib.Path): + """Materialization dumps EVERY file the backend serves; the + forbidden_read filter only governs the tool-visible view. This is + intentional so downstream consumers (e.g. solc) see a complete + source tree.""" + b = InMemoryBackend({ + "a.sol": "visible", + "secrets/key.txt": "supersecret", + }) + _, mat = fs_tools_layered([b], forbidden_read=r"^secrets/.*") + await mat.dump_to(tmp_path) + assert (tmp_path / "a.sol").read_text() == "visible" + assert (tmp_path / "secrets/key.txt").read_text() == "supersecret" + + +# --------------------------------------------------------------------------- +# End-to-end: tool reads match materialized disk state +# --------------------------------------------------------------------------- + + +class TestMaterializerRoundtrip: + pytestmark = pytest.mark.asyncio + + async def test_tool_view_matches_dumped_tree(self, tmp_path: pathlib.Path): + high = InMemoryBackend({"shared.sol": "from_high"}) + low = InMemoryBackend({ + "shared.sol": "from_low", + "only_low.sol": "low_only", + }) + tools, mat = _tools_by_name([high, low]) + + # Tool surface: high wins on collision, unique paths visible. + assert tools["get_file"].invoke({"path": "shared.sol"}) == "from_high" + assert tools["get_file"].invoke({"path": "only_low.sol"}) == "low_only" + + # Materialized disk state mirrors the tool surface. + out = tmp_path / "out" + await mat.dump_to(out) + assert (out / "shared.sol").read_text() == "from_high" + assert (out / "only_low.sol").read_text() == "low_only" + + async def test_dirbackend_participates_in_layering(self, tmp_path: pathlib.Path): + """Ensure the real on-disk backend layers correctly alongside the + in-memory one.""" + on_disk_root = tmp_path / "fs" + _write_tree(on_disk_root, {"shared.sol": "from_disk", "disk_only.sol": "D"}) + disk = DirBackend(on_disk_root) + mem = InMemoryBackend({"shared.sol": "from_mem", "mem_only.sol": "M"}) + + # Mem first — should win on collision. + tools, mat = _tools_by_name([mem, disk]) + assert tools["get_file"].invoke({"path": "shared.sol"}) == "from_mem" + assert tools["get_file"].invoke({"path": "disk_only.sol"}) == "D" + assert tools["get_file"].invoke({"path": "mem_only.sol"}) == "M" + + out = tmp_path / "out" + await mat.dump_to(out) + assert (out / "shared.sol").read_text() == "from_mem" + assert (out / "disk_only.sol").read_text() == "D" + assert (out / "mem_only.sol").read_text() == "M" + + +# --------------------------------------------------------------------------- +# global_exclude: invisible-to-every-consumer pattern +# --------------------------------------------------------------------------- + + +class TestGlobalExcludePredicate: + """Direct tests of ``_make_global_include_pred`` — the predicate + factory all the higher layers compose. Polarity: True = okay to + access (consistent with ``_make_checker``).""" + + # The module-level ``pytestmark = pytest.mark.asyncio`` (line 24) + # applies asyncio handling to every test by default; these are + # synchronous predicate tests, so opt out at the class level to + # silence the "marked asyncio but not async" warning. + pytestmark: list = [] + + def test_default_floor_excludes_dot_git(self): + pred = _make_global_include_pred(None) + # .git directory and contents always excluded. + assert not pred(".git") + assert not pred(".git/HEAD") + assert not pred(".git/refs/heads/main") + assert not pred("submodule/.git") + assert not pred("submodule/.git/config") + # Things that should NOT match the .git filter: + assert pred("src/Foo.sol") + assert pred(".gitignore") # not a .git path-segment + assert pred("code.git.txt") # name happens to contain .git + assert pred("notgit/file") + + def test_regex_form_unions_with_floor(self): + # Exclude PDFs in addition to the .git floor. + pred = _make_global_include_pred(r".*\.pdf") + assert not pred(".git/HEAD") # floor still active + assert not pred("docs/spec.pdf") + assert not pred("spec.pdf") + assert pred("src/Foo.sol") + + def test_callable_form_true_means_exclude(self): + # Polarity at the user-facing API: True = exclude. + pred = _make_global_include_pred(lambda p: p.suffix in {".pdf", ".jpg"}) + assert not pred("docs/spec.pdf") + assert not pred("img/photo.jpg") + assert not pred(".git/HEAD") # floor still active + assert pred("src/Foo.sol") + assert pred("docs/README.md") + + def test_callable_form_receives_purepath(self): + # The callable should receive a PurePath, so path semantics like + # .parts and .suffix are usable without manual parsing. + seen: list[object] = [] + def _pred(p) -> bool: + seen.append(p) + return False # never exclude + pred = _make_global_include_pred(_pred) + pred("src/sub/Foo.sol") + assert len(seen) == 1 + assert isinstance(seen[0], pathlib.PurePath) + assert seen[0].suffix == ".sol" + assert seen[0].parts == ("src", "sub", "Foo.sol") + + def test_callable_form_returning_false_allows(self): + pred = _make_global_include_pred(lambda p: False) # never exclude + assert pred("anything") + assert pred("docs/spec.pdf") + assert not pred(".git/HEAD") # floor still active + + def test_none_form_only_floor(self): + pred = _make_global_include_pred(None) + assert pred("anything.txt") + assert pred("src/very/deep/nested/Foo.sol") + assert not pred(".git/HEAD") + + +class TestGlobalExclude: + """Integration tests: ``global_exclude`` propagates through the tool + surface, the materializer, and the unfiltered lookup hook.""" + + pytestmark = pytest.mark.asyncio + + async def test_floor_filters_tool_get(self): + b = InMemoryBackend({ + "src/Foo.sol": "contract Foo {}", + ".git/HEAD": "ref: refs/heads/main", + }) + tools, _ = _tools_by_name([b]) # no global_exclude — floor only + assert "contract Foo" in tools["get_file"].invoke({"path": "src/Foo.sol"}) + resp = tools["get_file"].invoke({"path": ".git/HEAD"}) + assert "ref:" not in resp + assert "not found" in resp.lower() + + async def test_floor_filters_tool_list(self): + b = InMemoryBackend({ + "src/Foo.sol": "x", + ".git/HEAD": "y", + ".git/config": "z", + }) + tools, _ = _tools_by_name([b]) + listing = tools["list_files"].invoke({}).splitlines() + assert "src/Foo.sol" in listing + assert ".git/HEAD" not in listing + assert ".git/config" not in listing + + async def test_floor_filters_materializer(self, tmp_path: pathlib.Path): + """Distinct from ``forbidden_read`` — global_exclude (including + the .git floor) DOES affect materialization.""" + b = InMemoryBackend({ + "src/Foo.sol": "ok", + ".git/HEAD": "should not be dumped", + }) + _, mat = fs_tools_layered([b]) + await mat.dump_to(tmp_path) + assert (tmp_path / "src/Foo.sol").read_text() == "ok" + assert not (tmp_path / ".git").exists() + assert not (tmp_path / ".git/HEAD").exists() + + async def test_user_regex_filters_tool_surface(self): + b = InMemoryBackend({ + "src/Foo.sol": "code", + "docs/spec.pdf": "binary blob", + "img/photo.jpg": "another blob", + }) + tools, _ = _tools_by_name([b], global_exclude=r".*\.(pdf|jpg)") + listing = tools["list_files"].invoke({}).splitlines() + assert "src/Foo.sol" in listing + assert "docs/spec.pdf" not in listing + assert "img/photo.jpg" not in listing + + async def test_user_callable_filters_tool_surface(self): + b = InMemoryBackend({ + "src/Foo.sol": "code", + "docs/spec.pdf": "blob", + "src/Bar.sol": "code", + }) + tools, _ = _tools_by_name([b], global_exclude=lambda p: p.suffix == ".pdf") + listing = tools["list_files"].invoke({}).splitlines() + assert "src/Foo.sol" in listing + assert "src/Bar.sol" in listing + assert "docs/spec.pdf" not in listing + + async def test_user_pattern_filters_materializer(self, tmp_path: pathlib.Path): + b = InMemoryBackend({ + "src/Foo.sol": "code", + "docs/spec.pdf": "blob", + }) + _, mat = fs_tools_layered([b], global_exclude=lambda p: p.suffix == ".pdf") + await mat.dump_to(tmp_path) + assert (tmp_path / "src/Foo.sol").exists() + assert not (tmp_path / "docs/spec.pdf").exists() + + async def test_floor_and_user_pattern_compose(self, tmp_path: pathlib.Path): + """Both .git AND the user pattern are excluded — the floor + composes with the user-supplied predicate, not replaced by it.""" + b = InMemoryBackend({ + "src/Foo.sol": "code", + ".git/HEAD": "ref", + "docs/spec.pdf": "blob", + }) + _, mat = fs_tools_layered([b], global_exclude=lambda p: p.suffix == ".pdf") + await mat.dump_to(tmp_path) + assert (tmp_path / "src/Foo.sol").exists() + assert not (tmp_path / ".git").exists() + assert not (tmp_path / "docs/spec.pdf").exists() + + async def test_globally_excluded_path_invisible_to_get_via_materializer(self): + """The materializer's ``get`` (used by FileRegistry's existence + check, etc.) must respect global_exclude — distinct from + ``forbidden_read`` which does not affect the materializer.""" + b = InMemoryBackend({"src/Foo.sol": "code", ".git/HEAD": "ref"}) + _, mat = fs_tools_layered([b]) + assert mat.get("src/Foo.sol") == "code" + assert mat.get(".git/HEAD") is None + + async def test_dirbackend_dump_skips_excluded_subtree(self, tmp_path: pathlib.Path): + """``DirBackend.dump_to`` uses ``copytree(ignore=...)`` with + relativized paths so ``.git/`` is short-circuited at the root — + not recursed into and filtered file-by-file. Verifies the + relativization is correct for both root-level skips and + deeply-nested file matches.""" + src = tmp_path / "src" + _write_tree(src, { + "Foo.sol": "code", + ".git/HEAD": "ref", + ".git/objects/abc/def": "blob", + "docs/spec.pdf": "binary", + "subdir/Bar.sol": "more code", + }) + out = tmp_path / "out" + backend = DirBackend(src) + # Exclude .pdf via callable; .git via the floor. + mat = _LayeredMaterializer( + [backend], global_exclude=lambda p: p.suffix == ".pdf", + ) + await mat.dump_to(out) + assert (out / "Foo.sol").exists() + assert (out / "subdir/Bar.sol").exists() + assert not (out / ".git").exists() + assert not (out / ".git/HEAD").exists() + assert not (out / ".git/objects/abc/def").exists() + assert not (out / "docs/spec.pdf").exists() + + async def test_grep_excludes_globally_excluded(self): + """grep_files must not return matches from globally-excluded files.""" + b = InMemoryBackend({ + "src/Foo.sol": "needle", + ".git/HEAD": "needle in git", + }) + tools, _ = _tools_by_name([b]) + resp = tools["grep_files"].invoke({ + "search_string": "needle", + "matching_lines": False, + }) + assert "src/Foo.sol" in resp + assert ".git" not in resp + + async def test_global_exclude_layers_with_forbidden_read(self, tmp_path: pathlib.Path): + """``forbidden_read`` (tool-only) and ``global_exclude`` + (everywhere) compose: forbidden_read narrows the tool view but + not materialization; global_exclude narrows both.""" + b = InMemoryBackend({ + "src/Foo.sol": "ok", + "secrets/key.txt": "supersecret", # tool-blind only + "docs/spec.pdf": "blob", # globally excluded + }) + tools, mat = _tools_by_name( + [b], + forbidden_read=r"^secrets/.*", + global_exclude=lambda p: p.suffix == ".pdf", + ) + listing = tools["list_files"].invoke({}).splitlines() + assert "src/Foo.sol" in listing + assert "secrets/key.txt" not in listing # forbidden_read blocks + assert "docs/spec.pdf" not in listing # global_exclude blocks + + # Materializer: forbidden_read does NOT filter; global_exclude DOES. + await mat.dump_to(tmp_path) + assert (tmp_path / "src/Foo.sol").exists() + assert (tmp_path / "secrets/key.txt").exists() # forbidden_read tool-only + assert not (tmp_path / "docs/spec.pdf").exists() # global_exclude everywhere diff --git a/tests/test_memory.py b/tests/test_memory.py index 62c5124..3b8dd0f 100644 --- a/tests/test_memory.py +++ b/tests/test_memory.py @@ -213,6 +213,12 @@ def test_rename_destination_exists_returns_error(self, backend: AnyBackend) -> N class TestPathValidation: + """The backend layer raises ``MemoryBackendError`` for path-validation + failures; the memory *tool* wrapper is the layer that catches and + transforms those into LLM-facing ``Error: ...`` strings (see + ``memory_tool`` / ``async_memory_tool``). These tests exercise the + backend directly and therefore assert the raise behaviour.""" + def test_create_outside_memories_raises(self, backend: AnyBackend) -> None: with pytest.raises(MemoryBackendError, match="/memories"): backend.create("/tmp/evil.txt", "bad") @@ -225,10 +231,6 @@ def test_rename_source_outside_memories_raises(self, backend: AnyBackend) -> Non with pytest.raises(MemoryBackendError): backend.rename("/etc/passwd", "/memories/stolen.txt") - def test_path_traversal_rejected(self, backend: AnyBackend) -> None: - with pytest.raises(MemoryBackendError): - backend.create("/memories/../etc/shadow", "evil") - # --------------------------------------------------------------------------- # Namespace copying (init_from) diff --git a/tests/test_memory_async.py b/tests/test_memory_async.py index 86ac2ba..5642d06 100644 --- a/tests/test_memory_async.py +++ b/tests/test_memory_async.py @@ -248,6 +248,12 @@ async def test_rename_destination_exists_returns_error( class TestAsyncPathValidation: + """The backend layer raises ``MemoryBackendError`` for path-validation + failures; the memory *tool* wrapper is the layer that catches and + transforms those into LLM-facing ``Error: ...`` strings (see + ``memory_tool`` / ``async_memory_tool``). These tests exercise the + backend directly and therefore assert the raise behaviour.""" + async def test_create_outside_memories_raises( self, async_backend: AnyAsyncBackend ) -> None: @@ -266,12 +272,6 @@ async def test_rename_source_outside_memories_raises( with pytest.raises(MemoryBackendError): await async_backend.rename("/etc/passwd", "/memories/stolen.txt") - async def test_path_traversal_rejected( - self, async_backend: AnyAsyncBackend - ) -> None: - with pytest.raises(MemoryBackendError): - await async_backend.create("/memories/../etc/shadow", "evil") - # --------------------------------------------------------------------------- # Namespace copying (init_from) diff --git a/uv.lock b/uv.lock index 8be49b8..72744a9 100644 --- a/uv.lock +++ b/uv.lock @@ -193,7 +193,7 @@ requires-dist = [ { name = "langchain", specifier = ">=1.2" }, { name = "langchain-anthropic", specifier = ">=1.4" }, { name = "langchain-core", specifier = ">=1.2" }, - { name = "langgraph", specifier = "==1.1.0" }, + { name = "langgraph", specifier = "==1.0.5" }, { name = "psycopg", extras = ["binary"], specifier = ">=3.2" }, { name = "psycopg", extras = ["pool"], specifier = ">=3.2" }, ] @@ -352,16 +352,16 @@ wheels = [ [[package]] name = "langchain" -version = "1.2.11" +version = "1.2.6" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "langchain-core" }, { name = "langgraph" }, { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/8a/ed/990ec469ee2c4876adc367a1bb10143132a54a6fd9757269d2e440d65b37/langchain-1.2.11.tar.gz", hash = "sha256:1b0f680de88c178898a69f9814025729110fc68365b2c33cd2ba978114d5fc0a", size = 566207, upload-time = "2026-03-10T19:46:14.519Z" } +sdist = { url = "https://files.pythonhosted.org/packages/f5/bc/d8f506a525baadee99a65c6cc28c1c35c9eaf1cb2009f048e9861d81a600/langchain-1.2.6.tar.gz", hash = "sha256:7d46cbf719d860a16f6fc182d5d3de17453dda187f3d43e9c40ac352a5094fdd", size = 553127, upload-time = "2026-01-16T19:21:19.611Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/10/24/c9fd2d6f31ef7444e163aa9295c2c661892c8b2b683b4b6dd190ffd29a2f/langchain-1.2.11-py3-none-any.whl", hash = "sha256:ccc5d23e2568efa6e3cb2dde268a267d7f090bdad47d7e1ee5f0c9769e8cb7b9", size = 112136, upload-time = "2026-03-10T19:46:12.433Z" }, + { url = "https://files.pythonhosted.org/packages/3f/28/d5dc4cb06ccb29d62a590d446072964766555e85863f5044c6e644c07d0d/langchain-1.2.6-py3-none-any.whl", hash = "sha256:a9a6c39f03c09b6eb0f1b47e267ad2a2fd04e124dfaa9753bd6c11d2fe7d944e", size = 108458, upload-time = "2026-01-16T19:21:18.085Z" }, ] [[package]] @@ -399,7 +399,7 @@ wheels = [ [[package]] name = "langgraph" -version = "1.1.0" +version = "1.0.5" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "langchain-core" }, @@ -409,9 +409,9 @@ dependencies = [ { name = "pydantic" }, { name = "xxhash" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/c6/de/be274968aa1ce40a109e0e638718d4e31872e97dbc84d3b5c7ab81a18d4e/langgraph-1.1.0.tar.gz", hash = "sha256:2decaef5d6716166dc5c13e0fdf65637e5a1837ef4c94fd82b6bcf2115cb5c78", size = 542647, upload-time = "2026-03-10T12:46:34.156Z" } +sdist = { url = "https://files.pythonhosted.org/packages/7d/47/28f4d4d33d88f69de26f7a54065961ac0c662cec2479b36a2db081ef5cb6/langgraph-1.0.5.tar.gz", hash = "sha256:7f6ae59622386b60fe9fa0ad4c53f42016b668455ed604329e7dc7904adbf3f8", size = 493969, upload-time = "2025-12-12T23:05:48.224Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/61/0c/4920a1c5d2d1059f706b547019091e4da74ab30cd95fe114af737a89236f/langgraph-1.1.0-py3-none-any.whl", hash = "sha256:7d29e01312340c9c7c09eb0178db5edd0514c3f35929fa5b32a247fcd9a9cd65", size = 167249, upload-time = "2026-03-10T12:46:32.546Z" }, + { url = "https://files.pythonhosted.org/packages/23/1b/e318ee76e42d28f515d87356ac5bd7a7acc8bad3b8f54ee377bef62e1cbf/langgraph-1.0.5-py3-none-any.whl", hash = "sha256:b4cfd173dca3c389735b47228ad8b295e6f7b3df779aba3a1e0c23871f81281e", size = 157056, upload-time = "2025-12-12T23:05:46.499Z" }, ] [[package]]