Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 66 additions & 23 deletions app/src/ai/agent_conversations_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,9 @@ pub struct AgentConversationsModel {
/// and are absent from this map.
task_fetch_state: HashMap<AmbientAgentTaskId, TaskFetchState>,
rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState,
/// Earliest RTC timestamp received while no list surface was open.
/// On next `register_view_open`, triggers a single `fetch_tasks_updated_after`.
dirty_since: Option<DateTime<Utc>>,
}

pub enum AgentConversationsModelEvent {
Expand Down Expand Up @@ -575,6 +578,7 @@ impl AgentConversationsModel {
has_finished_initial_load: true,
task_fetch_state: HashMap::new(),
rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(),
dirty_since: None,
};
}

Expand Down Expand Up @@ -613,6 +617,7 @@ impl AgentConversationsModel {
has_finished_initial_load: false,
task_fetch_state: HashMap::new(),
rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(),
dirty_since: None,
};

// Only sync local conversations if we're not in CLI mode. Server-side data
Expand Down Expand Up @@ -678,23 +683,51 @@ impl AgentConversationsModel {
event: &UpdateManagerEvent,
ctx: &mut ModelContext<Self>,
) {
if let UpdateManagerEvent::AmbientTaskUpdated { timestamp } = event {
match std::mem::take(&mut self.rtc_task_refresh_throttle_state) {
RtcTaskRefreshThrottleState::Idle => {
self.fetch_tasks_updated_after(*timestamp, ctx);
self.start_rtc_task_refresh_throttle_timer(ctx);
}
RtcTaskRefreshThrottleState::CoolingDown {
mut pending_timestamp,
let UpdateManagerEvent::AmbientTaskUpdated { task_id, timestamp } = event else {
return;
};

let has_list_consumers = self
.active_data_consumers_per_window
.values()
.any(|views| !views.is_empty());
if has_list_consumers {
// (a) If management view or conversation list is open, throttled list-fetch.
self.handle_rtc_for_list_views(*timestamp, ctx);
} else {
let has_open_tab = ActiveAgentViewsModel::as_ref(ctx)
.get_terminal_view_id_for_ambient_task(*task_id)
.is_some();
if has_open_tab {
// (b) If this task has an open tab (any window), force a re-fetch.
self.async_fetch_task(task_id, ctx);
} else {
// (c) No list surface open: record earliest timestamp for flush on next view open.
record_earliest_rtc_task_refresh_timestamp(&mut self.dirty_since, *timestamp);
}
}
}

// Handle RTC invalidations for list views, respecting the refresh throttling.
fn handle_rtc_for_list_views(
&mut self,
timestamp: DateTime<Utc>,
ctx: &mut ModelContext<Self>,
) {
match std::mem::take(&mut self.rtc_task_refresh_throttle_state) {
RtcTaskRefreshThrottleState::Idle => {
self.fetch_tasks_updated_after(timestamp, ctx);
self.start_rtc_task_refresh_throttle_timer(ctx);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [IMPORTANT] This timer can outlive the list consumers: if a pending timestamp is recorded while a list view is open and the user closes all list views before the cooldown fires, start_rtc_task_refresh_throttle_timer will still run fetch_tasks_updated_after, causing the broad refresh this PR is trying to avoid. Abort the RTC throttle when the last list consumer closes or re-check has_list_consumers before flushing the pending timestamp.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems fine? A few unnecessary refreshes if someone happens to toggle the management view shouldn't add that much traffic

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I agree with you, I think this is fine

}
RtcTaskRefreshThrottleState::CoolingDown {
mut pending_timestamp,
timer_abort_handle,
} => {
record_earliest_rtc_task_refresh_timestamp(&mut pending_timestamp, timestamp);
self.rtc_task_refresh_throttle_state = RtcTaskRefreshThrottleState::CoolingDown {
pending_timestamp,
timer_abort_handle,
} => {
record_earliest_rtc_task_refresh_timestamp(&mut pending_timestamp, *timestamp);
self.rtc_task_refresh_throttle_state =
RtcTaskRefreshThrottleState::CoolingDown {
pending_timestamp,
timer_abort_handle,
};
}
};
}
}
}
Expand Down Expand Up @@ -744,6 +777,8 @@ impl AgentConversationsModel {

// Subtract 1 second to give buffer for clock differences with server
let updated_after = timestamp - chrono::Duration::seconds(1);
// Reset `dirty_since` now that we are doing a fetch.
self.dirty_since = None;

ctx.spawn_with_retry_on_error(
move || {
Expand Down Expand Up @@ -945,6 +980,11 @@ impl AgentConversationsModel {
.or_default()
.insert(view_id);
self.update_polling_state(ctx);

// Flush dirty tasks accumulated while no list surface was open.
if let Some(dirty_since) = self.dirty_since.take() {
self.fetch_tasks_updated_after(dirty_since, ctx);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ [IMPORTANT] This catch-up can miss updates: fetch_tasks_updated_after requests only INITIAL_TASK_AMOUNT tasks, so if more than 100 tasks changed while no view was open, reopening a list view drops the rest while RTC polling remains disabled. Track dirty task IDs or page until all updates are loaded before clearing dirty_since.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is fine, since the management views already behave this way—we fetch more things when you apply a filter, but some tasks are genuinely inaccessible via this view because they are too old

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, the view should always be showing the most recent 100 tasks that match the filters - we're not trying to show all tasks

}
}

/// Called when a view that consumes this model's data becomes hidden.
Expand Down Expand Up @@ -1529,20 +1569,24 @@ impl AgentConversationsModel {
return Some(task.clone());
}

// Consult the per-task fetch state. The three variants are mutually exclusive: at most
// one applies to a given id.
self.async_fetch_task(task_id, ctx);
None
}

/// Consult fetch-state guards and spawn a fetch if allowed.
fn async_fetch_task(&mut self, task_id: &AmbientAgentTaskId, ctx: &mut ModelContext<Self>) {
match self.task_fetch_state.get(task_id) {
Some(TaskFetchState::InFlight) => return None,
Some(TaskFetchState::InFlight) => return,
Some(TaskFetchState::PermanentlyFailed { at, .. }) => {
if at.elapsed() < PERMANENT_FETCH_FAILURE_COOLDOWN {
return None;
return;
}
// Cooldown has elapsed; clear the entry and fall through to fetch again.
self.task_fetch_state.remove(task_id);
}
Some(TaskFetchState::TransientlyFailed { at, .. }) => {
if at.elapsed() < TRANSIENT_FETCH_FAILURE_COOLDOWN {
return None;
return;
}
self.task_fetch_state.remove(task_id);
}
Expand Down Expand Up @@ -1602,8 +1646,6 @@ impl AgentConversationsModel {
}
},
);

None
}

/// Returns all (name, uid) pairs for creators of tasks in the model.
Expand Down Expand Up @@ -1830,6 +1872,7 @@ impl AgentConversationsModel {
self.abort_rtc_task_refresh_throttle();
self.active_data_consumers_per_window.clear();
self.task_fetch_state.clear();
self.dirty_since = None;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should fetch_tasks_updated_after also clear dirty_since? Once we do a list refresh, either:

  • The updated but unfetched task(s) will be included in the list response
  • They'll be old enough to fall out of the view

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We always take dirty_since whenever we open a consumer view (in register_view_open), and only set it again when a consumer view is closed. I feel like that should be okay? Although if you think it's semantically clearer/safer to do so I can update

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah this seems fine then. I think it's a little safer to also clear in fetch_tasks_updated_after, in case we introduce another flow that does a full refresh (or there's a bug where register_view_open doesn't get called)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fair enough, I added it!

// Reset the initial load flag so that we can retry the initial sync with the new logged in user
self.has_finished_initial_load = false;
}
Expand Down
1 change: 1 addition & 0 deletions app/src/ai/agent_conversations_model_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,7 @@ fn create_test_model() -> AgentConversationsModel {
has_finished_initial_load: false,
task_fetch_state: Default::default(),
rtc_task_refresh_throttle_state: RtcTaskRefreshThrottleState::default(),
dirty_since: None,
}
}

Expand Down
43 changes: 31 additions & 12 deletions app/src/server/cloud_objects/update_manager.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::mpsc::SyncSender;
use std::sync::Arc;
use std::time::Duration;

use chrono::{DateTime, Utc};
use futures::channel::oneshot::{self, Receiver};
use futures::stream::AbortHandle;
use itertools::Itertools;
use lazy_static::lazy_static;
use regex::Regex;
use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::sync::mpsc::SyncSender;
use std::sync::Arc;
use std::time::Duration;
use warp_core::features::FeatureFlag;
use warp_core::report_error;
use warp_graphql::mcp_gallery_template::MCPGalleryTemplate;
use warp_graphql::object_permissions::AccessLevel;
use warp_graphql::scalars::time::ServerTimestamp;
Expand All @@ -26,6 +26,7 @@ use crate::ai::agent::conversation::AIConversationId;
use crate::ai::ambient_agents::scheduled::{
CloudScheduledAmbientAgentModel, ScheduledAmbientAgent,
};
use crate::ai::ambient_agents::AmbientAgentTaskId;
use crate::ai::blocklist::BlocklistAIHistoryModel;
use crate::ai::cloud_environments::{AmbientAgentEnvironment, CloudAmbientAgentEnvironmentModel};
use crate::ai::execution_profiles::profiles::AIExecutionProfilesModel;
Expand Down Expand Up @@ -127,10 +128,19 @@ pub struct ObjectOperationResult {

#[derive(Debug)]
pub enum UpdateManagerEvent {
ObjectOperationComplete { result: ObjectOperationResult },
CloudPreferencesUpdated { updated: Vec<Preference> },
MCPGalleryUpdated { templates: Vec<MCPGalleryTemplate> },
AmbientTaskUpdated { timestamp: DateTime<Utc> },
ObjectOperationComplete {
result: ObjectOperationResult,
},
CloudPreferencesUpdated {
updated: Vec<Preference>,
},
MCPGalleryUpdated {
templates: Vec<MCPGalleryTemplate>,
},
AmbientTaskUpdated {
task_id: AmbientAgentTaskId,
timestamp: DateTime<Utc>,
},
}

/// An enum for choosing the behavior of the fetch_single_cloud_object function.
Expand Down Expand Up @@ -1110,11 +1120,20 @@ impl UpdateManager {

fn handle_ambient_task_changed(
&mut self,
_task_id: String,
task_id: String,
timestamp: DateTime<Utc>,
ctx: &mut ModelContext<UpdateManager>,
) {
ctx.emit(UpdateManagerEvent::AmbientTaskUpdated { timestamp });
let task_id = match task_id.parse::<AmbientAgentTaskId>() {
Ok(task_id) => task_id,
Err(err) => {
report_error!(anyhow::Error::from(err).context(format!(
"AmbientTaskUpdated has unparseable task_id: {task_id}"
)));
return;
}
};
ctx.emit(UpdateManagerEvent::AmbientTaskUpdated { task_id, timestamp });
}

/// Fetches environment "last used" timestamps from the server and merges them
Expand Down
Loading
Loading