-
Notifications
You must be signed in to change notification settings - Fork 3k
fix(realtime): sync remote items to local chat_ctx with placeholders to prevent in-flight deletion #5114
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix(realtime): sync remote items to local chat_ctx with placeholders to prevent in-flight deletion #5114
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -564,6 +564,7 @@ async def _list_mcp_tools_task( | |
| self._on_input_audio_transcription_completed, | ||
| ) | ||
| self._rt_session.on("metrics_collected", self._on_metrics_collected) | ||
| self._rt_session.on("remote_item_added", self._on_remote_item_added) | ||
| self._rt_session.on("error", self._on_error) | ||
|
|
||
| remove_instructions(self._agent._chat_ctx) | ||
|
|
@@ -736,6 +737,7 @@ async def _close_session(self) -> None: | |
| self._on_input_audio_transcription_completed, | ||
| ) | ||
| self._rt_session.off("metrics_collected", self._on_metrics_collected) | ||
| self._rt_session.off("remote_item_added", self._on_remote_item_added) | ||
| self._rt_session.off("error", self._on_error) | ||
|
|
||
| if isinstance(self.stt, stt.STT): | ||
|
|
@@ -1161,6 +1163,20 @@ def _on_metrics_collected( | |
| trace_utils.record_realtime_metrics(realtime_span, ev) | ||
| self._session.emit("metrics_collected", MetricsCollectedEvent(metrics=ev)) | ||
|
|
||
| def _on_remote_item_added(self, ev: llm.RemoteItemAddedEvent) -> None: | ||
| # add the remote item to the local chat context as a placeholder | ||
| local_chat_ctx = self._agent._chat_ctx | ||
| if local_chat_ctx.get_by_id(ev.item.id) is not None: | ||
| return | ||
|
|
||
| # only add placeholders for server-initiated items (responses, function calls), | ||
| # which always append at the end of the conversation. client-initiated items | ||
| # (from update_chat_ctx) already exist in _agent._chat_ctx and go local→remote, | ||
| # so they don't need placeholders. | ||
| last_item_id = local_chat_ctx.items[-1].id if local_chat_ctx.items else None | ||
| if ev.previous_item_id is None or ev.previous_item_id == last_item_id: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. QQ: would parallel tool calls have the same
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gpt-realtime can do "parallel" tool calls by doing multiple tool calls concurrently on one turn, but they have a sequential order on arrival from what I understand. When I saw this line, I have to admit it stood out as a risk, but I can't think of a realistic scenario that causes an issue. I thought we could be on the safe side, and log a warning in the else condition, just to catch if there is anything we have missed.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yeah, I think the function calls will come in sequential with different previous item id, each class ConversationItemAdded(BaseModel):
"""Sent by the server when an Item is added to the default Conversation.
This can happen in several cases:
- When the client sends a `conversation.item.create` event.
- When the input audio buffer is committed. In this case the item will be a user message containing the audio from the buffer.
- When the model is generating a Response. In this case the `conversation.item.added` event will be sent when the model starts generating a specific Item, and thus it will not yet have any content (and `status` will be `in_progress`).
The event will include the full content of the Item (except when model is generating a Response) except for audio data, which can be retrieved separately with a `conversation.item.retrieve` event if necessary.
"""
event_id: str
"""The unique ID of the server event."""
item: ConversationItem
"""A single item within a Realtime conversation."""
type: Literal["conversation.item.added"]
"""The event type, must be `conversation.item.added`."""
previous_item_id: Optional[str] = None
"""The ID of the item that precedes this one, if any.
This is used to maintain ordering when items are inserted.
"""
ConversationItem: TypeAlias = Annotated[
Union[
RealtimeConversationItemSystemMessage,
RealtimeConversationItemUserMessage,
RealtimeConversationItemAssistantMessage,
RealtimeConversationItemFunctionCall,
RealtimeConversationItemFunctionCallOutput,
RealtimeMcpApprovalResponse,
RealtimeMcpListTools,
RealtimeMcpToolCall,
RealtimeMcpApprovalRequest,
],
PropertyInfo(discriminator="type"),
] |
||
| local_chat_ctx.items.append(ev.item.model_copy()) | ||
|
Comment on lines
+1166
to
+1178
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
🔴 _on_remote_item_added leaves stale empty placeholder messages in the local chat context when the server creates an assistant message item. [comment truncated in page capture]
Prompt for agents
Was this helpful? React with 👍 or 👎 to provide feedback.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The server still has that item, so the next server-initiated item would have previous_item_id pointing to this message, which wouldn't match last_item_id in _on_remote_item_added, and the next placeholder would be silently dropped. So maybe we should allow empty items for the realtime session.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think an empty message is fine? If it is interrupted without any text forwarded, or is an audio-only message, the remote message should be "empty" as well, right? |
||
|
|
||
| def _on_error( | ||
| self, error: llm.LLMError | stt.STTError | tts.TTSError | llm.RealtimeModelError | ||
| ) -> None: | ||
|
|
@@ -1210,7 +1226,7 @@ def _on_input_audio_transcription_completed(self, ev: llm.InputTranscriptionComp | |
| # TODO: for realtime models, the created_at field is off. it should be set to when the user started speaking. | ||
| # but we don't have that information here. | ||
| msg = llm.ChatMessage(role="user", content=[ev.transcript], id=ev.item_id) | ||
| self._agent._chat_ctx.items.append(msg) | ||
| self._agent._chat_ctx._upsert_item(msg) | ||
| self._session._conversation_item_added(msg) | ||
|
|
||
| def _on_generation_created(self, ev: llm.GenerationCreatedEvent) -> None: | ||
|
|
@@ -2328,7 +2344,7 @@ async def _realtime_reply_task( | |
| chat_ctx = self._rt_session.chat_ctx.copy() | ||
| msg = chat_ctx.add_message(role="user", content=user_input) | ||
| await self._rt_session.update_chat_ctx(chat_ctx) | ||
| self._agent._chat_ctx.items.append(msg) | ||
| self._agent._chat_ctx._upsert_item(msg) | ||
| self._session._conversation_item_added(msg) | ||
|
|
||
| ori_tool_choice = self._tool_choice | ||
|
|
@@ -2587,7 +2603,7 @@ async def _read_fnc_stream() -> None: | |
|
|
||
| def _tool_execution_started_cb(fnc_call: llm.FunctionCall) -> None: | ||
| speech_handle._item_added([fnc_call]) | ||
| self._agent._chat_ctx.items.append(fnc_call) | ||
| self._agent._chat_ctx._upsert_item(fnc_call) | ||
| self._session._tool_items_added([fnc_call]) | ||
|
|
||
| def _tool_execution_completed_cb(out: ToolExecutionOutput) -> None: | ||
|
|
@@ -2689,7 +2705,7 @@ def _create_assistant_message( | |
| forwarded_text=forwarded_text, | ||
| interrupted=speech_handle.interrupted, | ||
| ) | ||
| self._agent._chat_ctx.items.append(msg) | ||
| self._agent._chat_ctx._upsert_item(msg) | ||
| speech_handle._item_added([msg]) | ||
| self._session._conversation_item_added(msg) | ||
| current_span.set_attribute(trace_types.ATTR_RESPONSE_TEXT, forwarded_text) | ||
|
|
@@ -2741,7 +2757,7 @@ def _create_assistant_message( | |
| fnc_executed_ev._reply_required = True | ||
|
|
||
| # add tool output to the chat context | ||
| self._agent._chat_ctx.items.append(sanitized_out.fnc_call_out) | ||
| self._agent._chat_ctx._upsert_item(sanitized_out.fnc_call_out) | ||
| self._session._tool_items_added([sanitized_out.fnc_call_out]) | ||
|
|
||
| if new_agent_task is not None and sanitized_out.agent_task is not None: | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.