From 3972c4e02ae012d50f5adff3d7236f20f7fc79ae Mon Sep 17 00:00:00 2001 From: liliwilson Date: Tue, 19 May 2026 00:07:17 -0700 Subject: [PATCH 1/5] Only refresh tasks on RTC invalidation if a view is open. --- app/src/ai/agent_conversations_model.rs | 123 +++++++++---- app/src/ai/agent_conversations_model_tests.rs | 1 + .../server/cloud_objects/update_manager.rs | 37 ++-- specs/rtc-task-invalidation-phase1/TECH.md | 169 ++++++++++++++++++ 4 files changed, 283 insertions(+), 47 deletions(-) create mode 100644 specs/rtc-task-invalidation-phase1/TECH.md diff --git a/app/src/ai/agent_conversations_model.rs b/app/src/ai/agent_conversations_model.rs index 47e546cdff..d0677a3dd4 100644 --- a/app/src/ai/agent_conversations_model.rs +++ b/app/src/ai/agent_conversations_model.rs @@ -528,6 +528,9 @@ pub struct AgentConversationsModel { /// and are absent from this map. task_fetch_state: HashMap, rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState, + /// Earliest RTC timestamp received while no consumer view was open. + /// On next `register_view_open`, triggers a single `fetch_tasks_updated_after`. + dirty_since: Option>, } pub enum AgentConversationsModelEvent { @@ -575,6 +578,7 @@ impl AgentConversationsModel { has_finished_initial_load: true, task_fetch_state: HashMap::new(), rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(), + dirty_since: None, }; } @@ -613,6 +617,7 @@ impl AgentConversationsModel { has_finished_initial_load: false, task_fetch_state: HashMap::new(), rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(), + dirty_since: None, }; // Only sync local conversations if we're not in CLI mode. Server-side data @@ -678,23 +683,50 @@ impl AgentConversationsModel { event: &UpdateManagerEvent, ctx: &mut ModelContext, ) { - if let UpdateManagerEvent::AmbientTaskUpdated { timestamp } = event { - match std::mem::take(&mut self.rtc_task_refresh_throttle_state) { - RtcTaskRefreshThrottleState::Idle => { - self.fetch_tasks_updated_after(*timestamp, ctx); - self.start_rtc_task_refresh_throttle_timer(ctx); - } - RtcTaskRefreshThrottleState::CoolingDown { - mut pending_timestamp, + let UpdateManagerEvent::AmbientTaskUpdated { task_id, timestamp } = event else { + return; + }; + + // (a) If this task has an open tab (any window), force a re-fetch. + let has_open_tab = ActiveAgentViewsModel::as_ref(ctx) + .get_terminal_view_id_for_ambient_task(*task_id) + .is_some(); + if has_open_tab { + self.get_or_async_fetch_task_data_internal(task_id, true, ctx); + } + + // (b) If management view or conversation list is open, throttled list-fetch. + let has_list_consumers = self + .active_data_consumers_per_window + .values() + .any(|views| !views.is_empty()); + if has_list_consumers { + self.handle_rtc_for_list_views(*timestamp, ctx); + } else { + // (c) Nothing open: record earliest timestamp for flush on next view open. + record_earliest_rtc_task_refresh_timestamp(&mut self.dirty_since, *timestamp); + } + } + + fn handle_rtc_for_list_views( + &mut self, + timestamp: DateTime, + ctx: &mut ModelContext, + ) { + match std::mem::take(&mut self.rtc_task_refresh_throttle_state) { + RtcTaskRefreshThrottleState::Idle => { + self.fetch_tasks_updated_after(timestamp, ctx); + self.start_rtc_task_refresh_throttle_timer(ctx); + } + RtcTaskRefreshThrottleState::CoolingDown { + mut pending_timestamp, + timer_abort_handle, + } => { + record_earliest_rtc_task_refresh_timestamp(&mut pending_timestamp, timestamp); + self.rtc_task_refresh_throttle_state = RtcTaskRefreshThrottleState::CoolingDown { + pending_timestamp, timer_abort_handle, - } => { - record_earliest_rtc_task_refresh_timestamp(&mut pending_timestamp, *timestamp); - self.rtc_task_refresh_throttle_state = - RtcTaskRefreshThrottleState::CoolingDown { - pending_timestamp, - timer_abort_handle, - }; - } + }; } } } @@ -945,6 +977,11 @@ impl AgentConversationsModel { .or_default() .insert(view_id); self.update_polling_state(ctx); + + // Flush dirty tasks accumulated while no view was open. + if let Some(dirty_since) = self.dirty_since.take() { + self.fetch_tasks_updated_after(dirty_since, ctx); + } } /// Called when a view that consumes this model's data becomes hidden. @@ -1524,29 +1561,44 @@ impl AgentConversationsModel { task_id: &AmbientAgentTaskId, ctx: &mut ModelContext, ) -> Option { - // If we already have it, return it - if let Some(task) = self.tasks.get(task_id) { - return Some(task.clone()); + self.get_or_async_fetch_task_data_internal(task_id, false, ctx) + } + + // If `force_refresh` is true, this will invalidate the cache entry for the stored + // task's data and refetch the data from the server. We use this for handling RTC + // invalidations. + fn get_or_async_fetch_task_data_internal( + &mut self, + task_id: &AmbientAgentTaskId, + force_refresh: bool, + ctx: &mut ModelContext, + ) -> Option { + if !force_refresh { + if let Some(task) = self.tasks.get(task_id) { + return Some(task.clone()); + } } - // Consult the per-task fetch state. The three variants are mutually exclusive: at most - // one applies to a given id. - match self.task_fetch_state.get(task_id) { - Some(TaskFetchState::InFlight) => return None, - Some(TaskFetchState::PermanentlyFailed { at, .. }) => { - if at.elapsed() < PERMANENT_FETCH_FAILURE_COOLDOWN { - return None; + // Consult the per-task fetch state unless force-refreshing. + if !force_refresh { + match self.task_fetch_state.get(task_id) { + Some(TaskFetchState::InFlight) => return None, + Some(TaskFetchState::PermanentlyFailed { at, .. }) => { + if at.elapsed() < PERMANENT_FETCH_FAILURE_COOLDOWN { + return None; + } + self.task_fetch_state.remove(task_id); } - // Cooldown has elapsed; clear the entry and fall through to fetch again. - self.task_fetch_state.remove(task_id); - } - Some(TaskFetchState::TransientlyFailed { at, .. }) => { - if at.elapsed() < TRANSIENT_FETCH_FAILURE_COOLDOWN { - return None; + Some(TaskFetchState::TransientlyFailed { at, .. }) => { + if at.elapsed() < TRANSIENT_FETCH_FAILURE_COOLDOWN { + return None; + } + self.task_fetch_state.remove(task_id); } - self.task_fetch_state.remove(task_id); + None => {} } - None => {} + } else { + self.task_fetch_state.remove(task_id); } // Opportunistically purge other expired entries so the map doesn't grow unbounded. @@ -1603,7 +1655,8 @@ impl AgentConversationsModel { }, ); - None + // Return the stale cached copy if available (force_refresh keeps it in the map). + self.tasks.get(task_id).cloned() } /// Returns all (name, uid) pairs for creators of tasks in the model. diff --git a/app/src/ai/agent_conversations_model_tests.rs b/app/src/ai/agent_conversations_model_tests.rs index 5174264eac..dc4c941c27 100644 --- a/app/src/ai/agent_conversations_model_tests.rs +++ b/app/src/ai/agent_conversations_model_tests.rs @@ -581,6 +581,7 @@ fn create_test_model() -> AgentConversationsModel { has_finished_initial_load: false, task_fetch_state: Default::default(), rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(), + dirty_since: None, } } diff --git a/app/src/server/cloud_objects/update_manager.rs b/app/src/server/cloud_objects/update_manager.rs index 87d4d7830a..033a35a3b4 100644 --- a/app/src/server/cloud_objects/update_manager.rs +++ b/app/src/server/cloud_objects/update_manager.rs @@ -1,15 +1,14 @@ -use std::collections::{HashMap, HashSet}; -use std::future::Future; -use std::sync::mpsc::SyncSender; -use std::sync::Arc; -use std::time::Duration; - use chrono::{DateTime, Utc}; use futures::channel::oneshot::{self, Receiver}; use futures::stream::AbortHandle; use itertools::Itertools; use lazy_static::lazy_static; use regex::Regex; +use std::collections::{HashMap, HashSet}; +use std::future::Future; +use std::sync::mpsc::SyncSender; +use std::sync::Arc; +use std::time::Duration; use warp_core::features::FeatureFlag; use warp_graphql::mcp_gallery_template::MCPGalleryTemplate; use warp_graphql::object_permissions::AccessLevel; @@ -26,6 +25,7 @@ use crate::ai::agent::conversation::AIConversationId; use crate::ai::ambient_agents::scheduled::{ CloudScheduledAmbientAgentModel, ScheduledAmbientAgent, }; +use crate::ai::ambient_agents::AmbientAgentTaskId; use crate::ai::blocklist::BlocklistAIHistoryModel; use crate::ai::cloud_environments::{AmbientAgentEnvironment, CloudAmbientAgentEnvironmentModel}; use crate::ai::execution_profiles::profiles::AIExecutionProfilesModel; @@ -127,10 +127,19 @@ pub struct ObjectOperationResult { #[derive(Debug)] pub enum UpdateManagerEvent { - ObjectOperationComplete { result: ObjectOperationResult }, - CloudPreferencesUpdated { updated: Vec }, - MCPGalleryUpdated { templates: Vec }, - AmbientTaskUpdated { timestamp: DateTime }, + ObjectOperationComplete { + result: ObjectOperationResult, + }, + CloudPreferencesUpdated { + updated: Vec, + }, + MCPGalleryUpdated { + templates: Vec, + }, + AmbientTaskUpdated { + task_id: AmbientAgentTaskId, + timestamp: DateTime, + }, } /// An enum for choosing the behavior of the fetch_single_cloud_object function. @@ -1110,11 +1119,15 @@ impl UpdateManager { fn handle_ambient_task_changed( &mut self, - _task_id: String, + task_id: String, timestamp: DateTime, ctx: &mut ModelContext, ) { - ctx.emit(UpdateManagerEvent::AmbientTaskUpdated { timestamp }); + let Ok(task_id) = task_id.parse::() else { + log::warn!("Ignoring AmbientTaskUpdated with unparseable task_id: {task_id}"); + return; + }; + ctx.emit(UpdateManagerEvent::AmbientTaskUpdated { task_id, timestamp }); } /// Fetches environment "last used" timestamps from the server and merges them diff --git a/specs/rtc-task-invalidation-phase1/TECH.md b/specs/rtc-task-invalidation-phase1/TECH.md new file mode 100644 index 0000000000..26347b98b2 --- /dev/null +++ b/specs/rtc-task-invalidation-phase1/TECH.md @@ -0,0 +1,169 @@ +# Tech spec: RTC task invalidation — Phase 1 (client-only) + +## Context + +RTC invalidations for cloud agent tasks cause excessive `GET /api/v1/agent/runs` requests. During a bug bash with multiple concurrent agents on a team, this triggered 429 rate limiting that blocked spawning new agents. See [investigation report](/Users/liliwilson/Downloads/05_18_2025%20429s%20when%20starting%20cloud%20agents.md) for full context. + +### Current flow + +The server sends `AmbientTaskUpdated { TaskId, Timestamp }` over the websocket on every task state transition, session link update, conversation ID update, and task creation. The client receives this in the `Listener` → `UpdateManager` → `AgentConversationsModel` chain: + +1. `listener.rs:113-116` — `ObjectUpdateMessage::AmbientTaskUpdated { task_id, timestamp }` arrives with both fields. +2. `update_manager.rs:1119-1125` — `handle_ambient_task_changed` **discards** `task_id` (param named `_task_id`) and emits `UpdateManagerEvent::AmbientTaskUpdated { timestamp }` with only the timestamp. +3. `agent_conversations_model.rs:673-696` — `handle_update_manager_event` throttles at 5s, then calls `fetch_tasks_updated_after(timestamp)` which hits `GET /api/v1/agent/runs?limit=100&updated_after={ts}` — a list fetch of all recently-updated tasks. + +### Three consumer surfaces + +- **Conversation details panel** (`terminal/view/ambient_agent/view_impl.rs:938-977`): pane-level sidebar showing one task. Uses `get_or_async_fetch_task_data(task_id)` which hits `GET /agent/runs/{task_id}` with per-task dedup. Not connected to RTC directly — free-rides on the list-fetch populating `self.tasks`. Today this works because the list-fetch fires unconditionally on every RTC event. But once we gate the list-fetch on whether views are open (change 3), the details panel loses its data source and needs its own RTC path. +- **Agent management view** (`workspace/view.rs:8023-8048`): full-page dashboard. Shows all tasks (personal + team). Registers with `register_view_open`/`register_view_closed`. +- **Conversation list view** (`workspace/view/left_panel.rs:1030-1047`): left panel sidebar. Shows **personal tasks only** (`OwnerFilter::PersonalOnly` at `conversation_list/view_model.rs:73`). Also registers with `register_view_open`/`register_view_closed`. + +### Problems + +1. `task_id` discarded → forces broad list-fetch on every RTC event +2. Details panel has no direct RTC path → relying indirectly on `AgentConversationsModel`. +3. Every RTC event triggers a list-fetch even if no list view is open +4. No recovery if websocket misses a message (polling fully disabled when RTC is on, `agent_conversations_model.rs:981-983`) + +### Relevant files + +- `app/src/server/cloud_objects/listener.rs:113-116` — websocket message type with `task_id` +- `app/src/server/cloud_objects/update_manager.rs:1119-1126` — discards `task_id` +- `app/src/server/cloud_objects/update_manager.rs:137-142` — `UpdateManagerEvent` enum +- `app/src/ai/agent_conversations_model.rs:56` — `RTC_TASK_REFRESH_THROTTLE` (5s) +- `app/src/ai/agent_conversations_model.rs:673-696` — `handle_update_manager_event` +- `app/src/ai/agent_conversations_model.rs:735-768` — `fetch_tasks_updated_after` +- `app/src/ai/agent_conversations_model.rs:932-961` — `register_view_open`/`register_view_closed` +- `app/src/ai/agent_conversations_model.rs:975-1001` — `should_be_polling` +- `app/src/ai/agent_conversations_model.rs:1519-1601` — `get_or_async_fetch_task_data` +- `app/src/ai/active_agent_views_model.rs:83-93` — tracks focused conversations and ambient sessions +- `app/src/terminal/view/ambient_agent/view_impl.rs:938-977` — details panel data fetch +- `app/src/workspace/view/conversation_list/view_model.rs:68-91` — personal-only filter + +## Proposed changes + +### 1. Pass `task_id` through the event chain + +In `update_manager.rs`, add `task_id` to the event: + +```rust path=null start=null +// update_manager.rs:137-142 +enum UpdateManagerEvent { + // ... + AmbientTaskUpdated { task_id: AmbientAgentTaskId, timestamp: DateTime }, +} +``` + +Rename `_task_id` → `task_id` in `handle_ambient_task_changed` and include it in the emitted event. This requires importing `AmbientAgentTaskId` in `update_manager.rs`. + +### 2. Per-surface RTC dispatch in `AgentConversationsModel` + +Replace the current `handle_update_manager_event` (which unconditionally list-fetches) with a dispatch that routes based on what's open. + +New `handle_update_manager_event` logic: + +```rust path=null start=null +fn handle_update_manager_event(&mut self, event: &UpdateManagerEvent, ctx: &mut ModelContext) { + let UpdateManagerEvent::AmbientTaskUpdated { task_id, timestamp } = event else { + return; + }; + + // (a) Details panel: if any window has this task focused, do a targeted single-task fetch. + // This uses get_or_async_fetch_task_data which already deduplicates in-flight requests. + if self.is_task_actively_viewed(*task_id, ctx) { + self.refresh_single_task(*task_id, ctx); + } + + // (b) List views: if management view or conversation list is open, do a throttled list-fetch. + if self.has_any_active_data_consumers() { + self.handle_rtc_for_list_views(*timestamp, ctx); + } else { + // (c) Nothing open: mark dirty for later. + self.dirty_task_ids.insert(*task_id); + } +} +``` + +#### 2a. `is_task_actively_viewed` + +Check `ActiveAgentViewsModel` for whether the `task_id` has an open ambient session tab: + +```rust path=null start=null +fn is_task_actively_viewed(&self, task_id: AmbientAgentTaskId, ctx: &AppContext) -> bool { + ActiveAgentViewsModel::as_ref(ctx) + .get_terminal_view_id_for_ambient_task(task_id) + .is_some() +} +``` + +This covers any window where the task is open in a tab (not just the focused window). The details panel refresh is cheap — `get_or_async_fetch_task_data` won't spawn a second HTTP request if one is already in flight for the same task_id (it checks `TaskFetchState::InFlight` at `agent_conversations_model.rs:1532` and returns `None` immediately). + +#### 2b. `refresh_single_task` + +Call `get_or_async_fetch_task_data` which already exists, has per-task dedup (`TaskFetchState::InFlight`), backoff for failures, and emits `TasksUpdated` on completion. No new code needed — just call it. + +#### 2c. `handle_rtc_for_list_views` + +Extract the existing throttle logic from today's `handle_update_manager_event` into this method. Identical behavior to today — throttled list-fetch. + +We keep the list-fetch (rather than batching single-task fetches) because: (1) the management view shows team tasks, so it needs to discover new tasks created by teammates — single-task fetches can only refresh known task_ids; (2) during bursts (20 tasks changing in a 5s window), 1 list-fetch is cheaper than 20 individual requests; (3) the big win is gating — not doing the list-fetch at all when the view isn't open. + +#### 2d. `has_any_active_data_consumers` + +```rust path=null start=null +fn has_any_active_data_consumers(&self) -> bool { + self.active_data_consumers_per_window + .values() + .any(|views| !views.is_empty()) +} +``` + +### 3. Dirty-on-open flush + +Add a `dirty_task_ids: HashSet` field to `AgentConversationsModel`. + +In `register_view_open`, after the existing logic, flush dirty tasks: + +```rust path=null start=null +if !self.dirty_task_ids.is_empty() { + let dirty_ids: Vec<_> = self.dirty_task_ids.drain().collect(); + for task_id in dirty_ids { + self.get_or_async_fetch_task_data(&task_id, ctx); + } +} +``` + +This uses single-task fetches (not a list-fetch) so it's bounded by the number of tasks that changed while the view was closed. + +Cap the dirty set at ~200 entries. If it overflows, clear it and do a full list-fetch on next view open instead. + +### Summary of request reduction + +Before: every RTC event → 1 list-fetch (`GET /agent/runs?updated_after=...`), throttled to 1 per 5s. + +After: +- Details panel open, nothing else → 1 single-task fetch per event (deduped by `TaskFetchState`) +- Management/convo list open → same list-fetch as today (throttled) +- Nothing open → 0 requests, dirty marks only + +No safety-net periodic poll for now. The websocket `Listener` already triggers an object refresh on reconnection (`listener.rs:232`), which covers the main failure mode (disconnect). If we see data gaps from silently dropped messages, we can add a slow poll later. + +For a team of 10 running 5 agents with 4-5 state changes each: before = ~250 list-fetches across team in 5 min. After = ~0 list-fetches for users not looking at views, plus ~1 single-task fetch per user per agent they have open. + +## Testing and validation + +**Unit tests** in `agent_conversations_model_tests.rs`: +- RTC event with `task_id` when details panel has that task open → `get_or_async_fetch_task_data` called +- RTC event when no views open → task_id added to `dirty_task_ids`, no fetch +- `register_view_open` with dirty tasks → each dirty task fetched via single-task endpoint +- Dirty set overflow (>200) → cleared, list-fetch on next open + +**Manual verification**: +- Add `log::info!("[lili] ...")` in `handle_update_manager_event` to count fetches before/after +- Run multiple cloud agents on a team, verify no list-fetches when management view is closed +- Open details panel for an agent, verify state changes appear within ~3s +- Close all views, let agents run, reopen management view → verify dirty tasks load + +## Parallelization + +Not beneficial — all changes are in `agent_conversations_model.rs` and `update_manager.rs` with tight coupling between them. Single-agent serial implementation is the right approach. From 8bef82fa6eb29ea044823aea4e2cdd4434aa54a1 Mon Sep 17 00:00:00 2001 From: liliwilson Date: Tue, 19 May 2026 11:57:12 -0700 Subject: [PATCH 2/5] Updates --- app/src/ai/agent_conversations_model.rs | 2 +- specs/rtc-task-invalidation-phase1/TECH.md | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/app/src/ai/agent_conversations_model.rs b/app/src/ai/agent_conversations_model.rs index d0677a3dd4..b9c015328b 100644 --- a/app/src/ai/agent_conversations_model.rs +++ b/app/src/ai/agent_conversations_model.rs @@ -695,12 +695,12 @@ impl AgentConversationsModel { self.get_or_async_fetch_task_data_internal(task_id, true, ctx); } - // (b) If management view or conversation list is open, throttled list-fetch. let has_list_consumers = self .active_data_consumers_per_window .values() .any(|views| !views.is_empty()); if has_list_consumers { + // (b) If management view or conversation list is open, throttled list-fetch. self.handle_rtc_for_list_views(*timestamp, ctx); } else { // (c) Nothing open: record earliest timestamp for flush on next view open. diff --git a/specs/rtc-task-invalidation-phase1/TECH.md b/specs/rtc-task-invalidation-phase1/TECH.md index 26347b98b2..11a3654c50 100644 --- a/specs/rtc-task-invalidation-phase1/TECH.md +++ b/specs/rtc-task-invalidation-phase1/TECH.md @@ -25,6 +25,17 @@ The server sends `AmbientTaskUpdated { TaskId, Timestamp }` over the websocket o 3. Every RTC event triggers a list-fetch even if no list view is open 4. No recovery if websocket misses a message (polling fully disabled when RTC is on, `agent_conversations_model.rs:981-983`) +### Out of scope: spawn.rs session polling + +`ambient_agents/spawn.rs` has a separate polling loop (`poll_run_until_joinable_session`, `spawn.rs:165-308`) that polls `GET /agent/runs/{task_id}` every 3s (`TASK_STATUS_POLL_INTERVAL`, `spawn.rs:23`) to detect when a session becomes joinable. **Not affected by these changes.** + +The tab IS registered in `ActiveAgentViewsModel` on `TaskSpawned` (`model.rs:1261-1262`), so the RTC handler (change 2a) will see `has_open_tab = true` and trigger redundant re-fetches during spawn. This is a minor inefficiency (~4-5 extra single-task requests per spawn) deferred for now — the big win is eliminating list-fetches. + +RTC cannot replace spawn.rs because: +- spawn.rs drives the session state machine (`WaitingForSession` → `AgentRunning`) by emitting `AmbientAgentEvent::SessionStarted` (`spawn.rs:292-295`), which triggers the shared session join (`model.rs:1311-1346`). RTC only refreshes cached task data. +- spawn.rs handles timeouts, error/terminal states, followup stale-state skipping, and cancellation. +- spawn.rs extracts `SessionJoinInfo::from_task` (`spawn.rs:278`) each poll; RTC events only carry `task_id` + `timestamp`. + ### Relevant files - `app/src/server/cloud_objects/listener.rs:113-116` — websocket message type with `task_id` From 4b2a81d967365a57e94e8aa37ec2af8e08ae0137 Mon Sep 17 00:00:00 2001 From: liliwilson Date: Tue, 19 May 2026 14:33:54 -0700 Subject: [PATCH 3/5] Update force_refresh to respect the task fetch state map --- app/src/ai/agent_conversations_model.rs | 64 ++++++++++++------------- 1 file changed, 31 insertions(+), 33 deletions(-) diff --git a/app/src/ai/agent_conversations_model.rs b/app/src/ai/agent_conversations_model.rs index b9c015328b..5d75b25aab 100644 --- a/app/src/ai/agent_conversations_model.rs +++ b/app/src/ai/agent_conversations_model.rs @@ -692,7 +692,7 @@ impl AgentConversationsModel { .get_terminal_view_id_for_ambient_task(*task_id) .is_some(); if has_open_tab { - self.get_or_async_fetch_task_data_internal(task_id, true, ctx); + self.force_refresh_task(task_id, ctx); } let has_list_consumers = self @@ -1561,44 +1561,45 @@ impl AgentConversationsModel { task_id: &AmbientAgentTaskId, ctx: &mut ModelContext, ) -> Option { - self.get_or_async_fetch_task_data_internal(task_id, false, ctx) + if let Some(task) = self.tasks.get(task_id) { + return Some(task.clone()); + } + + self.async_fetch_task(task_id, ctx); + None } - // If `force_refresh` is true, this will invalidate the cache entry for the stored - // task's data and refetch the data from the server. We use this for handling RTC - // invalidations. - fn get_or_async_fetch_task_data_internal( + /// Invalidate the cached task and re-fetch from server. In-flight dedup and + /// failure cooldowns still apply. + fn force_refresh_task( &mut self, task_id: &AmbientAgentTaskId, - force_refresh: bool, ctx: &mut ModelContext, - ) -> Option { - if !force_refresh { - if let Some(task) = self.tasks.get(task_id) { - return Some(task.clone()); - } - } + ) { + self.async_fetch_task(task_id, ctx); + } - // Consult the per-task fetch state unless force-refreshing. - if !force_refresh { - match self.task_fetch_state.get(task_id) { - Some(TaskFetchState::InFlight) => return None, - Some(TaskFetchState::PermanentlyFailed { at, .. }) => { - if at.elapsed() < PERMANENT_FETCH_FAILURE_COOLDOWN { - return None; - } - self.task_fetch_state.remove(task_id); + /// Consult fetch-state guards and spawn a fetch if allowed. + fn async_fetch_task( + &mut self, + task_id: &AmbientAgentTaskId, + ctx: &mut ModelContext, + ) { + match self.task_fetch_state.get(task_id) { + Some(TaskFetchState::InFlight) => return, + Some(TaskFetchState::PermanentlyFailed { at, .. }) => { + if at.elapsed() < PERMANENT_FETCH_FAILURE_COOLDOWN { + return; } - Some(TaskFetchState::TransientlyFailed { at, .. }) => { - if at.elapsed() < TRANSIENT_FETCH_FAILURE_COOLDOWN { - return None; - } - self.task_fetch_state.remove(task_id); + self.task_fetch_state.remove(task_id); + } + Some(TaskFetchState::TransientlyFailed { at, .. }) => { + if at.elapsed() < TRANSIENT_FETCH_FAILURE_COOLDOWN { + return; } - None => {} + self.task_fetch_state.remove(task_id); } - } else { - self.task_fetch_state.remove(task_id); + None => {} } // Opportunistically purge other expired entries so the map doesn't grow unbounded. @@ -1654,9 +1655,6 @@ impl AgentConversationsModel { } }, ); - - // Return the stale cached copy if available (force_refresh keeps it in the map). - self.tasks.get(task_id).cloned() } /// Returns all (name, uid) pairs for creators of tasks in the model. From c9d15127cfbe8fa6afe419833681b22235fc04c0 Mon Sep 17 00:00:00 2001 From: liliwilson Date: Wed, 20 May 2026 22:59:23 -0700 Subject: [PATCH 4/5] More cleaning up --- app/src/ai/agent_conversations_model.rs | 46 +++++------- specs/rtc-task-invalidation-phase1/TECH.md | 84 ++++++++++------------ 2 files changed, 55 insertions(+), 75 deletions(-) diff --git a/app/src/ai/agent_conversations_model.rs b/app/src/ai/agent_conversations_model.rs index 5d75b25aab..53d7138905 100644 --- a/app/src/ai/agent_conversations_model.rs +++ b/app/src/ai/agent_conversations_model.rs @@ -528,7 +528,7 @@ pub struct AgentConversationsModel { /// and are absent from this map. task_fetch_state: HashMap, rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState, - /// Earliest RTC timestamp received while no consumer view was open. + /// Earliest RTC timestamp received while no list surface was open. /// On next `register_view_open`, triggers a single `fetch_tasks_updated_after`. dirty_since: Option>, } @@ -687,27 +687,28 @@ impl AgentConversationsModel { return; }; - // (a) If this task has an open tab (any window), force a re-fetch. - let has_open_tab = ActiveAgentViewsModel::as_ref(ctx) - .get_terminal_view_id_for_ambient_task(*task_id) - .is_some(); - if has_open_tab { - self.force_refresh_task(task_id, ctx); - } - let has_list_consumers = self .active_data_consumers_per_window .values() .any(|views| !views.is_empty()); if has_list_consumers { - // (b) If management view or conversation list is open, throttled list-fetch. + // (a) If management view or conversation list is open, throttled list-fetch. self.handle_rtc_for_list_views(*timestamp, ctx); } else { - // (c) Nothing open: record earliest timestamp for flush on next view open. - record_earliest_rtc_task_refresh_timestamp(&mut self.dirty_since, *timestamp); + let has_open_tab = ActiveAgentViewsModel::as_ref(ctx) + .get_terminal_view_id_for_ambient_task(*task_id) + .is_some(); + if has_open_tab { + // (b) If this task has an open tab (any window), force a re-fetch. + self.async_fetch_task(task_id, ctx); + } else { + // (c) No list surface open: record earliest timestamp for flush on next view open. + record_earliest_rtc_task_refresh_timestamp(&mut self.dirty_since, *timestamp); + } } } + // Handle RTC invalidations for list views, respecting the refresh throttling. fn handle_rtc_for_list_views( &mut self, timestamp: DateTime, @@ -978,7 +979,7 @@ impl AgentConversationsModel { .insert(view_id); self.update_polling_state(ctx); - // Flush dirty tasks accumulated while no view was open. + // Flush dirty tasks accumulated while no list surface was open. if let Some(dirty_since) = self.dirty_since.take() { self.fetch_tasks_updated_after(dirty_since, ctx); } @@ -1561,6 +1562,7 @@ impl AgentConversationsModel { task_id: &AmbientAgentTaskId, ctx: &mut ModelContext, ) -> Option { + // If we already have it, return it if let Some(task) = self.tasks.get(task_id) { return Some(task.clone()); } @@ -1569,28 +1571,15 @@ impl AgentConversationsModel { None } - /// Invalidate the cached task and re-fetch from server. In-flight dedup and - /// failure cooldowns still apply. - fn force_refresh_task( - &mut self, - task_id: &AmbientAgentTaskId, - ctx: &mut ModelContext, - ) { - self.async_fetch_task(task_id, ctx); - } - /// Consult fetch-state guards and spawn a fetch if allowed. - fn async_fetch_task( - &mut self, - task_id: &AmbientAgentTaskId, - ctx: &mut ModelContext, - ) { + fn async_fetch_task(&mut self, task_id: &AmbientAgentTaskId, ctx: &mut ModelContext) { match self.task_fetch_state.get(task_id) { Some(TaskFetchState::InFlight) => return, Some(TaskFetchState::PermanentlyFailed { at, .. }) => { if at.elapsed() < PERMANENT_FETCH_FAILURE_COOLDOWN { return; } + // Cooldown has elapsed; clear the entry and fall through to fetch again. self.task_fetch_state.remove(task_id); } Some(TaskFetchState::TransientlyFailed { at, .. }) => { @@ -1881,6 +1870,7 @@ impl AgentConversationsModel { self.abort_rtc_task_refresh_throttle(); self.active_data_consumers_per_window.clear(); self.task_fetch_state.clear(); + self.dirty_since = None; // Reset the initial load flag so that we can retry the initial sync with the new logged in user self.has_finished_initial_load = false; } diff --git a/specs/rtc-task-invalidation-phase1/TECH.md b/specs/rtc-task-invalidation-phase1/TECH.md index 11a3654c50..0d04cc6ff1 100644 --- a/specs/rtc-task-invalidation-phase1/TECH.md +++ b/specs/rtc-task-invalidation-phase1/TECH.md @@ -2,7 +2,7 @@ ## Context -RTC invalidations for cloud agent tasks cause excessive `GET /api/v1/agent/runs` requests. During a bug bash with multiple concurrent agents on a team, this triggered 429 rate limiting that blocked spawning new agents. See [investigation report](/Users/liliwilson/Downloads/05_18_2025%20429s%20when%20starting%20cloud%20agents.md) for full context. +RTC invalidations for cloud agent tasks cause excessive `GET /api/v1/agent/runs` requests. During a bug bash with multiple concurrent agents on a team, this triggered 429 rate limiting that blocked spawning new agents. ### Current flow @@ -79,39 +79,44 @@ fn handle_update_manager_event(&mut self, event: &UpdateManagerEvent, ctx: &mut return; }; - // (a) Details panel: if any window has this task focused, do a targeted single-task fetch. - // This uses get_or_async_fetch_task_data which already deduplicates in-flight requests. - if self.is_task_actively_viewed(*task_id, ctx) { - self.refresh_single_task(*task_id, ctx); - } - - // (b) List views: if management view or conversation list is open, do a throttled list-fetch. - if self.has_any_active_data_consumers() { + let has_list_consumers = self + .active_data_consumers_per_window + .values() + .any(|views| !views.is_empty()); + if has_list_consumers { + // (a) List views: if management view or conversation list is open, do a throttled list-fetch. self.handle_rtc_for_list_views(*timestamp, ctx); } else { - // (c) Nothing open: mark dirty for later. - self.dirty_task_ids.insert(*task_id); + let has_open_tab = ActiveAgentViewsModel::as_ref(ctx) + .get_terminal_view_id_for_ambient_task(*task_id) + .is_some(); + if has_open_tab { + // (b) Details panel: if any window has this task focused, do a targeted single-task fetch. + // This still respects per-task dedup and failure cooldowns. + self.async_fetch_task(task_id, ctx); + } else { + // (c) No list surface or open tab: mark dirty for a later list refresh. + record_earliest_rtc_task_refresh_timestamp(&mut self.dirty_since, *timestamp); + } } } ``` -#### 2a. `is_task_actively_viewed` +#### 2a. Open-tab check Check `ActiveAgentViewsModel` for whether the `task_id` has an open ambient session tab: ```rust path=null start=null -fn is_task_actively_viewed(&self, task_id: AmbientAgentTaskId, ctx: &AppContext) -> bool { - ActiveAgentViewsModel::as_ref(ctx) - .get_terminal_view_id_for_ambient_task(task_id) - .is_some() -} +let has_open_tab = ActiveAgentViewsModel::as_ref(ctx) + .get_terminal_view_id_for_ambient_task(*task_id) + .is_some(); ``` -This covers any window where the task is open in a tab (not just the focused window). The details panel refresh is cheap — `get_or_async_fetch_task_data` won't spawn a second HTTP request if one is already in flight for the same task_id (it checks `TaskFetchState::InFlight` at `agent_conversations_model.rs:1532` and returns `None` immediately). +This covers any window where the task is open in a tab (not just the focused window). It only runs when no list surface is open, so one RTC event does not trigger both a single-task fetch and a list-fetch. -#### 2b. `refresh_single_task` +#### 2b. `async_fetch_task` -Call `get_or_async_fetch_task_data` which already exists, has per-task dedup (`TaskFetchState::InFlight`), backoff for failures, and emits `TasksUpdated` on completion. No new code needed — just call it. +Call the shared task-fetch path which already has per-task dedup (`TaskFetchState::InFlight`), backoff for failures, and emits `TasksUpdated` on completion. #### 2c. `handle_rtc_for_list_views` @@ -119,55 +124,40 @@ Extract the existing throttle logic from today's `handle_update_manager_event` i We keep the list-fetch (rather than batching single-task fetches) because: (1) the management view shows team tasks, so it needs to discover new tasks created by teammates — single-task fetches can only refresh known task_ids; (2) during bursts (20 tasks changing in a 5s window), 1 list-fetch is cheaper than 20 individual requests; (3) the big win is gating — not doing the list-fetch at all when the view isn't open. -#### 2d. `has_any_active_data_consumers` - -```rust path=null start=null -fn has_any_active_data_consumers(&self) -> bool { - self.active_data_consumers_per_window - .values() - .any(|views| !views.is_empty()) -} -``` ### 3. Dirty-on-open flush -Add a `dirty_task_ids: HashSet` field to `AgentConversationsModel`. +Add a `dirty_since: Option>` field to `AgentConversationsModel`. + +When an RTC event arrives while no list surface is open and the task does not have an open tab, keep the earliest timestamp in `dirty_since`. -In `register_view_open`, after the existing logic, flush dirty tasks: +In `register_view_open`, after the existing logic, flush dirty state with one list refresh: ```rust path=null start=null -if !self.dirty_task_ids.is_empty() { - let dirty_ids: Vec<_> = self.dirty_task_ids.drain().collect(); - for task_id in dirty_ids { - self.get_or_async_fetch_task_data(&task_id, ctx); - } +if let Some(dirty_since) = self.dirty_since.take() { + self.fetch_tasks_updated_after(dirty_since, ctx); } ``` -This uses single-task fetches (not a list-fetch) so it's bounded by the number of tasks that changed while the view was closed. - -Cap the dirty set at ~200 entries. If it overflows, clear it and do a full list-fetch on next view open instead. +This keeps the closed-view path at zero requests, then performs one bounded list refresh when a list view becomes visible again. ### Summary of request reduction Before: every RTC event → 1 list-fetch (`GET /agent/runs?updated_after=...`), throttled to 1 per 5s. After: -- Details panel open, nothing else → 1 single-task fetch per event (deduped by `TaskFetchState`) +- Details panel open, no list surface open → 1 single-task fetch per event (deduped by `TaskFetchState`) - Management/convo list open → same list-fetch as today (throttled) -- Nothing open → 0 requests, dirty marks only - -No safety-net periodic poll for now. The websocket `Listener` already triggers an object refresh on reconnection (`listener.rs:232`), which covers the main failure mode (disconnect). If we see data gaps from silently dropped messages, we can add a slow poll later. +- No list surface or open tab → 0 requests, dirty timestamp only For a team of 10 running 5 agents with 4-5 state changes each: before = ~250 list-fetches across team in 5 min. After = ~0 list-fetches for users not looking at views, plus ~1 single-task fetch per user per agent they have open. ## Testing and validation **Unit tests** in `agent_conversations_model_tests.rs`: -- RTC event with `task_id` when details panel has that task open → `get_or_async_fetch_task_data` called -- RTC event when no views open → task_id added to `dirty_task_ids`, no fetch -- `register_view_open` with dirty tasks → each dirty task fetched via single-task endpoint -- Dirty set overflow (>200) → cleared, list-fetch on next open +- RTC event with `task_id` when details panel has that task open and no list surface is open → targeted task refresh path used +- RTC event when no list surface or open tab is present → earliest dirty timestamp recorded +- `register_view_open` with `dirty_since` set → one `fetch_tasks_updated_after` call **Manual verification**: - Add `log::info!("[lili] ...")` in `handle_update_manager_event` to count fetches before/after From 506b2de6bbdc1a0824894eca50a1137d1a2ce67a Mon Sep 17 00:00:00 2001 From: liliwilson Date: Thu, 21 May 2026 16:36:16 -0700 Subject: [PATCH 5/5] Respond to Ben review. --- app/src/ai/agent_conversations_model.rs | 2 ++ app/src/server/cloud_objects/update_manager.rs | 12 +++++++++--- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/app/src/ai/agent_conversations_model.rs b/app/src/ai/agent_conversations_model.rs index 53d7138905..a057ebe0e0 100644 --- a/app/src/ai/agent_conversations_model.rs +++ b/app/src/ai/agent_conversations_model.rs @@ -777,6 +777,8 @@ impl AgentConversationsModel { // Subtract 1 second to give buffer for clock differences with server let updated_after = timestamp - chrono::Duration::seconds(1); + // Reset `dirty_since` now that we are doing a fetch. + self.dirty_since = None; ctx.spawn_with_retry_on_error( move || { diff --git a/app/src/server/cloud_objects/update_manager.rs b/app/src/server/cloud_objects/update_manager.rs index 033a35a3b4..9f13f7d8c3 100644 --- a/app/src/server/cloud_objects/update_manager.rs +++ b/app/src/server/cloud_objects/update_manager.rs @@ -10,6 +10,7 @@ use std::sync::mpsc::SyncSender; use std::sync::Arc; use std::time::Duration; use warp_core::features::FeatureFlag; +use warp_core::report_error; use warp_graphql::mcp_gallery_template::MCPGalleryTemplate; use warp_graphql::object_permissions::AccessLevel; use warp_graphql::scalars::time::ServerTimestamp; @@ -1123,9 +1124,14 @@ impl UpdateManager { timestamp: DateTime, ctx: &mut ModelContext, ) { - let Ok(task_id) = task_id.parse::() else { - log::warn!("Ignoring AmbientTaskUpdated with unparseable task_id: {task_id}"); - return; + let task_id = match task_id.parse::() { + Ok(task_id) => task_id, + Err(err) => { + report_error!(anyhow::Error::from(err).context(format!( + "AmbientTaskUpdated has unparseable task_id: {task_id}" + ))); + return; + } }; ctx.emit(UpdateManagerEvent::AmbientTaskUpdated { task_id, timestamp }); }