Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 60 additions & 11 deletions src/google/adk/flows/llm_flows/contents.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@

logger = logging.getLogger('google_adk.' + __name__)

# Error response for orphaned function calls (issue #3971)
_ORPHANED_CALL_ERROR_RESPONSE = {'error': 'Tool execution was interrupted.'}


class _ContentLlmRequestProcessor(BaseLlmRequestProcessor):
"""Builds the contents for the LLM request."""
Expand Down Expand Up @@ -76,10 +79,44 @@ async def run_async(
request_processor = _ContentLlmRequestProcessor()


def _create_synthetic_response_for_orphaned_calls(
event: Event,
orphaned_calls: list[types.FunctionCall],
) -> Event:
"""Create synthetic error responses for orphaned function calls."""
error_response = _ORPHANED_CALL_ERROR_RESPONSE
parts: list[types.Part] = []

for func_call in orphaned_calls:
logger.warning(
'Auto-healing orphaned function_call (id=%s, name=%s). '
'This indicates execution was interrupted before tool completion.',
func_call.id,
func_call.name,
)
part = types.Part.from_function_response(
name=func_call.name,
response=error_response,
)
part.function_response.id = func_call.id
parts.append(part)

return Event(
invocation_id=event.invocation_id,
author='user',
content=types.Content(role='user', parts=parts),
branch=event.branch,
)


def _rearrange_events_for_async_function_responses_in_history(
events: list[Event],
) -> list[Event]:
"""Rearrange the async function_response events in the history."""
"""Rearrange the async function_response events in the history.

Also auto-heals orphaned function_calls by injecting synthetic error
responses to prevent crash loops (issue #3971).
"""

function_call_id_to_response_events_index: dict[str, int] = {}
for i, event in enumerate(events):
Expand All @@ -95,26 +132,37 @@ def _rearrange_events_for_async_function_responses_in_history(
# function_response should be handled together with function_call below.
continue
elif event.get_function_calls():

function_response_events_indices = set()
orphaned_calls: list[types.FunctionCall] = []
for function_call in event.get_function_calls():
function_call_id = function_call.id
if function_call_id in function_call_id_to_response_events_index:
function_response_events_indices.add(
function_call_id_to_response_events_index[function_call_id]
)
elif function_call_id and not (
event.long_running_tool_ids
and function_call_id in event.long_running_tool_ids
):
orphaned_calls.append(function_call)
result_events.append(event)
if not function_response_events_indices:
if not function_response_events_indices and not orphaned_calls:
continue
if len(function_response_events_indices) == 1:
result_events.append(
events[next(iter(function_response_events_indices))]
)
else: # Merge all async function_response as one response event
if function_response_events_indices:
if len(function_response_events_indices) == 1:
result_events.append(
events[next(iter(function_response_events_indices))]
)
else: # Merge all async function_response as one response event
result_events.append(
_merge_function_response_events(
[events[i] for i in sorted(function_response_events_indices)]
)
)
Comment on lines +152 to +161
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _merge_function_response_events function is capable of handling a list with a single event, which makes the special case for len(function_response_events_indices) == 1 redundant. You can simplify this block by removing the if/else and just using the logic from the else branch for all cases where function_response_events_indices is not empty. This improves code maintainability by reducing complexity.

Suggested change
if len(function_response_events_indices) == 1:
result_events.append(
events[next(iter(function_response_events_indices))]
)
else: # Merge all async function_response as one response event
result_events.append(
_merge_function_response_events(
[events[i] for i in sorted(function_response_events_indices)]
)
)
result_events.append(
_merge_function_response_events(
[events[i] for i in sorted(function_response_events_indices)]
)
)

# Inject synthetic responses for orphaned calls (issue #3971)
if orphaned_calls:
result_events.append(
_merge_function_response_events(
[events[i] for i in sorted(function_response_events_indices)]
)
_create_synthetic_response_for_orphaned_calls(event, orphaned_calls)
)
continue
else:
Expand Down Expand Up @@ -441,6 +489,7 @@ def _get_contents(
result_events = _rearrange_events_for_latest_function_response(
filtered_events
)
# Auto-heal orphaned function_calls to prevent crash loop (issue #3971)
result_events = _rearrange_events_for_async_function_responses_in_history(
result_events
)
Expand Down
Loading