feat: Phase B + C realtime path (wait_for_messages MCP tool + UserPromptSubmit injection) #78
Open
Tominori666 wants to merge 4 commits into raysonmeng:master
Adds a third delivery mode (`dual`) to ClaudeAdapter that combines channel-
push notifications with a durable, SQLite-backed pull queue, eliminating
the data-loss edge of pure push (notification surfaces but Claude is busy
mid-tool-call → message dropped) and the 60s polling latency of pure pull.
## Why
In pure push mode, AgentBridge does not duplicate channel notifications
into the get_messages queue, so if Claude is busy when the notification
fires, the message is silently lost. In pure pull mode, Claude only sees
new messages when it explicitly calls get_messages, so wall-clock latency
is bounded by polling cadence (typically 60s+).
`dual` mode delivers via both transports for the same logical message,
deduplicating on `(chat_id, content_hash)` for any undrained row, and
relying on get_messages to be the sole drainer. Push success does not
mark the message drained, so a missed channel notification is naturally
recovered on the next pull.
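The dedupe rule can be illustrated with a small in-memory model. This is only a sketch under stated assumptions, not the PR's SQLite implementation: `UndrainedDedupeQueue`, `enqueue`, `drain`, and the toy `contentHash` function are hypothetical names chosen for illustration, and the real queue may hash content differently.

```typescript
// Hypothetical in-memory model of dual-mode dedupe; the PR itself uses SQLite.
interface QueuedMessage {
  seq: number;
  chatId: string;
  content: string;
  contentHash: string;
  drainedAt: number | null;
}

// Toy FNV-1a hash, standing in for whatever content hash the real queue uses.
function contentHash(content: string): string {
  let h = 0x811c9dc5;
  for (let i = 0; i < content.length; i++) {
    h ^= content.charCodeAt(i);
    h = Math.imul(h, 0x01000193) >>> 0;
  }
  return h.toString(16);
}

class UndrainedDedupeQueue {
  private rows: QueuedMessage[] = [];
  private nextSeq = 1;

  // Returns false when an undrained row with the same (chatId, hash) already exists.
  enqueue(chatId: string, content: string): boolean {
    const hash = contentHash(content);
    const duplicate = this.rows.some(
      (r) => r.drainedAt === null && r.chatId === chatId && r.contentHash === hash
    );
    if (duplicate) return false;
    this.rows.push({ seq: this.nextSeq++, chatId, content, contentHash: hash, drainedAt: null });
    return true;
  }

  // The sole drainer: returns undrained rows in insertion order and marks them drained.
  drain(now: number = Date.now()): QueuedMessage[] {
    const pending = this.rows.filter((r) => r.drainedAt === null);
    for (const r of pending) r.drainedAt = now;
    return pending;
  }
}
```

In this model, a second `enqueue("chat-1", "hello")` is rejected while the first copy is undrained, and the same content is accepted again once `drain()` has run, which mirrors the "undrained only" scope of the dedupe key.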
Persistent storage (`queue.db`, WAL mode) means the inbound queue
survives Claude frontend restart, daemon restart, and process crash —
a real concern given our deployments do `/mcp reconnect` and full
agentbridge stack restarts multiple times per session during testing.
## What
- **Added `src/message-queue.ts`**: PersistentMessageQueue class. SQLite
schema with messages(seq AUTOINCREMENT, message_id UNIQUE, chat_id,
source, content, timestamp, marker, content_hash, pushed_at,
push_error, drained_at, created_at). Partial unique index on
(chat_id, content_hash) WHERE drained_at IS NULL gives undrained
dedupe naturally. WAL + synchronous=NORMAL for crash safety.
Backpressure via markOldestUndrainedDropped(). Audit JSONL helper
wrapped in try/catch (audit must never block delivery).
- **Updated `src/claude-adapter.ts`**: New `DeliveryMode = "dual"`.
pushNotification() in dual mode persists first, then pushes via
channel; if push throws, the message stays undrained for the next
pull. drainMessages() reads from the DB, marks drained AFTER
successful formatting (preserves replay if anything between fails).
Pull mode now also persists, so the existing pull path also gains
durability as a zero-cost upgrade. Comprehensive audit events:
message_queued, message_pushed, message_push_failed, message_dropped,
messages_drained, reply_sent, reply_failed.
- **Updated `src/state-dir.ts`**: queueDbFile + transcriptFile getters.
- **Tests** (`src/unit-test/dual-mode.test.ts` +156 lines): 8 new cases
including persist-first ordering with shared message id, push-throws
preserves persisted row, dedupe by (chat_id, content_hash), audit JSONL
is write-only and not used as replay source, undrained messages survive
adapter restart, restart before push attempt still replays, second
drain does not replay already-drained rows.
- **README**: AGENTBRIDGE_MODE table documents `dual`. State directory
contents include queue.db + transcript.jsonl.
- **Bundle**: plugins/agentbridge/server/{bridge-server,daemon}.js
rebuilt and verified in sync via verify:plugin-sync.
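The ordering described for `pushNotification()` and `drainMessages()` can be sketched as follows. This is a simplified in-memory illustration, not the adapter's code: `DualDeliverySketch`, `deliver`, `drain`, and the `Row` shape are hypothetical, and an array push stands in for the durable SQLite insert.

```typescript
// Hypothetical sketch of dual-mode ordering; names are illustrative only.
interface Row {
  id: string;
  content: string;
  pushedAt: number | null;
  pushError: string | null;
  drainedAt: number | null;
}

class DualDeliverySketch {
  readonly rows: Row[] = [];
  private pushToChannel: (id: string, content: string) => void;

  constructor(pushToChannel: (id: string, content: string) => void) {
    this.pushToChannel = pushToChannel;
  }

  // Persist first, then attempt the push. A failed push leaves the row undrained.
  deliver(id: string, content: string): void {
    const row: Row = { id, content, pushedAt: null, pushError: null, drainedAt: null };
    this.rows.push(row); // stands in for the durable INSERT
    try {
      this.pushToChannel(id, content);
      row.pushedAt = Date.now(); // push success does NOT mark the row drained
    } catch (err) {
      row.pushError = err instanceof Error ? err.message : String(err);
    }
  }

  // Format first, mark drained afterwards, so a formatting failure keeps rows replayable.
  drain(format: (rows: Row[]) => string): string {
    const pending = this.rows.filter((r) => r.drainedAt === null);
    const text = format(pending); // may throw; nothing has been marked yet
    const now = Date.now();
    for (const r of pending) r.drainedAt = now;
    return text;
  }
}
```

The point of the ordering is that neither a throwing channel push nor a throwing formatter can lose a message: in both cases the row is still undrained and is returned by the next successful drain.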
## Rollback
`AGENTBRIDGE_MODE=pull` → restart Claude AgentBridge frontend. No new
env var required. Pull mode still works exactly as before (now with
persistence as a free upgrade, no behavior change for callers).
## Independent of raysonmeng#75 and raysonmeng#76
This branch is forked from clean master, not stacked on
fix/sticky-attach-lock or fix/codex-orphan-cleanup. Bundle reflects only
P2 changes. No conflict expected at merge time regardless of merge order.
## Verification
- `bun run typecheck` OK
- `bun test src/unit-test/dual-mode.test.ts src/unit-test/state-dir.test.ts` → 33 pass
- Broader `bun test src/unit-test` reaches 170 pass before hitting an
existing reconnect E2E lifecycle timeout in `e2e-reconnect.test.ts`
that is unrelated to this change.
- `bun run build:plugin` OK
- `bun run verify:plugin-sync` OK
Co-Authored-By: Codex <noreply@openai.com>
When the AgentBridge daemon is hard-killed (Stop-Process / taskkill), its spawned codex.exe child does not die with it on Windows. The orphan keeps holding port 4500, so the next daemon startup spawns a new codex.exe that exits with `os error 10048` (port in use). This surfaces to the user as the chronic "PARTIAL state, codex exit code 66" failure that needs a manual `agentbridge kill` every time.
Existing pre-flight cleanup in CodexAdapter.start() was Unix-only (lsof/ps/kill), so on Windows the occupied-port check silently fell through.
Changes:
- src/codex-adapter.ts: replace Unix-only helpers with platform-aware getPortPids / getProcessCommandLine / killProcess / isCodexAppServerCommandLine. Windows uses Get-NetTCPConnection + Get-CimInstance + Stop-Process. Foreign port owners still throw explicitly; only matching `codex app-server` PIDs are killed.
- src/unit-test/codex-adapter.test.ts: add coverage for stale Codex cleanup and non-Codex refusal.
- plugins/agentbridge/server/daemon.js: rebuilt bundle.
Caveat: this is recovery-on-next-startup, not a Job Object kill-on-daemon-death guarantee. Graceful shutdown still calls codex.stop() unchanged. A Job Object follow-up would be cleaner but is much larger surgery; this gets ~90% of the value with ~10% of the risk.
Independent of raysonmeng#75: that PR fixes stale Claude attach ownership at the WebSocket layer; this fixes stale Codex app-server process ownership at the OS layer. Both can ship independently.
Verification:
- bun run typecheck OK
- bun test src/unit-test/codex-adapter.test.ts → 44 pass
- bun run build:plugin OK
- bun run verify:plugin-sync OK
Co-Authored-By: Codex <noreply@openai.com>
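The "kill only matching Codex owners, refuse foreign ones" rule can be sketched as a pure decision function. This is an assumption-laden illustration: the body of `isCodexAppServerCommandLine` here is a guess at a plausible matcher (the PR's actual matching logic is not shown), and `PortOwner` and `selectStaleCodexPids` are hypothetical names. Discovering port owners and killing processes is deliberately left out.

```typescript
// Hypothetical matcher; the real command-line check in the PR may differ.
function isCodexAppServerCommandLine(commandLine: string): boolean {
  // Assumption: a stale Codex child shows "codex" (or "codex.exe") followed by "app-server".
  return /\bcodex(\.exe)?\b.*\bapp-server\b/i.test(commandLine);
}

type PortOwner = { pid: number; commandLine: string };

// Returns the PIDs that are safe to kill; throws if any owner is not a Codex app-server.
function selectStaleCodexPids(port: number, owners: PortOwner[]): number[] {
  const foreign = owners.filter((o) => !isCodexAppServerCommandLine(o.commandLine));
  if (foreign.length > 0) {
    const pids = foreign.map((o) => o.pid).join(", ");
    throw new Error(`Port ${port} is held by non-Codex process(es): ${pids}`);
  }
  return owners.map((o) => o.pid);
}
```

Keeping the decision separate from the platform-specific lookup means the refusal path can be unit-tested without spawning processes on either Windows or Unix.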
Adds a long-polling tool so Claude can react to Codex in real time without
forcing the user to manually nudge or relying on a scheduled wakeup chain.
Pattern inspired by enderzcx/synapse channel_wait_new: instead of trying
to push notifications into Claude Code's UI (which the IDE does not
surface as <channel> tags today), Claude itself blocks inside an MCP tool
call against our existing SQLite pull queue and returns drained messages
exactly like get_messages, or a "timed_out" sentinel after up to 60s.
- claude-adapter: register wait_for_messages tool, 500ms poll on the
undrained queue, clamp timeout to [1, 60]s, audit timeout events, log
start/woke/timeout transitions per instance.
- CLAUDE_INSTRUCTIONS: instruct Claude to prefer wait_for_messages over
auto-poll after reply(require_reply=true) and to re-call on timeout
until the user signals stop or Codex finishes.
- Tests: 4 new cases covering immediate return, clean timeout, mid-wait
wake, and out-of-range timeout clamping (35/35 dual-mode tests pass).
- Bundles: rebuilt plugins/agentbridge/server/{bridge-server,daemon}.js
with bun build:plugin so plugin cache picks up the new tool on next
Claude Code session restart.
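The long-poll behaviour (500ms poll, timeout clamped to [1, 60]s, timeout sentinel on miss) can be sketched like this. It is a minimal model rather than the adapter's code: `clampTimeoutSeconds`, `waitForMessages`, and `WaitResult` are hypothetical names, the clock and sleep function are injected to keep the sketch testable, and falling back to 60s for a `NaN` input is an assumption.

```typescript
// Hypothetical long-poll sketch; clock and sleep are injected so it can be tested.
const POLL_INTERVAL_MS = 500;

// Clamp a requested timeout (seconds) into the [1, 60] range.
function clampTimeoutSeconds(requested: number): number {
  if (Number.isNaN(requested)) return 60; // assumption: unusable input falls back to the maximum
  return Math.min(60, Math.max(1, requested));
}

type WaitResult =
  | { status: "messages"; count: number }
  | { status: "timed_out"; waitedSeconds: number };

async function waitForMessages(
  countUndrained: () => number,
  timeoutSeconds: number,
  sleep: (ms: number) => Promise<void>,
  now: () => number = Date.now
): Promise<WaitResult> {
  const waitSeconds = clampTimeoutSeconds(timeoutSeconds);
  const deadline = now() + waitSeconds * 1000;
  for (;;) {
    const count = countUndrained();
    if (count > 0) return { status: "messages", count }; // wake: caller drains as get_messages would
    const remaining = deadline - now();
    if (remaining <= 0) return { status: "timed_out", waitedSeconds: waitSeconds };
    await sleep(Math.min(POLL_INTERVAL_MS, remaining));
  }
}
```

In a real process, `sleep` could be `(ms) => new Promise((resolve) => setTimeout(resolve, ms))`. Checking the queue before the first sleep gives the immediate-return case when messages are already pending.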
Builds on PR raysonmeng#75 (sticky lock), raysonmeng#76 (orphan cleanup), raysonmeng#77 (dual+persistence).
Adds the second half of the realtime path: when Tony's Claude session is
idle between turns, daemon-pushed Codex messages sit unacked in queue.db
and get injected into Claude's context as a system-reminder block on the
next UserPromptSubmit hook. Non-consuming peek; ack happens inside the
reply MCP tool when Claude replies on that chat_id.
Schema:
- Add acked_at INTEGER column to messages table with ALTER TABLE
migration for pre-existing DBs.
- Add idx_messages_unacked_undrained for the peek query.
API:
- PersistentMessageQueue.ackByChatId(chatId): flips acked_at for
undrained+unacked rows on the chat. Idempotent (returns 0 on
second call).
- PersistentMessageQueue.listUnackedUndrained() / countUnackedUndrained()
for the hook layer to read without mutating state.
- ClaudeAdapter.handleReply now calls queue.ackByChatId after a
successful reply when chat_id is provided.
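The ack semantics can be modelled in memory to show why a second `ackByChatId` call returns 0 and why peeking does not consume. This is an illustrative sketch only: `AckQueueSketch`, `AckRow`, and `add` are hypothetical, and the PR keeps this state in the `acked_at` and `drained_at` columns of SQLite rather than in an array.

```typescript
// Hypothetical in-memory model of ack-vs-drain state; the PR stores this in SQLite.
interface AckRow {
  chatId: string;
  content: string;
  ackedAt: number | null;
  drainedAt: number | null;
}

class AckQueueSketch {
  private rows: AckRow[] = [];

  add(chatId: string, content: string): void {
    this.rows.push({ chatId, content, ackedAt: null, drainedAt: null });
  }

  // Non-mutating peek: copies of rows that are neither acked nor drained.
  listUnackedUndrained(): AckRow[] {
    return this.rows
      .filter((r) => r.ackedAt === null && r.drainedAt === null)
      .map((r) => ({ ...r }));
  }

  countUnackedUndrained(): number {
    return this.listUnackedUndrained().length;
  }

  // Marks matching rows acked and returns how many changed; a repeat call returns 0.
  ackByChatId(chatId: string, now: number = Date.now()): number {
    let changed = 0;
    for (const r of this.rows) {
      if (r.chatId === chatId && r.ackedAt === null && r.drainedAt === null) {
        r.ackedAt = now;
        changed++;
      }
    }
    return changed;
  }
}
```

Because the ack only touches rows that are still unacked, repeating it after a successful reply is harmless, which is what makes it safe to call unconditionally from the reply path.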
Hook layer:
- scripts/peek_codex_queue.py: read-only SQLite peek (mode=ro URI,
safe under WAL). Emits hookSpecificOutput JSON, count, text, or
raw JSON.
- scripts/inject-pending-codex.sh: UserPromptSubmit hook that calls
peek and emits the system-reminder block.
- scripts/health-check.sh: SessionStart hook now appends pending
count to the existing AgentBridge ready/not-ready notice.
- hooks/hooks.json: register UserPromptSubmit alongside SessionStart.
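For orientation, the payload a UserPromptSubmit hook emits might be built as below. The PR's hook is implemented in Python and shell, so this TypeScript version only illustrates the shape. The `hookSpecificOutput` / `hookEventName` / `additionalContext` field names follow Claude Code's documented hook output format as far as I know and should be checked against the current documentation; `PendingMessage`, `buildUserPromptSubmitOutput`, and the text layout of the injected block are hypothetical.

```typescript
// Hypothetical payload builder; the PR's actual hook is a Python/shell script.
interface PendingMessage {
  chatId: string;
  content: string;
}

// Returns the JSON string to print on stdout, or null when nothing is pending.
function buildUserPromptSubmitOutput(pending: PendingMessage[]): string | null {
  if (pending.length === 0) return null; // nothing to inject; the hook stays silent
  const lines = pending.map((m, i) => `${i + 1}. [${m.chatId}] ${m.content}`);
  // Assumed layout of the injected text; the real block may be formatted differently.
  const additionalContext = [`${pending.length} pending Codex message(s):`, ...lines].join("\n");
  return JSON.stringify({
    hookSpecificOutput: {
      hookEventName: "UserPromptSubmit",
      additionalContext,
    },
  });
}
```

Returning `null` for an empty queue keeps the common case free of injected context, so prompts submitted when nothing is pending are unaffected.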
Tests: 7 new tests in src/unit-test/message-queue.test.ts cover migration,
ack semantics, idempotency, ack-vs-drain independence, and unacked
list shape. All 108 non-e2e tests pass; pre-existing e2e-cli.test.ts
failures on Windows are unrelated (test harness can't locate Claude
binary).
Phase B (wait_for_messages MCP tool) handles the active-listening case;
Phase C handles the idle case. Together they cover the full realtime
loop: Codex push -> daemon queue -> (active) wait_for_messages wake or
(idle) next UserPromptSubmit -> Claude -> reply tool -> ack.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
## Summary
Two stacked commits adding both halves of the Codex↔Claude realtime path:
- Phase B: `wait_for_messages(timeout_s)` MCP tool (long-poll on the SQLite pull queue from PR feat: AGENTBRIDGE_MODE=dual durable inbound persistence + diagnostic push #77).
- Phase C: UserPromptSubmit hook that injects pending Codex messages as `system-reminder` `additionalContext` when the Claude session is between turns. Non-consuming peek; ack happens inside `reply()` via `ackByChatId()`. Idle-side counterpart to Phase B's active-listening loop.
Together: Codex push → daemon queue → (active) `wait_for_messages` wake or (idle) next user prompt → injected context → Claude → `reply` (auto-acks) or `get_messages` (drains).
## Phase B (commit 1)
`wait_for_messages` registered alongside `get_messages` and `reply`. Polls `queue.countUndrained()` every 500ms; on hit, returns the same payload `get_messages` would return; on miss, returns a `[wait_for_messages] timed_out after Ns ...` block. Timeout clamped to `[1, 60]`. CLAUDE_INSTRUCTIONS updated to prefer this after `reply(require_reply=true)`.
## Phase C (commit 2)
### Schema
- Add `acked_at INTEGER` column to `messages` table with `ALTER TABLE` migration for pre-existing DBs.
- Add index `idx_messages_unacked_undrained`.
### API
- `PersistentMessageQueue.ackByChatId(chatId)` flips `acked_at` for undrained+unacked rows on a chat. Idempotent.
- `listUnackedUndrained()` / `countUnackedUndrained()` for hook reads (non-mutating).
- `ClaudeAdapter.handleReply` calls `queue.ackByChatId(chat_id)` after successful send when chat_id is provided.
### Hooks
- `scripts/peek_codex_queue.py`: read-only SQLite peek (`mode=ro` URI, safe under WAL). Emits `hookSpecificOutput` JSON, count, text, or raw JSON.
- `scripts/inject-pending-codex.sh`: UserPromptSubmit hook; calls peek and emits the `system-reminder` block.
- `scripts/health-check.sh`: SessionStart hook now appends pending count to the existing AgentBridge ready notice.
- `hooks/hooks.json`: registers UserPromptSubmit alongside SessionStart.
## Tests
- `bun test src/unit-test/dual-mode.test.ts`: 35/35 pass (4 new cases for immediate return, clean timeout, mid-wait wake, timeout clamping).
- `bun test src/unit-test/message-queue.test.ts`: 7/7 pass (migration safety, ackByChatId semantics, idempotency, ack-vs-drain independence, unacked listing).
- `bun test src` excluding `e2e-cli.test.ts`: 108/108 pass.
- `e2e-cli.test.ts` failures on Windows are unrelated (test harness can't locate Claude binary; "URL must be of scheme file"). Not introduced by this PR.
## Stack context
Builds on (still open against `master`):
- `AGENTBRIDGE_MODE=dual` durable inbound persistence + diagnostic push
Phase C is logically independent of Phase B (hook reads SQLite directly, doesn't depend on the new MCP tool), but the commits live on the same branch for easier review. Happy to split into two PRs on request.
## Test plan
- `bun run typecheck`
- `bun test src` (108/108 non-e2e)
- `bun run build:plugin`
- `reply(require_reply=true)` + `wait_for_messages` returned Codex pong in ~1s