feat: Phase B + C realtime path (wait_for_messages MCP tool + UserPromptSubmit injection) #78

Open
Tominori666 wants to merge 4 commits into raysonmeng:master from Tominori666:phase-b-wait-for-messages

Tominori666 commented on May 1, 2026

Summary

Two stacked commits adding both halves of the Codex↔Claude realtime path:

  1. Phase B: wait_for_messages(timeout_s) MCP tool (long-poll on the SQLite pull queue from PR #77, "feat: AGENTBRIDGE_MODE=dual durable inbound persistence + diagnostic push").
  2. Phase C: UserPromptSubmit hook injection of pending Codex messages into Claude's context as system-reminder additionalContext when the Claude session is between turns. Non-consuming peek; ack happens inside reply() via ackByChatId(). Idle-side counterpart to Phase B's active-listening loop.

Together: Codex push → daemon queue → (active) wait_for_messages wake or (idle) next user prompt → injected context → Claude → reply (auto-acks) or get_messages (drains).

Phase B (commit 1)

wait_for_messages is registered alongside get_messages and reply. It polls queue.countUndrained() every 500ms; on hit, it returns the same payload get_messages would return; on miss, it returns a [wait_for_messages] timed_out after Ns ... block. The timeout is clamped to [1, 60] seconds. CLAUDE_INSTRUCTIONS is updated to prefer this tool after reply(require_reply=true).
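
The polling behaviour described above can be sketched roughly as follows. This is a Python illustration rather than the adapter's TypeScript: `count_undrained` and `drain` are hypothetical callables standing in for the queue, the injectable `clock` and `sleep` parameters exist only to make the loop testable, and the timed-out text is abbreviated because the PR elides the rest of that block.

```python
import time
from typing import Callable, List

POLL_INTERVAL_S = 0.5                 # 500ms poll cadence, as stated in the PR
MIN_TIMEOUT_S, MAX_TIMEOUT_S = 1, 60  # clamp window, as stated in the PR


def clamp_timeout(timeout_s: float) -> float:
    """Clamp the requested timeout into the [1, 60] second window."""
    return max(MIN_TIMEOUT_S, min(MAX_TIMEOUT_S, timeout_s))


def wait_for_messages(
    count_undrained: Callable[[], int],   # hypothetical stand-in for queue.countUndrained()
    drain: Callable[[], List[str]],       # hypothetical stand-in for the get_messages drain
    timeout_s: float,
    poll_interval_s: float = POLL_INTERVAL_S,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Block until the queue has undrained rows or the clamped timeout elapses."""
    timeout_s = clamp_timeout(timeout_s)
    deadline = clock() + timeout_s
    while True:
        if count_undrained() > 0:
            # Hit: hand back whatever a normal drain would return.
            return "\n".join(drain())
        remaining = deadline - clock()
        if remaining <= 0:
            # Miss: abbreviated sentinel; the real block carries more text.
            return f"[wait_for_messages] timed_out after {timeout_s:g}s"
        sleep(min(poll_interval_s, remaining))
```

Checking the queue before checking the deadline means a message that arrives on the final poll is still returned instead of being reported as a timeout.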

Phase C (commit 2)

Schema

  • Add acked_at INTEGER column to messages table with ALTER TABLE migration for pre-existing DBs.
  • New index idx_messages_unacked_undrained.
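
A migration of this shape might look like the sketch below, written in Python against the standard `sqlite3` module rather than the project's own queue class. The column check via `PRAGMA table_info` and the index definition (its columns and `WHERE` clause) are assumptions; the PR only names the column and the index.

```python
import sqlite3


def migrate_add_acked_at(conn: sqlite3.Connection) -> bool:
    """Add acked_at to messages if it is missing; return True when it was added."""
    # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk) per column.
    columns = {row[1] for row in conn.execute("PRAGMA table_info(messages)")}
    added = "acked_at" not in columns
    if added:
        # Existing rows get NULL, which reads as "not yet acked".
        conn.execute("ALTER TABLE messages ADD COLUMN acked_at INTEGER")
    # Assumed index shape: a partial index covering only rows the hook would peek at.
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_messages_unacked_undrained "
        "ON messages (chat_id, seq) "
        "WHERE drained_at IS NULL AND acked_at IS NULL"
    )
    conn.commit()
    return added
```

Running it twice is harmless: the second call finds the column, skips the `ALTER TABLE`, and the `IF NOT EXISTS` guard covers the index.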

API

  • PersistentMessageQueue.ackByChatId(chatId) flips acked_at for undrained+unacked rows on a chat. Idempotent.
  • listUnackedUndrained() / countUnackedUndrained() for hook reads (non-mutating).
  • ClaudeAdapter.handleReply calls queue.ackByChatId(chat_id) after successful send when chat_id is provided.
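
The ack semantics listed above can be illustrated with a small Python sketch over `sqlite3`. The function names mirror the PR's method names in snake_case, while `DEMO_SCHEMA` is a trimmed, hypothetical stand-in for the real messages table, and storing the ack time as epoch milliseconds is an assumption.

```python
import sqlite3
import time
from typing import List, Tuple

# Trimmed, hypothetical schema: only the columns this sketch touches.
DEMO_SCHEMA = """
CREATE TABLE messages (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    chat_id TEXT NOT NULL,
    content TEXT NOT NULL,
    drained_at INTEGER,
    acked_at INTEGER
);
"""


def ack_by_chat_id(conn: sqlite3.Connection, chat_id: str) -> int:
    """Set acked_at on undrained, unacked rows for one chat; return rows changed."""
    cur = conn.execute(
        "UPDATE messages SET acked_at = ? "
        "WHERE chat_id = ? AND drained_at IS NULL AND acked_at IS NULL",
        (int(time.time() * 1000), chat_id),
    )
    conn.commit()
    return cur.rowcount  # 0 on a repeat call, which is what makes it idempotent


def list_unacked_undrained(conn: sqlite3.Connection) -> List[Tuple[int, str, str]]:
    """Read-only listing of rows that are neither acked nor drained."""
    return conn.execute(
        "SELECT seq, chat_id, content FROM messages "
        "WHERE drained_at IS NULL AND acked_at IS NULL ORDER BY seq"
    ).fetchall()


def count_unacked_undrained(conn: sqlite3.Connection) -> int:
    """Read-only count matching list_unacked_undrained."""
    return conn.execute(
        "SELECT COUNT(*) FROM messages WHERE drained_at IS NULL AND acked_at IS NULL"
    ).fetchone()[0]
```

Because the update never touches `drained_at`, acked rows stay available to a later drain, which matches the ack-vs-drain independence the tests cover.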

Hooks

  • scripts/peek_codex_queue.py — read-only SQLite peek (mode=ro URI, safe under WAL). Emits hookSpecificOutput JSON, count, text, or raw JSON.
  • scripts/inject-pending-codex.sh — UserPromptSubmit hook; calls peek and emits the system-reminder block.
  • scripts/health-check.sh — SessionStart hook now appends pending count to the existing AgentBridge ready notice.
  • hooks/hooks.json — registers UserPromptSubmit alongside SessionStart.
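
As a rough illustration of the peek-and-inject flow, the sketch below opens the queue database read-only through a `mode=ro` URI and wraps pending rows in a `hookSpecificOutput` payload. It is not the contents of scripts/peek_codex_queue.py: the query, the wording of the injected text, and the `hookEventName` / `additionalContext` field names (taken from my understanding of Claude Code's hook output format) are assumptions.

```python
import json
import sqlite3
from pathlib import Path
from typing import List, Tuple


def peek_pending(db_path: str) -> List[Tuple[str, str]]:
    """Read unacked, undrained rows without taking a write handle on the DB."""
    # mode=ro makes SQLite reject any write attempted through this connection.
    uri = Path(db_path).resolve().as_uri() + "?mode=ro"
    conn = sqlite3.connect(uri, uri=True)
    try:
        return conn.execute(
            "SELECT chat_id, content FROM messages "
            "WHERE drained_at IS NULL AND acked_at IS NULL ORDER BY seq"
        ).fetchall()
    finally:
        conn.close()


def build_hook_output(rows: List[Tuple[str, str]]) -> str:
    """Return hook JSON for pending rows, or an empty string when nothing is pending."""
    if not rows:
        return ""
    # Hypothetical wording; the real script's reminder text is not shown in the PR.
    lines = [f"Pending Codex messages ({len(rows)}):"]
    lines += [f"- [{chat_id}] {content}" for chat_id, content in rows]
    return json.dumps(
        {
            "hookSpecificOutput": {
                "hookEventName": "UserPromptSubmit",
                "additionalContext": "\n".join(lines),
            }
        }
    )
```

A shell hook could then print the result of `build_hook_output(peek_pending(path))` to stdout and exit 0, emitting nothing when the queue is empty.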

Tests

  • Phase B: bun test src/unit-test/dual-mode.test.ts → 35/35 pass (4 new cases for immediate return, clean timeout, mid-wait wake, timeout clamping).
  • Phase C: bun test src/unit-test/message-queue.test.ts → 7/7 pass (migration safety, ackByChatId semantics, idempotency, ack-vs-drain independence, unacked listing).
  • Full non-e2e: bun test src excluding e2e-cli.test.ts → 108/108 pass.
  • Pre-existing e2e-cli.test.ts failures on Windows are unrelated (test harness can't locate Claude binary; "URL must be of scheme file"). Not introduced by this PR.

Stack context

Builds on (still open against master):

Phase C is logically independent of Phase B (hook reads SQLite directly, doesn't depend on the new MCP tool), but the commits live on the same branch for easier review. Happy to split into two PRs on request.

Test plan

  • bun run typecheck
  • bun test src (108/108 non-e2e)
  • bun run build:plugin
  • Phase B end-to-end: real Claude Code session — reply(require_reply=true) + wait_for_messages returned Codex pong in ~1s
  • Phase C end-to-end: requires session restart to pick up rebuilt bridge-server.js — pending verification

Tominori666 and others added 4 commits May 1, 2026 15:22
Adds a third delivery mode (`dual`) to ClaudeAdapter that combines channel-
push notifications with a durable, SQLite-backed pull queue, eliminating
the data-loss edge of pure push (notification surfaces but Claude is busy
mid-tool-call → message dropped) and the 60s polling latency of pure pull.

## Why

In pure push mode, AgentBridge does not duplicate channel notifications
into the get_messages queue, so if Claude is busy when the notification
fires, the message is silently lost. In pure pull mode, Claude only sees
new messages when it explicitly calls get_messages, so wall-clock latency
is bounded by polling cadence (typically 60s+).

`dual` mode delivers via both transports for the same logical message,
deduplicating on `(chat_id, content_hash)` for any undrained row, and
relying on get_messages to be the sole drainer. Push success does not
mark the message drained, so a missed channel notification is naturally
recovered on the next pull.
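
The delivery rule in the paragraph above (persist first, push second, drain only on pull) can be modelled in a few lines. This is an in-memory Python sketch, not the adapter code: `DualDelivery`, `Row`, and the `push` callable are hypothetical names, and a plain list stands in for the SQLite queue. Dedupe is left out to keep the ordering visible.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class Row:
    chat_id: str
    content: str
    pushed: bool = False
    push_error: Optional[str] = None
    drained: bool = False


class DualDelivery:
    """Hypothetical model of dual mode with a list in place of queue.db."""

    def __init__(self, push: Callable[[str], None]) -> None:
        self.rows: List[Row] = []
        self._push = push

    def deliver(self, chat_id: str, content: str) -> None:
        row = Row(chat_id, content)
        self.rows.append(row)  # persist first, so a failed push cannot lose the message
        try:
            self._push(content)
            row.pushed = True  # push success deliberately does NOT mark the row drained
        except Exception as exc:
            row.push_error = str(exc)  # row stays undrained for the next pull

    def drain(self) -> List[str]:
        pending = [r for r in self.rows if not r.drained]
        formatted = [r.content for r in pending]  # format first...
        for r in pending:
            r.drained = True  # ...then mark drained, so a formatting failure can replay
        return formatted
```

With a `push` that raises, `deliver` still leaves the row in place and a later `drain()` returns it once; a second `drain()` returns nothing.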

Persistent storage (`queue.db`, WAL mode) means the inbound queue
survives Claude frontend restart, daemon restart, and process crash —
a real concern given our deployments do `/mcp reconnect` and full
agentbridge stack restarts multiple times per session during testing.

## What

- **Added `src/message-queue.ts`**: PersistentMessageQueue class. SQLite
  schema with messages(seq AUTOINCREMENT, message_id UNIQUE, chat_id,
  source, content, timestamp, marker, content_hash, pushed_at,
  push_error, drained_at, created_at). Partial unique index on
  (chat_id, content_hash) WHERE drained_at IS NULL gives undrained
  dedupe naturally. WAL + synchronous=NORMAL for crash safety.
  Backpressure via markOldestUndrainedDropped(). Audit JSONL helper
  wrapped in try/catch (audit must never block delivery).

- **Updated `src/claude-adapter.ts`**: New `DeliveryMode = "dual"`.
  pushNotification() in dual mode persists first, then pushes via
  channel; if push throws, the message stays undrained for the next
  pull. drainMessages() reads from the DB, marks drained AFTER
  successful formatting (preserves replay if anything between fails).
  Pull mode now also persists, so the existing pull path also gains
  durability as a zero-cost upgrade. Comprehensive audit events:
  message_queued, message_pushed, message_push_failed, message_dropped,
  messages_drained, reply_sent, reply_failed.

- **Updated `src/state-dir.ts`**: queueDbFile + transcriptFile getters.

- **Tests** (`src/unit-test/dual-mode.test.ts` +156 lines): 8 new cases
  including persist-first ordering with shared message id, push-throws
  preserves persisted row, dedupe by (chat_id, content_hash), audit JSONL
  is write-only and not used as replay source, undrained messages survive
  adapter restart, restart before push attempt still replays, second
  drain does not replay already-drained rows.

- **README**: AGENTBRIDGE_MODE table documents `dual`. State directory
  contents include queue.db + transcript.jsonl.

- **Bundle**: plugins/agentbridge/server/{bridge-server,daemon}.js
  rebuilt and verified in sync via verify:plugin-sync.
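
The undrained-dedupe behaviour of the partial unique index described for `src/message-queue.ts` can be demonstrated with the Python sketch below. The schema is trimmed to the relevant columns, and the index name, the SHA-256 hash, and the use of `INSERT OR IGNORE` are assumptions rather than details taken from the PR.

```python
import hashlib
import sqlite3

# Trimmed, hypothetical schema: only the columns needed to show the dedupe.
SCHEMA = """
CREATE TABLE messages (
    seq INTEGER PRIMARY KEY AUTOINCREMENT,
    chat_id TEXT NOT NULL,
    content TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    drained_at INTEGER
);
CREATE UNIQUE INDEX idx_undrained_dedupe
    ON messages (chat_id, content_hash) WHERE drained_at IS NULL;
"""


def enqueue(conn: sqlite3.Connection, chat_id: str, content: str) -> bool:
    """Insert a message; return False when an identical undrained row already exists."""
    digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
    cur = conn.execute(
        "INSERT OR IGNORE INTO messages (chat_id, content, content_hash) "
        "VALUES (?, ?, ?)",
        (chat_id, content, digest),
    )
    conn.commit()
    return cur.rowcount == 1


def mark_all_drained(conn: sqlite3.Connection, now_ms: int) -> None:
    """Mark every undrained row drained, which removes it from the partial index."""
    conn.execute("UPDATE messages SET drained_at = ? WHERE drained_at IS NULL", (now_ms,))
    conn.commit()
```

Since the index only covers rows where `drained_at IS NULL`, the same content can be enqueued again after it has been drained, while duplicates of a still-pending message are ignored.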

## Rollback

`AGENTBRIDGE_MODE=pull` → restart Claude AgentBridge frontend. No new
env var required. Pull mode still works exactly as before (now with
persistence as a free upgrade, no behavior change for callers).

## Independent of raysonmeng#75 and raysonmeng#76

This branch is forked from clean master, not stacked on
fix/sticky-attach-lock or fix/codex-orphan-cleanup. Bundle reflects only
P2 changes. No conflict expected at merge time regardless of merge order.

## Verification

- `bun run typecheck` OK
- `bun test src/unit-test/dual-mode.test.ts src/unit-test/state-dir.test.ts` → 33 pass
- Broader `bun test src/unit-test` reaches 170 pass before hitting an
  existing reconnect E2E lifecycle timeout in `e2e-reconnect.test.ts`
  that is unrelated to this change.
- `bun run build:plugin` OK
- `bun run verify:plugin-sync` OK

Co-Authored-By: Codex <noreply@openai.com>

When the AgentBridge daemon is hard-killed (Stop-Process / taskkill), its
spawned codex.exe child does not die with it on Windows. The orphan keeps
holding port 4500, so the next daemon startup spawns a new codex.exe that
exits with `os error 10048` (port in use) — surfaced to the user as the
chronic "PARTIAL state, codex exit code 66" failure that needs manual
`agentbridge kill` every time.

Existing pre-flight cleanup in CodexAdapter.start() was Unix-only
(lsof/ps/kill), so on Windows the occupied-port check silently
fell through.

Changes:
- src/codex-adapter.ts: replace Unix-only helpers with platform-aware
  getPortPids / getProcessCommandLine / killProcess / isCodexAppServerCommandLine.
  Windows uses Get-NetTCPConnection + Get-CimInstance + Stop-Process.
  Foreign port owners still throw explicitly — only matching `codex
  app-server` PIDs are killed.
- src/unit-test/codex-adapter.test.ts: add coverage for stale Codex
  cleanup and non-Codex refusal.
- plugins/agentbridge/server/daemon.js: rebuilt bundle.
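
The "kill only matching PIDs, refuse foreign owners" rule from the change list can be sketched independently of the platform calls. In this Python illustration the port lookup, command-line lookup, and kill are passed in as hypothetical callables (standing in for whatever getPortPids / getProcessCommandLine / killProcess do per platform), and the token-based matching heuristic is an assumption, not the adapter's actual check.

```python
from typing import Callable, Dict, List, Optional


def is_codex_app_server_command_line(command_line: Optional[str]) -> bool:
    """Heuristic: an executable named codex (or codex.exe) followed by app-server."""
    tokens = (command_line or "").lower().replace('"', " ").split()
    for i, token in enumerate(tokens[:-1]):
        # Strip any directory prefix, accepting both / and \ separators.
        name = token.replace("\\", "/").rsplit("/", 1)[-1]
        if name in ("codex", "codex.exe") and tokens[i + 1] == "app-server":
            return True
    return False


def clean_stale_port_owners(
    port_pids: List[int],
    get_command_line: Callable[[int], Optional[str]],
    kill: Callable[[int], None],
) -> List[int]:
    """Kill stale Codex app-server owners of a port; raise if any owner is foreign."""
    owners: Dict[int, Optional[str]] = {pid: get_command_line(pid) for pid in port_pids}
    foreign = [
        pid for pid, cmd in owners.items() if not is_codex_app_server_command_line(cmd)
    ]
    if foreign:
        # Refuse before killing anything, so a foreign owner never causes a partial cleanup.
        raise RuntimeError(f"port is held by non-Codex process(es): {foreign}")
    for pid in owners:
        kill(pid)
    return list(owners)
```

Checking every owner before the first kill is a design choice of this sketch; it keeps the refusal path free of side effects.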

Caveat: this is recovery-on-next-startup, not a Job Object kill-on-daemon-
death guarantee. Graceful shutdown still calls codex.stop() unchanged.
A Job Object follow-up would be cleaner but is much larger surgery; this
gets ~90% of the value with ~10% of the risk.

Independent of raysonmeng#75 — that PR fixes stale Claude attach ownership at the
WebSocket layer; this fixes stale Codex app-server process ownership at
the OS layer. Both can ship independently.

Verification:
- bun run typecheck OK
- bun test src/unit-test/codex-adapter.test.ts → 44 pass
- bun run build:plugin OK
- bun run verify:plugin-sync OK

Co-Authored-By: Codex <noreply@openai.com>

Adds a long-polling tool so Claude can react to Codex in real time without
forcing the user to manually nudge or relying on a scheduled wakeup chain.

Pattern inspired by enderzcx/synapse channel_wait_new: instead of trying
to push notifications into Claude Code's UI (which the IDE does not
surface as <channel> tags today), Claude itself blocks inside an MCP tool
call against our existing SQLite pull queue and returns drained messages
exactly like get_messages, or a "timed_out" sentinel after up to 60s.

- claude-adapter: register wait_for_messages tool, 500ms poll on the
  undrained queue, clamp timeout to [1, 60]s, audit timeout events, log
  start/woke/timeout transitions per instance.
- CLAUDE_INSTRUCTIONS: instruct Claude to prefer wait_for_messages over
  auto-poll after reply(require_reply=true) and to re-call on timeout
  until the user signals stop or Codex finishes.
- Tests: 4 new cases covering immediate return, clean timeout, mid-wait
  wake, and out-of-range timeout clamping (35/35 dual-mode tests pass).
- Bundles: rebuilt plugins/agentbridge/server/{bridge-server,daemon}.js
  with bun build:plugin so plugin cache picks up the new tool on next
  Claude Code session restart.

Builds on PR raysonmeng#75 (sticky lock), raysonmeng#76 (orphan cleanup), raysonmeng#77 (dual+persistence).

Adds the second half of the realtime path: when Tony's Claude session is
idle between turns, daemon-pushed Codex messages sit unacked in queue.db
and get injected into Claude's context as a system-reminder block on the
next UserPromptSubmit hook. Non-consuming peek; ack happens inside the
reply MCP tool when Claude replies on that chat_id.

Schema:
  - Add acked_at INTEGER column to messages table with ALTER TABLE
    migration for pre-existing DBs.
  - Add idx_messages_unacked_undrained for the peek query.

API:
  - PersistentMessageQueue.ackByChatId(chatId): flips acked_at for
    undrained+unacked rows on the chat. Idempotent (returns 0 on
    second call).
  - PersistentMessageQueue.listUnackedUndrained() / countUnackedUndrained()
    for the hook layer to read without mutating state.
  - ClaudeAdapter.handleReply now calls queue.ackByChatId after a
    successful reply when chat_id is provided.

Hook layer:
  - scripts/peek_codex_queue.py: read-only SQLite peek (mode=ro URI,
    safe under WAL). Emits hookSpecificOutput JSON, count, text, or
    raw JSON.
  - scripts/inject-pending-codex.sh: UserPromptSubmit hook that calls
    peek and emits the system-reminder block.
  - scripts/health-check.sh: SessionStart hook now appends pending
    count to the existing AgentBridge ready/not-ready notice.
  - hooks/hooks.json: register UserPromptSubmit alongside SessionStart.

Tests: 7 new tests in src/unit-test/message-queue.test.ts cover migration,
ack semantics, idempotency, ack-vs-drain independence, and unacked
list shape. All 108 non-e2e tests pass; pre-existing e2e-cli.test.ts
failures on Windows are unrelated (test harness can't locate Claude
binary).

Phase B (wait_for_messages MCP tool) handles the active-listening case;
Phase C handles the idle case. Together they cover the full realtime
loop: Codex push -> daemon queue -> (active) wait_for_messages wake or
(idle) next UserPromptSubmit -> Claude -> reply tool -> ack.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

Tominori666 changed the title from "feat: wait_for_messages MCP tool for real-time listening loop" to "feat: Phase B + C realtime path (wait_for_messages MCP tool + UserPromptSubmit injection)" on May 1, 2026