Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 94 additions & 7 deletions core/services/ingestion_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
from core.utils.fast_ops import bytes_to_data_uri, encode_base64
from core.utils.folder_utils import normalize_folder_path, normalize_ingest_folder_inputs
from core.utils.storage_usage import extract_storage_bytes
from core.utils.typed_metadata import MetadataBundle, merge_metadata, normalize_metadata
from core.utils.typed_metadata import MetadataBundle, canonicalize_type_name, merge_metadata, normalize_metadata
from core.vector_store.base_vector_store import BaseVectorStore

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -82,6 +82,10 @@ class IngestionService:
"owner_id",
"end_user_id",
}
_FOLDER_PATH_UPDATE_ERROR = (
"folder_path is managed by Morphik and cannot be changed using the update metadata endpoint. "
"Use the folder endpoints to move the document or folder instead."
)

def __init__(
self,
Expand Down Expand Up @@ -123,18 +127,46 @@ def _enforce_no_user_mutable_fields(
extra_fields: Optional[Dict[str, Any]] = None,
metadata_types: Optional[Dict[str, Any]] = None,
context: str = "ingest",
existing_doc: Optional[Document] = None,
allow_unchanged_metadata: bool = False,
) -> None:
"""Prevent users from setting reserved system fields directly."""
if self._has_folder_path_field(metadata, extra_fields, metadata_types):
if context == "update":
raise ValueError(self._FOLDER_PATH_UPDATE_ERROR)
raise ValueError(
f"folder_path is managed by Morphik and cannot be set directly during {context}. "
"Use folder parameters or folder endpoints to manage document placement."
)

invalid_fields = set()

if isinstance(metadata, dict):
invalid_fields.update({key for key in metadata.keys() if key in self._USER_IMMUTABLE_FIELDS})
for key, value in metadata.items():
if key not in self._USER_IMMUTABLE_FIELDS:
continue
if (
allow_unchanged_metadata
and existing_doc is not None
and self._is_unchanged_managed_metadata_value(existing_doc, key, value)
):
continue
invalid_fields.add(key)

if isinstance(extra_fields, dict):
invalid_fields.update({key for key in extra_fields.keys() if key in self._USER_IMMUTABLE_FIELDS})

if isinstance(metadata_types, dict):
invalid_fields.update({key for key in metadata_types.keys() if key in self._USER_IMMUTABLE_FIELDS})
for key, value in metadata_types.items():
if key not in self._USER_IMMUTABLE_FIELDS:
continue
if (
allow_unchanged_metadata
and existing_doc is not None
and self._is_unchanged_managed_metadata_type(existing_doc, key, value)
):
continue
invalid_fields.add(key)

if invalid_fields:
fields_str = ", ".join(sorted(invalid_fields))
Expand All @@ -143,6 +175,44 @@ def _enforce_no_user_mutable_fields(
"Remove them from the request."
)

def _current_managed_metadata_values(self, doc: Document) -> Dict[str, Any]:
"""Return managed metadata values as currently exposed on a document."""
current_values = dict(doc.metadata or {})
current_values.setdefault("external_id", doc.external_id)

folder_metadata_value = doc.folder_path or doc.folder_name
if folder_metadata_value is not None:
current_values.setdefault("folder_name", folder_metadata_value)

if doc.folder_id is not None:
current_values.setdefault("folder_id", doc.folder_id)

return current_values

@staticmethod
def _has_folder_path_field(*payloads: Optional[Dict[str, Any]]) -> bool:
return any(isinstance(payload, dict) and "folder_path" in payload for payload in payloads)

def _is_unchanged_managed_metadata_value(self, doc: Document, key: str, value: Any) -> bool:
current_values = self._current_managed_metadata_values(doc)
return key in current_values and current_values[key] == value

def _is_unchanged_managed_metadata_type(self, doc: Document, key: str, value: Any) -> bool:
current_types = dict(doc.metadata_types or {})
if doc.external_id:
current_types.setdefault("external_id", "string")
for metadata_key, metadata_value in self._current_managed_metadata_values(doc).items():
if isinstance(metadata_value, str):
current_types.setdefault(metadata_key, "string")

if key not in current_types:
return False

try:
return canonicalize_type_name(str(value)) == canonicalize_type_name(str(current_types[key]))
except ValueError:
return value == current_types[key]

@classmethod
def _clean_system_metadata(cls, metadata: Optional[Dict[str, Any]]) -> Dict[str, Any]:
"""Remove scope fields that are persisted in dedicated columns."""
Expand Down Expand Up @@ -637,14 +707,22 @@ async def queue_document_update(
"""
Update a document by replacing its content and re-queueing ingestion.
"""
self._enforce_no_user_mutable_fields(metadata, metadata_types=metadata_types, context="update")
metadata_only_update = content is None and file is None and metadata is not None
if not metadata_only_update:
self._enforce_no_user_mutable_fields(metadata, metadata_types=metadata_types, context="update")

doc = await self._validate_update_access(document_id, auth)
if not doc:
return None

metadata_only_update = content is None and file is None and metadata is not None
if metadata_only_update:
self._enforce_no_user_mutable_fields(
metadata,
metadata_types=metadata_types,
context="update",
existing_doc=doc,
allow_unchanged_metadata=True,
)
metadata_bundle = self._update_metadata(doc, metadata, metadata_types, None)
return await self._update_document_metadata_only(doc, auth, metadata_bundle)

Expand Down Expand Up @@ -788,15 +866,24 @@ async def update_document(
Returns:
Updated document if successful, None if failed
"""
metadata_only_update = content is None and file is None and metadata is not None
# Prevent callers from modifying reserved fields
self._enforce_no_user_mutable_fields(metadata, metadata_types=metadata_types, context="update")
if not metadata_only_update:
self._enforce_no_user_mutable_fields(metadata, metadata_types=metadata_types, context="update")

# Validate permissions and get document
doc = await self._validate_update_access(document_id, auth)
if not doc:
return None

metadata_only_update = content is None and file is None and metadata is not None
if metadata_only_update:
self._enforce_no_user_mutable_fields(
metadata,
metadata_types=metadata_types,
context="update",
existing_doc=doc,
allow_unchanged_metadata=True,
)

# Process content based on update type
updated_content = None
Expand Down
188 changes: 188 additions & 0 deletions core/tests/unit/test_ingestion_service_metadata_update.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
"""Unit tests for ingestion metadata-only update validation."""

import os
import sys
from pathlib import Path
from types import ModuleType

import pytest

from core.models.auth import AuthContext
from core.models.documents import Document

os.environ.setdefault("POSTGRES_URI", "postgresql://user:pass@localhost:5432/test")

# Avoid importing optional embedding backends through core.embedding.__init__.
_embedding_package = sys.modules.get("core.embedding")
if _embedding_package is None:
_embedding_stub = ModuleType("core.embedding")
_embedding_stub.__path__ = [str(Path(__file__).resolve().parents[2] / "embedding")]
sys.modules["core.embedding"] = _embedding_stub

from core.services.ingestion_service import IngestionService # noqa: E402

if _embedding_package is None:
sys.modules.pop("core.embedding", None)


class FakeDatabase:
def __init__(self, doc: Document):
self.doc = doc
self.update_calls = []

async def get_document(self, document_id: str, auth: AuthContext):
if document_id == self.doc.external_id:
return self.doc
return None

async def check_access(self, document_id: str, auth: AuthContext, required_permission: str = "read") -> bool:
return document_id == self.doc.external_id

async def update_document(self, document_id: str, updates, auth: AuthContext, metadata_bundle=None):
self.update_calls.append(
{
"document_id": document_id,
"updates": updates,
"auth": auth,
"metadata_bundle": metadata_bundle,
}
)
return True


def _auth() -> AuthContext:
return AuthContext(user_id="user-1", app_id="app-1")


def _document() -> Document:
return Document(
external_id="doc-1",
content_type="text/plain",
filename="report.txt",
metadata={
"external_id": "doc-1",
"folder_name": "/Team/Reports",
"folder_id": "folder-1",
"custom": "old",
},
metadata_types={
"external_id": "string",
"folder_name": "string",
"folder_id": "string",
"custom": "string",
},
folder_name="Reports",
folder_path="/Team/Reports",
folder_id="folder-1",
app_id="app-1",
)


def _service(doc: Document):
db = FakeDatabase(doc)
return IngestionService(db, None, None, None, None), db


@pytest.mark.asyncio
async def test_metadata_only_update_allows_unchanged_managed_metadata_fields():
doc = _document()
service, db = _service(doc)

updated = await service.update_document(
document_id="doc-1",
auth=_auth(),
metadata={
"external_id": "doc-1",
"folder_name": "/Team/Reports",
"folder_id": "folder-1",
"custom": "new",
},
metadata_types={
"external_id": "string",
"folder_name": "string",
"folder_id": "string",
"custom": "string",
},
)

assert updated is doc
assert doc.metadata["custom"] == "new"
assert len(db.update_calls) == 1
assert db.update_calls[0]["updates"]["metadata"]["external_id"] == "doc-1"
assert db.update_calls[0]["updates"]["metadata"]["folder_name"] == "/Team/Reports"


@pytest.mark.asyncio
async def test_metadata_only_update_rejects_folder_path_with_folder_endpoint_message():
doc = _document()
service, db = _service(doc)

with pytest.raises(ValueError, match="folder_path.*update metadata endpoint.*folder"):
await service.update_document(
document_id="doc-1",
auth=_auth(),
metadata={
"folder_path": "/Team/Reports",
"custom": "new",
},
)

assert "folder_path" not in doc.metadata
assert doc.metadata["custom"] == "old"
assert db.update_calls == []


@pytest.mark.asyncio
async def test_metadata_only_update_rejects_changed_managed_metadata_fields():
doc = _document()
service, db = _service(doc)

with pytest.raises(ValueError, match="folder_name"):
await service.update_document(
document_id="doc-1",
auth=_auth(),
metadata={
"folder_name": "/Team/Other",
"custom": "new",
},
)

assert doc.metadata["custom"] == "old"
assert db.update_calls == []


@pytest.mark.asyncio
async def test_content_update_still_rejects_unchanged_managed_metadata_fields():
doc = _document()
service, db = _service(doc)

with pytest.raises(ValueError, match="external_id"):
await service.update_document(
document_id="doc-1",
auth=_auth(),
content="replacement",
metadata={"external_id": "doc-1"},
)

assert db.update_calls == []


@pytest.mark.asyncio
async def test_queued_metadata_only_update_allows_unchanged_managed_metadata_fields():
doc = _document()
service, db = _service(doc)

updated = await service.queue_document_update(
document_id="doc-1",
auth=_auth(),
redis=None,
metadata={
"external_id": "doc-1",
"folder_name": "/Team/Reports",
"custom": "queued",
},
)

assert updated is doc
assert doc.metadata["custom"] == "queued"
assert len(db.update_calls) == 1
Loading