refactor(sdk): drop async on ingest verbs that never yield (#419)
Conversation
Every ingest verb was `async fn` but the bodies were sync filesystem walks and rusqlite writes — no `.await` actually reached the runtime. Drop the `async` annotation so the type matches the semantics; callers that need to run these from an async context can wrap them in `tokio::task::spawn_blocking` (the sdk-node binding now does). Also delete `run_ingest_tick` (a one-line wrapper over `ingest().await`) and the dead `in_flight.lock().await` belt-and-braces in `WatchController::stop` — the task handle await already covers any in-flight tick. CLI presenters that built a tokio runtime just to `block_on` ingest now call the sync verb directly; only `burn ingest --watch` keeps its runtime (the watch loop still spawns async tasks for the FS-event driver). https://claude.ai/code/session_01RAt76YyVgMYdij9zeqPUFB
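The calling-shape change can be sketched with hypothetical stand-ins (a minimal sketch — none of these names, fields, or row counts are the real `relayburn_sdk` API; the real verbs walk the filesystem and write via rusqlite, but the caller's shape is the same):

```rust
// Hypothetical stand-in for the SDK's ingest report type.
struct IngestReport {
    files_seen: usize,
    rows_written: usize,
}

// Before: `pub async fn ingest_all(...)` with no `.await` in the body.
// After: a plain sync fn, so the type matches the semantics.
fn ingest_all(paths: &[&str]) -> Result<IngestReport, String> {
    // Stand-in for the real filesystem walk + rusqlite writes.
    Ok(IngestReport {
        files_seen: paths.len(),
        rows_written: paths.len() * 2, // placeholder, not a real count
    })
}

fn main() {
    // CLI presenter: no tokio runtime, no `block_on` — just call the verb.
    let report = ingest_all(&["a.jsonl", "b.jsonl"]).expect("ingest failed");
    println!("{} files, {} rows", report.files_seen, report.rows_written);
}
```

An async caller (like the sdk-node binding) would wrap the same call in `tokio::task::spawn_blocking` instead of marking the verb itself `async`.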
📝 Walkthrough

This PR converts ingest entrypoints and related adapters from async to synchronous functions, updates CLI callers and harness adapters to call the sync APIs directly, adapts the Node N‑API binding to use spawn_blocking, converts tests to synchronous, and updates docs and public exports (removes run_ingest_tick).

Changes: Ingest async-to-sync conversion
Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as CLI
    participant SDK as relayburn_sdk::ingest_all
    participant Harness as HarnessAdapter
    participant DB as Ledger/DB
    participant Node as Node N-API
    CLI->>SDK: ingest_all(ledger, opts) [sync call]
    SDK->>Harness: ingest_* per-harness (sync)
    Harness->>DB: parse + rusqlite INSERTs (sync)
    Harness-->>SDK: IngestReport (immediate)
    SDK-->>CLI: IngestReport (no await)
    Node->>Node: spawn_blocking(sdk::ingest(raw)) -> result
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks: ✅ 5 passed
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 070bd89973
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
```rust
if let Some(handle) = self.handle.lock().await.take() {
    let _ = handle.await;
}
```
Wait for manual in-flight tick before returning from stop
stop() now returns immediately once the background loop task exits, but it no longer waits on in_flight. This regresses the documented "await any in-flight tick" behavior when tick() is called concurrently (or just after stop is signaled): tick() can still acquire in_flight and run ingest even after the loop task has ended, so callers that tear down state right after stop().await can race an active write. The previous in_flight.lock().await barrier covered this case.
Good catch — you're right. tick() doesn't check stopped, so awaiting the spawned task handle only covers the loop-driven path; a concurrent tick() call can still take in_flight after the background task exits. Restored the in_flight.lock().await barrier with a comment explaining why it isn't redundant (8cbebcf).
Generated by Claude Code
Codex review on #419 flagged that dropping the trailing `in_flight.lock().await` regresses the "await any in-flight tick" contract: the public `tick()` path doesn't check the `stopped` flag, so a concurrent caller can grab `in_flight` and run an ingest after the loop task has already exited. Awaiting the spawned task handle alone covers the runner-driven path but not that one — callers that tear down state right after `stop().await` could race an active write. Put the trailing `in_flight.lock().await` back and document why it isn't redundant. https://claude.ai/code/session_01RAt76YyVgMYdij9zeqPUFB
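The guarantee the restored barrier provides can be demonstrated with a std-thread analogue — a sketch only, with plain `std::sync` primitives standing in for the SDK's tokio mutex, and every name here illustrative rather than real API:

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread;
use std::time::Duration;

// Returns true iff the in-flight "tick" finished before the final lock
// acquisition (the stop() barrier) succeeded.
fn stop_waits_for_tick() -> bool {
    let in_flight = Arc::new(Mutex::new(())); // guards a whole tick
    let done = Arc::new(AtomicBool::new(false));
    let (tx, rx) = mpsc::channel();

    let tick = {
        let in_flight = Arc::clone(&in_flight);
        let done = Arc::clone(&done);
        thread::spawn(move || {
            let _guard = in_flight.lock().unwrap(); // tick() takes the guard
            tx.send(()).unwrap(); // signal: tick is now in flight
            thread::sleep(Duration::from_millis(100)); // mid-write work
            done.store(true, Ordering::SeqCst); // side effects land
        }) // guard released when the thread exits
    };

    rx.recv().unwrap(); // "stop" starts only after the tick holds the guard
    // The belt-and-braces barrier: this lock() cannot succeed until the
    // tick releases the guard, so `done` is true once we hold it.
    let _barrier = in_flight.lock().unwrap();
    let finished = done.load(Ordering::SeqCst);
    tick.join().unwrap();
    finished
}

fn main() {
    assert!(stop_waits_for_tick());
    println!("stop returned only after the in-flight tick completed");
}
```

Awaiting only the join handle (here, `tick.join()`) would not help a caller that raced the lock; taking the `in_flight` lock last is what makes the teardown safe.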
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (3)
crates/relayburn-sdk/src/ingest/watch_loop.rs (1)
255-267: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Keep `stop()` waiting on `in_flight`.

Awaiting the spawned loop only covers ticks driven by that task. A concurrent public `tick().await` can still hold `in_flight` after `handle.await` finishes, so `stop()` may return before ingest side effects land.

Suggested fix

```diff
 pub async fn stop(&self) {
     self.inner.stopped.store(true, Ordering::SeqCst);
     self.inner.stop_signal.notify_waiters();
     if let Some(handle) = self.handle.lock().await.take() {
         let _ = handle.await;
     }
+    let _guard = self.inner.in_flight.lock().await;
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/relayburn-sdk/src/ingest/watch_loop.rs` around lines 255-267: the stop() implementation must wait for any concurrently started tick() to finish by waiting for the in_flight guard to be released; after taking and awaiting the spawned task handle (the existing `let _ = handle.await`), add a wait that ensures `self.inner.in_flight` is zero before returning. Implement this by polling or awaiting the appropriate mechanism for in_flight (e.g. if in_flight is an atomic counter, loop checking `self.inner.in_flight.load(Ordering::SeqCst)` and await `tokio::task::yield_now()` or a short `tokio::time::sleep` until it reaches 0; if in_flight exposes a Notify/semaphore, call its async wait/permit acquisition instead). Ensure you reference the same symbols: stop(), self.handle.await, and self.inner.in_flight, so stop() does not return until in_flight is cleared.

crates/relayburn-cli/src/harnesses/pending_stamp.rs (1)
159-176: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Offload per-tick ingest from the Tokio worker thread.

The sync `ingestor(...)` function runs inline inside the async closure without using `spawn_blocking`. Since this closure is called on every watch-loop tick and in `after_exit`, and it performs blocking operations (`Ledger::open` does SQLite I/O and DDL, plus the ingestor does filesystem walks and writes), a long tick will block the Tokio worker thread and delay timers and other scheduled tasks.

Move the sync ingest onto `tokio::task::spawn_blocking`:

Suggested fix

```diff
 let ingest_sessions: IngestSessionsFn = Arc::new(move |ledger_home| {
     Box::pin(async move {
-        // Per-tick ledger open mirrors the TS sibling's
-        // `withLock('ledger', …)` pattern. SQLite WAL keeps the open
-        // cheap (no DDL after first open). Use the typed ledger home
-        // so explicit `--ledger-path` runs keep manifest writes and
-        // resolution scoped to the same home the writer used.
-        let ledger_opts = match ledger_home.as_deref() {
-            Some(home) => LedgerOpenOptions::with_home(home),
-            None => LedgerOpenOptions::default(),
-        };
-        let mut handle = Ledger::open(ledger_opts)?;
-        let opts = RawIngestOptions {
-            ledger_home,
-            ..RawIngestOptions::default()
-        };
-        ingestor(handle.raw_mut(), &opts)
+        tokio::task::spawn_blocking(move || {
+            let ledger_opts = match ledger_home.as_deref() {
+                Some(home) => LedgerOpenOptions::with_home(home),
+                None => LedgerOpenOptions::default(),
+            };
+            let mut handle = Ledger::open(ledger_opts)?;
+            let opts = RawIngestOptions {
+                ledger_home,
+                ..RawIngestOptions::default()
+            };
+            ingestor(handle.raw_mut(), &opts)
+        })
+        .await
+        .map_err(|err| anyhow::anyhow!("pending-stamp ingest task failed: {err}"))?
     })
 });
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/relayburn-cli/src/harnesses/pending_stamp.rs` around lines 159-176: the ingest closure in the IngestSessionsFn (the Arc closure assigned to ingest_sessions) runs blocking work inline — specifically Ledger::open and the synchronous ingestor call — which can block the Tokio worker; wrap the blocking portion in tokio::task::spawn_blocking and await its JoinHandle inside the `Box::pin(async move { ... })` so Ledger::open and ingestor(...) execute on a blocking thread pool; preserve passing ledger_home into RawIngestOptions and return the ingestor result from the spawned task, propagating any errors back from the awaited spawn_blocking result.

crates/relayburn-cli/src/harnesses/claude.rs (1)
137-149: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Run the Claude post-exit ingest on the blocking pool.

`after_exit()` is async, but `ingest_claude_session` is synchronous and performs substantial filesystem and SQLite work. Calling it directly will block the Tokio runtime thread. The SDK's own documentation notes this pattern, and the Node binding already uses `tokio::task::spawn_blocking` for the same reason.

Suggested fix

```diff
 async fn after_exit(&self, ctx: &PlanCtx, plan: &SpawnPlan) -> anyhow::Result<IngestReport> {
     let session_id = plan
         .session_id
         .as_ref()
         .ok_or_else(|| anyhow::anyhow!("claude adapter: plan must include sessionId"))?;
-    // Open a ledger handle scoped to the resolved RELAYBURN_HOME and
-    // run the per-session fast-path. The SDK encodes cwd → flattened
-    // dir name internally and persists a cursor at EOF so the next
-    // sweep skips the file.
-    let mut handle = Ledger::open(LedgerOpenOptions::default())?;
     let cwd_str = ctx.cwd.to_string_lossy().into_owned();
-    let opts = RawIngestOptions::default();
-    ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts)
+    let session_id = session_id.clone();
+    tokio::task::spawn_blocking(move || {
+        let mut handle = Ledger::open(LedgerOpenOptions::default())?;
+        let opts = RawIngestOptions::default();
+        ingest_claude_session(handle.raw_mut(), &cwd_str, &session_id, &opts)
+    })
+    .await
+    .map_err(|err| anyhow::anyhow!("claude ingest task failed: {err}"))?
 }
```

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@crates/relayburn-cli/src/harnesses/claude.rs` around lines 137 - 149, The synchronous ingest_claude_session call is blocking and must run on the blocking pool; change after_exit() so it spawns a blocking task that does the sync work (open the Ledger and call ingest_claude_session) instead of calling ingest_claude_session directly on the async runtime. Concretely: after you extract session_id and build cwd_str and opts, replace the direct call ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts) with a tokio::task::spawn_blocking move || { let mut handle = Ledger::open(LedgerOpenOptions::default())?; ingest_claude_session(handle.raw_mut(), &cwd_str, session_id, &opts) } and await the JoinHandle, propagating any error into the anyhow::Result returned by after_exit(); ensure the Ledger is opened inside the blocking closure (not on the async thread) to avoid blocking and Send issues.
🧹 Nitpick comments (1)
CHANGELOG.md (1)
9-14: ⚡ Quick win

Shorten this changelog entry to the shipped impact.

This reads more like implementation backstory than release notes. A shorter bullet such as "relayburn-sdk: ingest APIs are now synchronous; async callers should run them in `spawn_blocking`" gets the user-visible change across without listing internals.

As per coding guidelines, "Changelog entries should be concise and impact-first... Drop issue/PR links, internal review notes, implementation backstory..."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@CHANGELOG.md` around lines 9 - 14, Shorten the CHANGELOG entry to a single concise, impact-first bullet: state that the relayburn-sdk ingest APIs (e.g., ingest_all, ingest_claude_session, LedgerHandle::ingest, etc.) are now synchronous and advise async callers to run them via tokio::task::spawn_blocking; remove implementation details about filesystem walks, rusqlite, and internal backstory so the note only conveys the user-visible change and migration guidance.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro Plus
Run ID: e0a3989c-3cd8-458f-9968-9060107bb4b5
📒 Files selected for processing (19)
- CHANGELOG.md
- crates/relayburn-cli/src/commands/hotspots.rs
- crates/relayburn-cli/src/commands/ingest.rs
- crates/relayburn-cli/src/commands/state.rs
- crates/relayburn-cli/src/commands/summary.rs
- crates/relayburn-cli/src/harnesses/claude.rs
- crates/relayburn-cli/src/harnesses/codex.rs
- crates/relayburn-cli/src/harnesses/opencode.rs
- crates/relayburn-cli/src/harnesses/pending_stamp.rs
- crates/relayburn-sdk-node/src/lib.rs
- crates/relayburn-sdk/src/ingest.rs
- crates/relayburn-sdk/src/ingest/gap_warning_tests.rs
- crates/relayburn-sdk/src/ingest/ingest.rs
- crates/relayburn-sdk/src/ingest/orchestration_tests.rs
- crates/relayburn-sdk/src/ingest/reingest.rs
- crates/relayburn-sdk/src/ingest/watch_loop.rs
- crates/relayburn-sdk/src/ingest_verb.rs
- crates/relayburn-sdk/src/lib.rs
- crates/relayburn-sdk/tests/integration.rs
```rust
/// task has exited. Waiting on the guard here guarantees no tick is
/// mid-write when `stop` returns, so callers can tear down state
/// safely.
pub async fn stop(&self) {
    self.inner.stopped.store(true, Ordering::SeqCst);
    self.inner.stop_signal.notify_waiters();
    if let Some(handle) = self.handle.lock().await.take() {
```
🟡 WatchController::stop() no longer drains external tick() calls after removing belt-and-braces lock
The removed let _ = self.inner.in_flight.lock().await; line at the end of stop() was the only mechanism that guaranteed external tick() calls (i.e., those not initiated by the spawned periodic task) had completed before stop() returned. The new doc comment claims "Awaiting the spawned task handle covers any in-flight tick" but this is only true for ticks started by the spawned task's periodic loop — WatchController::tick() runs run_tick_or_join directly in the caller's task context, independent of the JoinHandle.
Race scenario: concurrent tick() + stop()
1. External caller does `ctrl.tick()` → acquires `in_flight` lock → runs sync ingest
2. Meanwhile `ctrl.stop()` is called → sets `stopped`, notifies `stop_signal`
3. The spawned periodic loop breaks (it wasn't holding the lock)
4. `handle.await` resolves (spawned task exited)
5. `stop()` returns
6. But the `tick()` from step 1 is still in-flight
The old code's let _ = self.inner.in_flight.lock().await would wait at step 5 until the tick completed. No current caller exercises this path (CLI and harness driver only call stop(), never concurrent tick()), but the public API contract ("stop the periodic task and await any in-flight tick") is weakened.
```diff
 /// task has exited. Waiting on the guard here guarantees no tick is
 /// mid-write when `stop` returns, so callers can tear down state
 /// safely.
 pub async fn stop(&self) {
     self.inner.stopped.store(true, Ordering::SeqCst);
     self.inner.stop_signal.notify_waiters();
     if let Some(handle) = self.handle.lock().await.take() {
         let _ = handle.await;
     }
+    // Belt-and-braces: even if the handle was already taken (idempotent
+    // calls), make sure no tick is mid-flight before returning.
+    let _ = self.inner.in_flight.lock().await;
 }
```
CodeRabbit review feedback on #419: per the project's CLAUDE.md guidelines, changelog entries should be concise and impact-first. https://claude.ai/code/session_01RAt76YyVgMYdij9zeqPUFB
Thanks for the review. Round-up on the four threads:
Devin's comment on the same

Generated by Claude Code
@copilot resolve the merge conflicts in this pull request |
…-issue-5XKVx # Conflicts: # CHANGELOG.md
Resolved: I merged
@copilot resolve the merge conflicts in this pull request |
…-issue-5XKVx # Conflicts: # CHANGELOG.md
Resolved. I merged the latest
Resolves conflict with #419 (sync ingest verbs). The new fast-path is now synchronous like the rest of the ingest surface; the CLI hook path matches main's non-blocking call sites. Tests converted from #[tokio::test] back to #[test]. https://claude.ai/code/session_011ubB69Zxijqb1BsYVYL9iQ
Main released 2.8.5 (#419 + perf/parser refactors) which moved its prior [Unreleased] block into the release section. Reattach this branch's items (the new ingest_claude_transcript_path verb and the CLI hook/--quiet behavior) under [Unreleased] above 2.8.5. https://claude.ai/code/session_011ubB69Zxijqb1BsYVYL9iQ
Closes #334.
Summary
- Removed `async fn` from every ingest verb (`ingest_all`, `ingest_claude_projects`, `ingest_codex_sessions`, `ingest_opencode_sessions`, `ingest_claude_session`, `reingest_missing_content`, `LedgerHandle::ingest`, free `ingest`). The bodies are sync filesystem walks and rusqlite writes; nothing awaits a yield to the runtime, so the `async` annotation was a tax that forced callers to build a runtime just to `block_on` it.
- Deleted `run_ingest_tick` (a one-line wrapper over `ingest().await`).
- Dropped the trailing `in_flight.lock().await` in `WatchController::stop` — awaiting the spawned task handle already covers any in-flight tick because the runner holds the `in_flight` guard for the duration of the run.
- CLI presenters (`commands/{ingest,summary,hotspots,state}.rs`) drop their tokio runtimes and call the sync verb directly; `burn ingest --watch` keeps its runtime for the watch loop's FS-event driver but the inner `IngestFn` body no longer `.await`s `ingest_all`. The codex/opencode adapters now pass the sync SDK fn pointer directly (no per-tick `Box::pin`).
- The sdk-node `ingest` binding wraps the sync verb in `tokio::task::spawn_blocking` so the napi tokio runtime stays responsive during long sweeps.

Direction A from the issue. Documented in the top-level `lib.rs` module doc so the next reader sees "ingest is sync" up front.
Test plan
- `cargo build --workspace`
- `cargo test --workspace` (652 sdk unit + 22 cli + integration + sdk-node — all green)
- `cargo clippy --workspace --all-targets` (no new warnings)

https://claude.ai/code/session_01RAt76YyVgMYdij9zeqPUFB
Generated by Claude Code