33 changes: 24 additions & 9 deletions src/basic_memory/repository/search_repository_base.py
@@ -451,21 +451,36 @@ def _compose_row_source_text(self, row) -> str:
         return "\n\n".join(part for part in row_parts if part)

     def _build_chunk_records(self, rows) -> list[dict[str, str]]:
-        records: list[dict[str, str]] = []
+        records_by_key: dict[str, dict[str, str]] = {}
+        duplicate_chunk_keys = 0
         for row in rows:
             source_text = self._compose_row_source_text(row)
             chunks = self._split_text_into_chunks(source_text)
             for chunk_index, chunk_text in enumerate(chunks):
                 chunk_key = f"{row.type}:{row.id}:{chunk_index}"
                 source_hash = hashlib.sha256(chunk_text.encode("utf-8")).hexdigest()
-                records.append(
-                    {
-                        "chunk_key": chunk_key,
-                        "chunk_text": chunk_text,
-                        "source_hash": source_hash,
-                    }
-                )
-        return records
+                # Trigger: SQLite FTS5 can accumulate duplicate logical rows for the
+                # same search_index id because it does not enforce relational uniqueness.
+                # Why: duplicate chunk keys would schedule duplicate writes for the same
+                # chunk row and eventually trip UNIQUE(rowid) in search_vector_embeddings.
+                # Outcome: collapse chunk work to one deterministic record per chunk key.
+                if chunk_key in records_by_key:
+                    duplicate_chunk_keys += 1
+                records_by_key[chunk_key] = {
+                    "chunk_key": chunk_key,
+                    "chunk_text": chunk_text,
+                    "source_hash": source_hash,
+                }
+
+        if duplicate_chunk_keys:
+            logger.warning(
+                "Collapsed duplicate vector chunk keys before embedding sync: "
+                "project_id={project_id} duplicate_chunk_keys={duplicate_chunk_keys}",
+                project_id=self.project_id,
+                duplicate_chunk_keys=duplicate_chunk_keys,
+            )
+
+        return list(records_by_key.values())

     # --- Text splitting ---

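Note: the last-write-wins collapse above is easy to exercise outside the repository class. A minimal standalone sketch of the same dedup-by-chunk-key pattern, under assumed types (the `SimpleNamespace` rows and the trivial `split` callback are illustrative stand-ins, not the real FTS5 row objects or chunker):

```python
# Standalone sketch of the dedup-by-chunk-key pattern; not the real repository code.
import hashlib
from types import SimpleNamespace


def build_chunk_records(rows, split=lambda text: [text]) -> list[dict[str, str]]:
    records_by_key: dict[str, dict[str, str]] = {}
    for row in rows:
        for chunk_index, chunk_text in enumerate(split(row.content)):
            chunk_key = f"{row.type}:{row.id}:{chunk_index}"  # type:id:index, as above
            records_by_key[chunk_key] = {
                "chunk_key": chunk_key,
                "chunk_text": chunk_text,
                "source_hash": hashlib.sha256(chunk_text.encode("utf-8")).hexdigest(),
            }
    return list(records_by_key.values())


# Two FTS5 rows with the same id collapse to a single deterministic record.
rows = [
    SimpleNamespace(type="entity", id=77, content="shared content"),
    SimpleNamespace(type="entity", id=77, content="shared content"),
]
assert len(build_chunk_records(rows)) == 1
```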
114 changes: 70 additions & 44 deletions src/basic_memory/services/project_service.py
@@ -11,6 +11,7 @@

 from loguru import logger
 from sqlalchemy import text
+from sqlalchemy.exc import OperationalError as SAOperationalError

 from basic_memory.models import Project
 from basic_memory.repository.project_repository import ProjectRepository
@@ -1004,56 +1005,81 @@ async def get_embedding_status(self, project_id: int) -> EmbeddingStatus:
         )
         total_indexed_entities = si_result.scalar() or 0

-        chunks_result = await self.repository.execute_query(
-            text("SELECT COUNT(*) FROM search_vector_chunks WHERE project_id = :project_id"),
-            {"project_id": project_id},
-        )
-        total_chunks = chunks_result.scalar() or 0
-
-        entities_with_chunks_result = await self.repository.execute_query(
-            text(
-                "SELECT COUNT(DISTINCT entity_id) FROM search_vector_chunks "
-                "WHERE project_id = :project_id"
-            ),
-            {"project_id": project_id},
-        )
-        total_entities_with_chunks = entities_with_chunks_result.scalar() or 0
-
-        # Embeddings count — join pattern differs between SQLite and Postgres
-        if is_postgres:
-            embeddings_sql = text(
-                "SELECT COUNT(*) FROM search_vector_chunks c "
-                "JOIN search_vector_embeddings e ON e.chunk_id = c.id "
-                "WHERE c.project_id = :project_id"
-            )
-        else:
-            embeddings_sql = text(
-                "SELECT COUNT(*) FROM search_vector_chunks c "
-                "JOIN search_vector_embeddings e ON e.rowid = c.id "
-                "WHERE c.project_id = :project_id"
-            )
-
-        embeddings_result = await self.repository.execute_query(
-            embeddings_sql, {"project_id": project_id}
-        )
-        total_embeddings = embeddings_result.scalar() or 0
-
-        # Orphaned chunks (chunks without embeddings — indicates interrupted indexing)
-        if is_postgres:
-            orphan_sql = text(
-                "SELECT COUNT(*) FROM search_vector_chunks c "
-                "LEFT JOIN search_vector_embeddings e ON e.chunk_id = c.id "
-                "WHERE c.project_id = :project_id AND e.chunk_id IS NULL"
-            )
-        else:
-            orphan_sql = text(
-                "SELECT COUNT(*) FROM search_vector_chunks c "
-                "LEFT JOIN search_vector_embeddings e ON e.rowid = c.id "
-                "WHERE c.project_id = :project_id AND e.rowid IS NULL"
-            )
-
-        orphan_result = await self.repository.execute_query(orphan_sql, {"project_id": project_id})
-        orphaned_chunks = orphan_result.scalar() or 0
+        try:
+            chunks_result = await self.repository.execute_query(
+                text("SELECT COUNT(*) FROM search_vector_chunks WHERE project_id = :project_id"),
+                {"project_id": project_id},
+            )
+            total_chunks = chunks_result.scalar() or 0
+
+            entities_with_chunks_result = await self.repository.execute_query(
+                text(
+                    "SELECT COUNT(DISTINCT entity_id) FROM search_vector_chunks "
+                    "WHERE project_id = :project_id"
+                ),
+                {"project_id": project_id},
+            )
+            total_entities_with_chunks = entities_with_chunks_result.scalar() or 0
+
+            # Embeddings count — join pattern differs between SQLite and Postgres
+            if is_postgres:
+                embeddings_sql = text(
+                    "SELECT COUNT(*) FROM search_vector_chunks c "
+                    "JOIN search_vector_embeddings e ON e.chunk_id = c.id "
+                    "WHERE c.project_id = :project_id"
+                )
+            else:
+                embeddings_sql = text(
+                    "SELECT COUNT(*) FROM search_vector_chunks c "
+                    "JOIN search_vector_embeddings e ON e.rowid = c.id "
+                    "WHERE c.project_id = :project_id"
+                )
+
+            embeddings_result = await self.repository.execute_query(
+                embeddings_sql, {"project_id": project_id}
+            )
+            total_embeddings = embeddings_result.scalar() or 0
+
+            # Orphaned chunks (chunks without embeddings — indicates interrupted indexing)
+            if is_postgres:
+                orphan_sql = text(
+                    "SELECT COUNT(*) FROM search_vector_chunks c "
+                    "LEFT JOIN search_vector_embeddings e ON e.chunk_id = c.id "
+                    "WHERE c.project_id = :project_id AND e.chunk_id IS NULL"
+                )
+            else:
+                orphan_sql = text(
+                    "SELECT COUNT(*) FROM search_vector_chunks c "
+                    "LEFT JOIN search_vector_embeddings e ON e.rowid = c.id "
+                    "WHERE c.project_id = :project_id AND e.rowid IS NULL"
+                )
+
+            orphan_result = await self.repository.execute_query(
+                orphan_sql, {"project_id": project_id}
+            )
+            orphaned_chunks = orphan_result.scalar() or 0
+        except SAOperationalError as exc:
+            # Trigger: sqlite_master can list vec0 virtual tables even when sqlite-vec
+            # is not loaded in the current Python runtime.
+            # Why: project info should degrade gracefully instead of crashing on stats queries.
+            # Outcome: report vector tables as unavailable and point the user to install the
+            # missing dependency before rebuilding embeddings.
+            if is_postgres or "no such module: vec0" not in str(exc).lower():
+                raise
+
+            return EmbeddingStatus(
+                semantic_search_enabled=True,
+                embedding_provider=provider,
+                embedding_model=model,
+                embedding_dimensions=dimensions,
+                total_indexed_entities=total_indexed_entities,
+                vector_tables_exist=False,
+                reindex_recommended=True,
+                reindex_reason=(
+                    "SQLite vector tables exist but sqlite-vec is unavailable in this Python "
+                    "environment — install/update basic-memory, then run: bm reindex --embeddings"
+                ),
+            )

         # --- Reindex recommendation logic (priority order) ---
         reindex_recommended = False
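Note: the `"no such module: vec0"` guard matches how SQLite reports a schema that references an unloaded extension: `sqlite_master` still lists the virtual table, but any query touching it fails. A minimal sketch of the same degrade-instead-of-crash pattern with plain `sqlite3` (the table name comes from the diff; the function itself is illustrative):

```python
# Sketch: queries against a vec0 virtual table raise OperationalError when the
# sqlite-vec extension is not loaded, even though sqlite_master lists the table.
import sqlite3
from typing import Optional


def count_vector_chunks(conn: sqlite3.Connection) -> Optional[int]:
    """Return the chunk count, or None when the vec0 module is unavailable."""
    try:
        return conn.execute("SELECT COUNT(*) FROM search_vector_chunks").fetchone()[0]
    except sqlite3.OperationalError as exc:
        if "no such module: vec0" not in str(exc).lower():
            raise
        return None  # degrade gracefully; caller reports vector tables unavailable
```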
33 changes: 31 additions & 2 deletions src/basic_memory/utils.py
@@ -66,6 +66,7 @@ def __str__(self) -> str: ...
 # In type annotations, use Union[Path, str] instead of FilePath for now
 # This preserves compatibility with existing code while we migrate
 FilePath = Union[Path, str]
+WINDOWS_LOG_FILE_RETENTION = 5


 def generate_permalink(file_path: Union[Path, str, PathLike], split_extension: bool = True) -> str:
@@ -250,7 +251,7 @@ def setup_logging(
     log_to_file: bool = False,
     log_to_stdout: bool = False,
     structured_context: bool = False,
-) -> None:  # pragma: no cover
+) -> None:
     """Configure logging with explicit settings.

     This function provides a simple, explicit interface for configuring logging.
@@ -273,8 +274,14 @@

     # Add file handler with rotation
     if log_to_file:
-        log_path = Path.home() / ".basic-memory" / "basic-memory.log"
+        # Trigger: Windows does not allow renaming an open file held by another process.
+        # Why: multiple basic-memory processes can share the same log directory at once.
+        # Outcome: use per-process log files on Windows so log rotation stays local.
+        log_filename = f"basic-memory-{os.getpid()}.log" if os.name == "nt" else "basic-memory.log"
+        log_path = Path.home() / ".basic-memory" / log_filename
         log_path.parent.mkdir(parents=True, exist_ok=True)
+        if os.name == "nt":
+            _cleanup_windows_log_files(log_path.parent, log_path.name)
         # Keep logging synchronous (enqueue=False) to avoid background logging threads.
         # Background threads are a common source of "hang on exit" issues in CLI/test runs.
         logger.add(

Review comment (P2) on the `log_filename` line: "Avoid unbounded PID-specific log file growth on Windows. Using basic-memory-{pid}.log makes every process write to a brand-new filename, so Loguru's retention=5 no longer limits the overall log directory size across runs. On Windows hosts where CLI/MCP processes start frequently, ~/.basic-memory will accumulate stale per-PID logs indefinitely and can eventually consume significant disk space."

Author reply: "Addressed in 445d009 by trimming stale per-process Windows log files during setup while keeping the current-process file path unchanged."
@@ -308,6 +315,28 @@
     logging.getLogger("watchfiles.main").setLevel(logging.WARNING)


+def _cleanup_windows_log_files(log_dir: Path, current_log_name: str) -> None:
+    """Trim stale per-process Windows log files so the directory stays bounded."""
+    stale_logs = [
+        path
+        for path in log_dir.glob("basic-memory-*.log*")
+        if path.is_file() and path.name != current_log_name
+    ]
+
+    if len(stale_logs) <= WINDOWS_LOG_FILE_RETENTION - 1:
+        return
+
+    # Trigger: per-process log filenames avoid Windows rename contention but fragment retention.
+    # Why: loguru retention applies per sink, not across the whole basic-memory log directory.
+    # Outcome: keep only the newest stale PID logs so repeated CLI/server launches stay bounded.
+    stale_logs.sort(key=lambda path: path.stat().st_mtime, reverse=True)
+    for stale_log in stale_logs[WINDOWS_LOG_FILE_RETENTION - 1 :]:
+        try:
+            stale_log.unlink()
+        except OSError:
+            logger.debug("Failed to delete stale Windows log file: {path}", path=stale_log)
+
+
 def parse_tags(tags: Union[List[str], str, None]) -> List[str]:
     """Parse tags from various input formats into a consistent list.

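Note: with `WINDOWS_LOG_FILE_RETENTION = 5`, the bound is the current process's file plus the four newest stale files. A quick self-contained simulation of that bound, using fake PID-named files in a temp directory (this mirrors the helper's logic rather than importing it):

```python
# Simulate the Windows cleanup bound: of N stale per-PID logs, only the
# newest RETENTION - 1 survive (the fifth slot belongs to the live process).
import tempfile
import time
from pathlib import Path

RETENTION = 5  # mirrors WINDOWS_LOG_FILE_RETENTION

with tempfile.TemporaryDirectory() as tmp:
    log_dir = Path(tmp)
    for pid in range(100, 110):  # ten stale per-PID logs from old runs
        (log_dir / f"basic-memory-{pid}.log").write_text("old")
        time.sleep(0.01)  # spread mtimes so the sort order is deterministic

    current_log_name = "basic-memory-999.log"  # the live process's file
    stale = sorted(
        (p for p in log_dir.glob("basic-memory-*.log*") if p.name != current_log_name),
        key=lambda p: p.stat().st_mtime,
        reverse=True,
    )
    for p in stale[RETENTION - 1 :]:
        p.unlink()

    assert len(list(log_dir.glob("*.log"))) == RETENTION - 1  # four stale files remain
```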
23 changes: 23 additions & 0 deletions tests/repository/test_semantic_search_base.py
@@ -237,6 +237,29 @@ def test_chunk_key_includes_row_id(self):
         records = self.repo._build_chunk_records(rows)
         assert any("99" in r["chunk_key"] for r in records)

+    def test_duplicate_rows_collapse_to_unique_chunk_keys(self):
+        rows = [
+            _make_row(
+                row_type=SearchItemType.ENTITY.value,
+                title="Spec",
+                permalink="spec",
+                content_snippet="shared content",
+                row_id=77,
+            ),
+            _make_row(
+                row_type=SearchItemType.ENTITY.value,
+                title="Spec",
+                permalink="spec",
+                content_snippet="shared content",
+                row_id=77,
+            ),
+        ]
+
+        records = self.repo._build_chunk_records(rows)
+
+        assert len(records) == 1
+        assert records[0]["chunk_key"] == "entity:77:0"
+

 # --- SQLite SemanticSearchDisabledError ---

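Note: `_make_row` is an existing helper defined earlier in this test module and not shown in the diff. Assuming it builds a lightweight stand-in for an FTS5 result row, its shape is presumably something like the following (hypothetical reconstruction; the real helper may differ):

```python
# Hypothetical sketch of the _make_row helper the new test relies on; the real
# definition lives earlier in tests/repository/test_semantic_search_base.py.
from types import SimpleNamespace


def _make_row(row_type: str, title: str, permalink: str, content_snippet: str, row_id: int):
    return SimpleNamespace(
        type=row_type,  # feeds the "{type}:{id}:{index}" chunk key
        id=row_id,
        title=title,
        permalink=permalink,
        content_snippet=content_snippet,
    )
```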
41 changes: 41 additions & 0 deletions tests/services/test_project_service_embedding_status.py
@@ -5,6 +5,7 @@

 import pytest
 from sqlalchemy import text
+from sqlalchemy.exc import OperationalError as SAOperationalError

 from basic_memory.schemas.project_info import EmbeddingStatus
 from basic_memory.services.project_service import ProjectService
@@ -142,6 +143,46 @@ async def test_embedding_status_orphaned_chunks(
     assert "orphaned chunks" in (status.reindex_reason or "")


+@pytest.mark.asyncio
+async def test_embedding_status_handles_sqlite_vec_unavailable(
+    project_service: ProjectService, test_graph, test_project
+):
+    """Unreadable vec0 tables should degrade to unavailable status instead of crashing."""
+    # Trigger: Postgres test matrix executes the same unit suite.
+    # Why: sqlite-vec loading failures are specific to SQLite virtual tables, not Postgres joins.
+    # Outcome: keep the regression focused on the backend that can actually hit this path.
+    if _is_postgres():
+        pytest.skip("sqlite-vec unavailable handling is SQLite-specific.")
+
+    original_execute_query = project_service.repository.execute_query
+
+    async def _execute_query_with_vec0_failure(query, params):
+        query_text = str(query)
+        if "JOIN search_vector_embeddings" in query_text:
+            raise SAOperationalError(query_text, params, Exception("no such module: vec0"))
+        return await original_execute_query(query, params)
+
+    with patch.object(
+        type(project_service),
+        "config_manager",
+        new_callable=lambda: property(
+            lambda self: _config_manager_with(semantic_search_enabled=True)
+        ),
+    ):
+        with patch.object(
+            project_service.repository,
+            "execute_query",
+            side_effect=_execute_query_with_vec0_failure,
+        ):
+            status = await project_service.get_embedding_status(test_project.id)
+
+    assert status.semantic_search_enabled is True
+    assert status.total_indexed_entities > 0
+    assert status.vector_tables_exist is False
+    assert status.reindex_recommended is True
+    assert "sqlite-vec is unavailable" in (status.reindex_reason or "")
+
+
 @pytest.mark.asyncio
 async def test_embedding_status_healthy(project_service: ProjectService, test_graph, test_project):
     """When all entities have embeddings, no reindex recommended."""
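Note: the fabricated failure works because SQLAlchemy's `DBAPIError` subclasses embed the wrapped driver exception in their string form, which is exactly what the service's `"no such module: vec0" not in str(exc).lower()` check reads. A quick verification of that behavior, built the same way the test constructs the error:

```python
# SQLAlchemy's DBAPIError.__str__ includes the original driver exception's
# message, so substring-matching on str(exc) sees "no such module: vec0".
from sqlalchemy.exc import OperationalError

exc = OperationalError("SELECT 1", {}, Exception("no such module: vec0"))
assert "no such module: vec0" in str(exc).lower()
print(str(exc))  # shows the wrapped message plus the SQL statement
```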