Skip to content

Comments

[RHIDP-11647] Add Interrupt POST Endpoint for /v1/streaming_query#1176

Open
Jdubrick wants to merge 4 commits intolightspeed-core:mainfrom
Jdubrick:query-interrupt
Open

[RHIDP-11647] Add Interrupt POST Endpoint for /v1/streaming_query#1176
Jdubrick wants to merge 4 commits intolightspeed-core:mainfrom
Jdubrick:query-interrupt

Conversation

@Jdubrick
Copy link
Contributor

@Jdubrick Jdubrick commented Feb 18, 2026

Description

  • Adds /streaming_query/interrupt so that users are able to stop an in-flight streaming request
  • Streaming queries get a generated 'request_id' attached to them that you can use to trigger the interrupt if the user owns the request

Type of change

  • Refactor
  • New feature
  • Bug fix
  • CVE fix
  • Optimization
  • Documentation Update
  • Configuration Update
  • Bump-up service version
  • Bump-up dependent library
  • Bump-up library or tool used for development (does not change the final image)
  • CI configuration change
  • Konflux configuration change
  • Unit tests improvement
  • Integration tests improvement
  • End to end tests improvement
  • Benchmarks improvement

Tools used to create PR

Identify any AI code assistants used in this PR (for transparency and review context)

  • Assisted-by: Claude Opus 4.6
  • Generated by:

Related Tickets & Documents

Checklist before requesting a review

  • I have performed a self-review of my code.
  • PR has passed all pre-merge test jobs.
  • If it is a core feature, I have added thorough tests.

Testing

  • Please provide detailed steps to perform tests related to this code change.
  • How were the fix/results from this change verified? Please provide relevant screenshots or results.

Summary by CodeRabbit

  • New Features

    • POST /v1/streaming_query/interrupt to stop in-progress streams.
    • Streaming start events now include a request_id.
    • Interrupt responses include request_id, interrupted (bool), and a human-readable message.
  • Bug Fixes / Behavioral

    • Cancelled streams emit interruption events, are deregistered, and skip post-stream side effects.
  • Documentation & Tests

    • OpenAPI updated (examples include request_id) and new integration/unit tests for interrupt lifecycle.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Feb 18, 2026

Walkthrough

Adds an in-memory interrupt registry, a POST /v1/streaming_query/interrupt endpoint and models, and streaming-flow wiring to register per-request asyncio tasks, allow cancellation by request_id/user, emit interrupted SSE events, and update OpenAPI, models, routers, and tests.

Changes

Cohort / File(s) Summary
Interrupt Registry
src/utils/stream_interrupts.py
New thread-safe StreamInterruptRegistry and ActiveStream dataclass, module-level singleton and DI getter; register/cancel/deregister/get methods with ownership checks and task cancellation.
Interrupt Endpoint
src/app/endpoints/stream_interrupt.py
New APIRouter and POST /streaming_query/interrupt handler wired with auth and registry DI; validates ownership, cancels stream, returns StreamingInterruptResponse or 404.
Streaming Integration
src/app/endpoints/streaming_query.py
Per-request request_id generated and registered with registry; start SSE now includes request_id; task registration/deregistration added; handles CancelledError to emit interrupted SSE and skip post-stream side effects; adds stream_interrupted_event.
Models
src/models/requests.py, src/models/responses.py
Adds StreamingInterruptRequest (validated request_id) and StreamingInterruptResponse (request_id, interrupted, message); updates streaming SSE examples to include request_id.
Routing & OpenAPI
src/app/routers.py, docs/openapi.json
Registers new router under /v1; OpenAPI updated with POST /v1/streaming_query/interrupt, new schemas/responses, SSE examples updated; two A2A operationId tweaks.
Tests — Unit & Integration
tests/unit/.../test_stream_interrupt.py, tests/unit/.../test_streaming_query.py, tests/integration/.../test_stream_interrupt_integration.py, tests/integration/test_openapi_json.py, tests/unit/app/test_routers.py
Adds unit/integration tests for interrupt lifecycle, ownership and cancellation behavior; updates streaming tests to pass and assert request_id, registration, and deregistration.

Sequence Diagram

sequenceDiagram
    participant Client as Client
    participant StreamEndpoint as "Streaming Endpoint\n(/v1/streaming_query)"
    participant Registry as "Interrupt Registry\n(stream_interrupt_registry)"
    participant StreamTask as "Active Stream Task\n(asyncio.Task)"
    participant InterruptEndpoint as "Interrupt Endpoint\n(/v1/streaming_query/interrupt)"

    Client->>StreamEndpoint: POST /v1/streaming_query
    activate StreamEndpoint
    StreamEndpoint->>StreamEndpoint: generate request_id, create task
    StreamEndpoint->>Registry: register_stream(request_id, user_id, task)
    StreamEndpoint-->>Client: SSE start event (includes request_id)
    StreamTask->>Client: stream events...

    Client->>InterruptEndpoint: POST /v1/streaming_query/interrupt (request_id)
    activate InterruptEndpoint
    InterruptEndpoint->>Registry: cancel_stream(request_id, user_id)
    Registry->>StreamTask: cancel task
    StreamTask-->>StreamEndpoint: raises CancelledError
    StreamEndpoint-->>Client: SSE interrupted event
    StreamEndpoint->>Registry: deregister_stream(request_id)
    InterruptEndpoint-->>Client: 200 {interrupted:true, request_id, message}
    deactivate InterruptEndpoint
    deactivate StreamEndpoint
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested labels

ok-to-test

Suggested reviewers

  • eranco74
  • tisnik
  • are-ces
  • jrobertboos
🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title clearly and accurately describes the main change: adding a new interrupt POST endpoint for the streaming query path. It is concise, specific, and directly reflects the primary purpose of the pull request.
Docstring Coverage ✅ Passed Docstring coverage is 88.57% which is sufficient. The required threshold is 80.00%.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/app/endpoints/streaming_query.py (1)

828-829: ⚠️ Potential issue | 🟡 Minor

Stale docstring in shield_violation_generator — it no longer yields start or end events.

The function body yields only a single token event. The start event is now emitted by generate_response before iterating any generator, and the end event is emitted by generate_response after completion.

🔧 Proposed fix
-    Yields start, token, and end events immediately for shield violations.
-    This function creates a minimal streaming response without going through
-    the Llama Stack response format.
+    Yields a single token event containing the violation message.
+    Start and end events are emitted by the caller (``generate_response``).
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/endpoints/streaming_query.py` around lines 828 - 829, The docstring
for shield_violation_generator is stale: it claims the generator yields start,
token, and end events but the implementation yields only a single token event;
update the docstring in the shield_violation_generator function to reflect the
current behavior (it yields a single token/event representing the violation, not
start or end events) and mention that generate_response now emits the start and
end events; reference the function name shield_violation_generator and update
wording to describe the single token payload and any fields it contains (e.g.,
token content and reason).
docs/openapi.json (1)

4414-4440: ⚠️ Potential issue | 🟠 Major

Make operationId unique between GET and POST.

Both /a2a GET and POST now share handle_a2a_jsonrpc_a2a_get, which violates OpenAPI uniqueness and breaks client generation. Use distinct IDs per method (e.g., restore _post for POST).

✅ Suggested fix
-                "operationId": "handle_a2a_jsonrpc_a2a_get",
+                "operationId": "handle_a2a_jsonrpc_a2a_post",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/openapi.json` around lines 4414 - 4440, The OpenAPI spec currently
reuses the same operationId "handle_a2a_jsonrpc_a2a_get" for both the GET and
POST A2A endpoints; change the POST operationId to a unique identifier (for
example "handle_a2a_jsonrpc_a2a_post") so GET and POST have distinct
operationIds and client generation won't fail—update the "operationId" field
under the POST operation accordingly.
tests/unit/app/test_routers.py (1)

133-168: ⚠️ Potential issue | 🟡 Minor

Update the docstring count to match the new assertions.
The docstring still says 16 routers while the test now asserts 20.

📝 Suggested docstring fix
-    Asserts that 16 routers are registered on a MockFastAPI instance and that
+    Asserts that 20 routers are registered on a MockFastAPI instance and that
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/app/test_routers.py` around lines 133 - 168, The
test_check_prefixes docstring is out of date (it says 16 routers) — update the
docstring in the test_check_prefixes function to reflect the current assertions
(20 routers) and, if present, adjust any explanatory text about which routers
have which prefixes so it matches the actual checks performed for
include_routers against the MockFastAPI instance (references:
test_check_prefixes, include_routers, MockFastAPI, and the router symbols like
conversations_v2.router/conversations_v1.router).
🧹 Nitpick comments (7)
src/utils/stream_interrupts.py (2)

17-17: asyncio.Task is missing a type parameter.

asyncio.Task is generic since Python 3.9; prefer asyncio.Task[Any] for completeness.

🔧 Proposed fix
+from typing import Any

 `@dataclass`
 class ActiveStream:
     user_id: str
-    task: asyncio.Task
+    task: asyncio.Task[Any]

Apply the same change to the register_stream signature:

-    def register_stream(self, request_id: str, user_id: str, task: asyncio.Task) -> None:
+    def register_stream(self, request_id: str, user_id: str, task: asyncio.Task[Any]) -> None:

As per coding guidelines: "Use complete type annotations for all function parameters and return types."

Also applies to: 29-29

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/stream_interrupts.py` at line 17, The task type annotations use the
bare generic asyncio.Task; update them to asyncio.Task[Any] and add Any to
imports (from typing import Any) so type checks are complete; specifically
change the parameter/type annotations for the task variable and the
register_stream function signature in stream_interrupts.py from asyncio.Task to
asyncio.Task[Any], and ensure the top-level imports include Any so the code
compiles with the updated annotations.

28-69: Method docstrings are missing Google-style Args sections.

None of the four public methods document their parameters.

As per coding guidelines: "Follow Google Python docstring conventions for modules, classes, and functions with Parameters, Returns, Raises, and Attributes sections."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/stream_interrupts.py` around lines 28 - 69, Add Google-style
docstring parameter sections to the four public methods in this class:
register_stream, cancel_stream, deregister_stream, and get_stream. For each
method include an "Args:" block that documents each parameter (request_id: str,
user_id: str where present, task: asyncio.Task where present) and for
cancel_stream and get_stream also add a "Returns:" block describing the return
value (bool for cancel_stream, Optional[ActiveStream] for get_stream). Keep
wording brief and follow the existing one-line summary then the Args/Returns
headers per Google Python docstring conventions.
src/app/endpoints/streaming_query.py (1)

87-88: stream_interrupt_registry singleton used directly in generate_response, breaking test overrides.

The interrupt endpoint correctly injects the registry via get_stream_interrupt_registry() (allowing app.dependency_overrides in tests), but generate_response imports and calls the module-level singleton directly. Integration tests for the streaming path cannot substitute a mock registry without patching the module.

♻️ Proposed refactor — thread the registry through
 async def generate_response(
     generator: AsyncIterator[str],
     context: ResponseGeneratorContext,
     responses_params: ResponsesApiParams,
     turn_summary: TurnSummary,
     request_id: str,
+    registry: StreamInterruptRegistry | None = None,
 ) -> AsyncIterator[str]:
     ...
+    if registry is None:
+        from utils.stream_interrupts import get_stream_interrupt_registry
+        registry = get_stream_interrupt_registry()
     current_task = asyncio.current_task()
     if current_task is not None:
-        stream_interrupt_registry.register_stream(...)
+        registry.register_stream(...)
     ...
     finally:
-        stream_interrupt_registry.deregister_stream(request_id)
+        registry.deregister_stream(request_id)

Then pass registry from the endpoint, obtained via Depends(get_stream_interrupt_registry).

Also applies to: 321-326, 347-355, 390-391

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/endpoints/streaming_query.py` around lines 87 - 88, generate_response
currently uses the module-level stream_interrupt_registry singleton (imported
from utils.stream_interrupts), which prevents test overrides; change
generate_response to accept a registry parameter (e.g., registry) and stop
referencing the module-level stream_interrupt_registry, then update the
streaming endpoint to inject the registry via
Depends(get_stream_interrupt_registry) and pass it into generate_response. Also
propagate this parameter through other call sites that use the singleton in this
file (the other invocations noted around the earlier blocks) so they call
generate_response (or the helper that invokes it) with the injected registry
instead of the module singleton.
tests/integration/test_openapi_json.py (1)

220-231: Consider adding 422 for consistency with other POST endpoints in this suite.
Both /v1/query and /v1/streaming_query include 422 in expected codes; keeping the interrupt endpoint consistent improves coverage expectations.

🔧 Suggested update
-            {"200", "401", "403", "404"},
+            {"200", "401", "403", "404", "422"},
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_openapi_json.py` around lines 220 - 231, The
"/v1/streaming_query/interrupt" POST expected status set is missing 422; update
the test expectations by adding "422" to the status code set for the route tuple
(" /v1/streaming_query/interrupt", "post", {...}) so it matches the other POST
endpoints (e.g., "/v1/query" and "/v1/streaming_query") and maintains consistent
coverage.
src/app/endpoints/stream_interrupt.py (3)

5-5: Add status to the FastAPI import per guidelines.

Request and status are prescribed in the import template. While Request is not used here, status should be imported and used explicitly for the status_code on @router.post.

🔧 Proposed fix
-from fastapi import APIRouter, Depends, HTTPException
+from fastapi import APIRouter, Depends, HTTPException, status

Then on the @router.post decorator:

 `@router.post`(
     "/streaming_query/interrupt",
+    status_code=status.HTTP_200_OK,
     responses=stream_interrupt_responses,
     summary="Streaming Query Interrupt Endpoint Handler",
 )

As per coding guidelines: "Import FastAPI components using: from fastapi import APIRouter, HTTPException, Request, status, Depends".

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/endpoints/stream_interrupt.py` at line 5, Import the FastAPI status
helper and use it as the explicit status_code in the route decorator: update the
import line to include status (from fastapi import APIRouter, Depends,
HTTPException, status) and modify the `@router.post` decorator in this module (the
APIRouter route for stream interruption) to include
status_code=status.HTTP_200_OK (or the appropriate status constant) instead of a
raw integer or implicit default.

48-48: Expand the docstring to follow Google Python conventions.

The one-liner is missing the mandatory Parameters, Returns, and Raises sections.

📝 Proposed fix
-    """Interrupt an in-progress streaming query by request identifier."""
+    """Interrupt an in-progress streaming query by request identifier.
+
+    Parameters:
+        interrupt_request (StreamingInterruptRequest): Payload containing the
+            request_id of the stream to cancel.
+        auth (AuthTuple): Resolved authentication tuple; the first element is
+            used as the owning user_id.
+        registry (StreamInterruptRegistry): Active stream registry used to
+            cancel the targeted task.
+
+    Returns:
+        StreamingInterruptResponse: Confirmation payload with interrupted=True.
+
+    Raises:
+        HTTPException: 404 when no active stream matching the request_id and
+            user_id is found (stream absent, already complete, or owned by
+            another user).
+    """

As per coding guidelines: "All functions must have complete docstrings with brief descriptions" and "Follow Google Python docstring conventions for modules, classes, and functions with Parameters, Returns, Raises, and Attributes sections."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/endpoints/stream_interrupt.py` at line 48, The one-line docstring in
stream_interrupt.py should be expanded to a full Google-style docstring for the
endpoint function (e.g., interrupt_stream or the function that "Interrupt an
in-progress streaming query by request identifier")—add a brief description plus
a Parameters section describing the request identifier arg (type and meaning), a
Returns section describing the response (type and structure), and a Raises
section listing possible exceptions (e.g., ValueError/KeyError or HTTPError) and
when they are raised; place this docstring immediately above the endpoint
handler function definition and follow Google Python conventions for formatting.

3-22: Add a module-level logger — required by guidelines and needed for observability.

The file has no logger = get_logger(__name__) and the handler emits no log entries. The registry already logs the wrong-user warning, but the endpoint layer logs nothing: successful cancellations and all 404 paths are entirely silent in endpoint traces.

🔧 Proposed fix
+from log import get_logger
 from models.config import Action
 from models.requests import StreamingInterruptRequest
 ...

+logger = get_logger(__name__)

 router = APIRouter(tags=["streaming_query_interrupt"])

Then inside the handler:

     interrupted = registry.cancel_stream(request_id, user_id)
     if not interrupted:
+        logger.debug("Stream interrupt not fulfilled for request_id=%s", request_id)
         response = NotFoundResponse(
             resource="streaming request",
             resource_id=request_id,
         )
         raise HTTPException(**response.model_dump())
+    logger.info("Stream interrupted: request_id=%s user_id=%s", request_id, user_id)
     return StreamingInterruptResponse(...)

As per coding guidelines: "Use logger = get_logger(__name__) from log.py for module logging" and "Use logger.debug() for detailed diagnostic information, logger.info() for general execution info."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/endpoints/stream_interrupt.py` around lines 3 - 22, Add a
module-level logger by importing get_logger from log.py and defining logger =
get_logger(__name__) at top of src/app/endpoints/stream_interrupt.py, then
update the request handler that uses StreamingInterruptRequest,
StreamInterruptRegistry/get_stream_interrupt_registry to emit logs: use
logger.info() when a stream interruption is successfully cancelled (include
stream id and user from the request), and use logger.debug() or logger.info()
when returning a 404 path (e.g., interrupt not found) to record the attempted
stream id and caller; keep existing registry warnings intact. Ensure you use the
module-level logger variable (logger) rather than print or other loggers.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/app/endpoints/streaming_query.py`:
- Around line 349-355: When asyncio.current_task() returns None the stream isn't
registered and interrupt requests silently fail; update the block around
current_task and stream_interrupt_registry.register_stream to log a warning when
current_task is None (include request_id and user_id in the message) so failures
are diagnosable, i.e., call logger.warning(...) describing that no current task
was found for the given request_id/user_id and that interrupt registration was
skipped.
- Around line 387-394: The except asyncio.CancelledError handler currently
swallows cancellation and only logs the event; update it to call task.uncancel()
on the current task and then re-raise the CancelledError after running cleanup
so asyncio cancellation semantics are preserved. Specifically, in the except
block around the streaming loop that logs "Streaming request %s interrupted by
user" and yields stream_interrupted_event(request_id), call
asyncio.current_task().uncancel() (or the appropriate task reference) before
performing any non-exceptional cleanup, ensure
stream_interrupt_registry.deregister_stream(request_id) still runs in finally,
and then re-raise the CancelledError so that outer timeouts/TaskGroup
cancellation behave correctly while keeping the existing if not
stream_completed: return behavior to skip post-stream side effects.

In `@src/models/requests.py`:
- Around line 270-293: Update the docstrings for the StreamingInterruptRequest
class and its validator check_request_id to follow Google style: add an
"Attributes" section to the class docstring describing request_id (type: str,
description: the active streaming request ID) and include example format; add a
full docstring for check_request_id that contains "Parameters" (value: str — the
request ID to validate), "Returns" (str — the validated request ID), and
"Raises" (ValueError — if suid.check_suid(value) returns False), referencing the
validator's call to suid.check_suid to validate format; ensure wording is brief
and consistent with existing project docstring style.

In `@src/utils/stream_interrupts.py`:
- Line 7: Replace all legacy Optional[ActiveStream] annotations with the modern
union syntax ActiveStream | None; specifically update the import/annotation at
the top that currently reads "from typing import Optional" and any function,
variable, or return type annotations referencing Optional[ActiveStream] (e.g.,
the ActiveStream-typed parameters/returns around the ActiveStream class and
usages on lines referenced) to use "ActiveStream | None" instead; ensure any
other typing hints in the same file (notably the occurrences mentioned around
lines 66-69) are similarly converted and remove the now-unused Optional import
if no longer required.
- Around line 4-9: Replace the stdlib logger instantiation with the project's
centralized logger: remove the `import logging` and `logger =
logging.getLogger(__name__)` usage and instead call `get_logger(__name__)` from
`log.py`; update the top of the module to import `get_logger` and set `logger =
get_logger(__name__)` (this affects the module-level `logger` symbol in this
file so any existing references remain unchanged).

---

Outside diff comments:
In `@docs/openapi.json`:
- Around line 4414-4440: The OpenAPI spec currently reuses the same operationId
"handle_a2a_jsonrpc_a2a_get" for both the GET and POST A2A endpoints; change the
POST operationId to a unique identifier (for example
"handle_a2a_jsonrpc_a2a_post") so GET and POST have distinct operationIds and
client generation won't fail—update the "operationId" field under the POST
operation accordingly.

In `@src/app/endpoints/streaming_query.py`:
- Around line 828-829: The docstring for shield_violation_generator is stale: it
claims the generator yields start, token, and end events but the implementation
yields only a single token event; update the docstring in the
shield_violation_generator function to reflect the current behavior (it yields a
single token/event representing the violation, not start or end events) and
mention that generate_response now emits the start and end events; reference the
function name shield_violation_generator and update wording to describe the
single token payload and any fields it contains (e.g., token content and
reason).

In `@tests/unit/app/test_routers.py`:
- Around line 133-168: The test_check_prefixes docstring is out of date (it says
16 routers) — update the docstring in the test_check_prefixes function to
reflect the current assertions (20 routers) and, if present, adjust any
explanatory text about which routers have which prefixes so it matches the
actual checks performed for include_routers against the MockFastAPI instance
(references: test_check_prefixes, include_routers, MockFastAPI, and the router
symbols like conversations_v2.router/conversations_v1.router).

---

Duplicate comments:
In `@tests/integration/test_openapi_json.py`:
- Around line 307-318: The HTTP response code sets for the OpenAPI path entries
(e.g., "/v1/query", "/v1/streaming_query", "/v1/streaming_query/interrupt",
"/v1/config") are inconsistent with the rest of the test; normalize them to
match the project's canonical format (same ordering and representation for
status codes) — e.g., use a consistently ordered tuple/list or a sorted set
literal across all entries or the same formatting used elsewhere in
test_openapi_json.py so these four lines follow the same style as the other path
entries.

---

Nitpick comments:
In `@src/app/endpoints/stream_interrupt.py`:
- Line 5: Import the FastAPI status helper and use it as the explicit
status_code in the route decorator: update the import line to include status
(from fastapi import APIRouter, Depends, HTTPException, status) and modify the
`@router.post` decorator in this module (the APIRouter route for stream
interruption) to include status_code=status.HTTP_200_OK (or the appropriate
status constant) instead of a raw integer or implicit default.
- Line 48: The one-line docstring in stream_interrupt.py should be expanded to a
full Google-style docstring for the endpoint function (e.g., interrupt_stream or
the function that "Interrupt an in-progress streaming query by request
identifier")—add a brief description plus a Parameters section describing the
request identifier arg (type and meaning), a Returns section describing the
response (type and structure), and a Raises section listing possible exceptions
(e.g., ValueError/KeyError or HTTPError) and when they are raised; place this
docstring immediately above the endpoint handler function definition and follow
Google Python conventions for formatting.
- Around line 3-22: Add a module-level logger by importing get_logger from
log.py and defining logger = get_logger(__name__) at top of
src/app/endpoints/stream_interrupt.py, then update the request handler that uses
StreamingInterruptRequest, StreamInterruptRegistry/get_stream_interrupt_registry
to emit logs: use logger.info() when a stream interruption is successfully
cancelled (include stream id and user from the request), and use logger.debug()
or logger.info() when returning a 404 path (e.g., interrupt not found) to record
the attempted stream id and caller; keep existing registry warnings intact.
Ensure you use the module-level logger variable (logger) rather than print or
other loggers.

In `@src/app/endpoints/streaming_query.py`:
- Around line 87-88: generate_response currently uses the module-level
stream_interrupt_registry singleton (imported from utils.stream_interrupts),
which prevents test overrides; change generate_response to accept a registry
parameter (e.g., registry) and stop referencing the module-level
stream_interrupt_registry, then update the streaming endpoint to inject the
registry via Depends(get_stream_interrupt_registry) and pass it into
generate_response. Also propagate this parameter through other call sites that
use the singleton in this file (the other invocations noted around the earlier
blocks) so they call generate_response (or the helper that invokes it) with the
injected registry instead of the module singleton.

In `@src/utils/stream_interrupts.py`:
- Line 17: The task type annotations use the bare generic asyncio.Task; update
them to asyncio.Task[Any] and add Any to imports (from typing import Any) so
type checks are complete; specifically change the parameter/type annotations for
the task variable and the register_stream function signature in
stream_interrupts.py from asyncio.Task to asyncio.Task[Any], and ensure the
top-level imports include Any so the code compiles with the updated annotations.
- Around line 28-69: Add Google-style docstring parameter sections to the four
public methods in this class: register_stream, cancel_stream, deregister_stream,
and get_stream. For each method include an "Args:" block that documents each
parameter (request_id: str, user_id: str where present, task: asyncio.Task where
present) and for cancel_stream and get_stream also add a "Returns:" block
describing the return value (bool for cancel_stream, Optional[ActiveStream] for
get_stream). Keep wording brief and follow the existing one-line summary then
the Args/Returns headers per Google Python docstring conventions.

In `@tests/integration/test_openapi_json.py`:
- Around line 220-231: The "/v1/streaming_query/interrupt" POST expected status
set is missing 422; update the test expectations by adding "422" to the status
code set for the route tuple (" /v1/streaming_query/interrupt", "post", {...})
so it matches the other POST endpoints (e.g., "/v1/query" and
"/v1/streaming_query") and maintains consistent coverage.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/app/endpoints/stream_interrupt.py (1)

1-21: Add module-level logger per coding guidelines.

The module has no logger defined. The coding guidelines require logger = get_logger(__name__) for all source modules. Since a success path (e.g., confirming a successful interrupt) and the @authorize dependency both benefit from audit-level logging, this is also useful for observability.

♻️ Proposed addition
 """Endpoint for interrupting in-progress streaming query requests."""
 
 from typing import Annotated, Any
 
 from fastapi import APIRouter, Depends, HTTPException
 
 from authentication import get_auth_dependency
 from authentication.interface import AuthTuple
 from authorization.middleware import authorize
+from log import get_logger
 from models.config import Action
 ...
+logger = get_logger(__name__)

As per coding guidelines: "Use logger = get_logger(__name__) from log.py for module logging."

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/endpoints/stream_interrupt.py` around lines 1 - 21, Add a
module-level logger by importing get_logger and defining logger =
get_logger(__name__) at top of this module (so it’s available to the APIRouter
handlers, the authorize dependency usage, and functions interacting with
StreamInterruptRegistry / get_stream_interrupt_registry); then replace or
augment existing audit/success logging sites (e.g., where a
StreamingInterruptResponse is returned or where `@authorize` triggers) to use this
logger for audit-level messages. Ensure the import comes from the same logging
utility used elsewhere (get_logger) and that the logger variable name is exactly
logger to match coding guidelines.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/app/endpoints/stream_interrupt.py`:
- Around line 25-32: The 404 example label "streaming request" used in
stream_interrupt_responses does not exist on NotFoundResponse, so OpenAPI will
show no examples; fix by either changing the examples argument in
stream_interrupt_responses to an existing label such as "conversation" (i.e.,
replace "streaming request" with "conversation") or add a new labeled example
"streaming request" to
NotFoundResponse.model_config["json_schema_extra"]["examples"] (update the
NotFoundResponse class to include that key/value) so the
NotFoundResponse.openapi_response(examples=[...]) call can find it.

In `@src/utils/stream_interrupts.py`:
- Around line 3-7: The import block violates import ordering and includes an
unused symbol: move all standard-library imports (asyncio,
dataclasses.dataclass, threading.Lock, and typing if used) before the
first-party get_logger import and remove the unused typing.Any import entirely;
specifically, reorder so dataclass and Lock imports come before get_logger (or
import Lock via threading) and delete the "from typing import Any" line to
resolve the C0411 and F401/W0611 linter errors in this module (look for
get_logger, dataclass, and Lock to locate the relevant imports).

---

Duplicate comments:
In `@src/app/endpoints/streaming_query.py`:
- Around line 392-396: The except block currently swallows
asyncio.CancelledError; modify it so after yielding
stream_interrupted_event(request_id) and after finally calling
stream_interrupt_registry.deregister_stream(request_id) you re-raise the
CancelledError to preserve asyncio semantics and outer cancel propagation, and
before re-raising call asyncio.current_task().uncancel() (if available) to clear
the internal cancellation counter; also change the logger message in the except
from "interrupted by user" to a neutral "interrupted" (or include a source hint
if available) so it doesn't misleadingly imply user-initiated interrupts. Ensure
you update the except handler that references request_id,
stream_interrupted_event, and stream_interrupt_registry.deregister_stream
accordingly.

---

Nitpick comments:
In `@src/app/endpoints/stream_interrupt.py`:
- Around line 1-21: Add a module-level logger by importing get_logger and
defining logger = get_logger(__name__) at top of this module (so it’s available
to the APIRouter handlers, the authorize dependency usage, and functions
interacting with StreamInterruptRegistry / get_stream_interrupt_registry); then
replace or augment existing audit/success logging sites (e.g., where a
StreamingInterruptResponse is returned or where `@authorize` triggers) to use this
logger for audit-level messages. Ensure the import comes from the same logging
utility used elsewhere (get_logger) and that the logger variable name is exactly
logger to match coding guidelines.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Nitpick comments (2)
src/utils/stream_interrupts.py (2)

59-73: Consider asyncio.Lock over threading.Lock for async-idiomatic behaviour.

The threading.Lock is acquired for the duration of the task.done() check and task.cancel() call (lines 59-73). Because task.cancel() itself only schedules a CancelledError on the event loop and returns immediately, the lock hold-time is negligible and blocking the loop is not a practical concern. However, using asyncio.Lock (and making the method async) would:

  • be idiomatic for code living in an async service;
  • avoid the subtle footgun of calling a blocking primitive from a coroutine if the critical section ever grows.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/stream_interrupts.py` around lines 59 - 73, The current code uses a
threading.Lock (self._lock) inside the method that checks stream.task.done() and
calls stream.task.cancel(); switch to an asyncio.Lock to be async-idiomatic:
initialize self._lock = asyncio.Lock() in the class, change the method to async
and replace the blocking with "async with self._lock:" around the critical
section (the stream lookup, user_id check, task.done() and task.cancel() calls),
and update all callers to await this method (or add a thin sync wrapper that
schedules it) so the lock is acquired without blocking the event loop.

97-106: In-memory singleton is isolated per process — multi-worker deployments will silently miss interrupts.

stream_interrupt_registry is a module-level object: each OS process running the application owns an independent copy. Under a --workers N uvicorn/gunicorn configuration, a POST /v1/streaming_query/interrupt request may be dispatched to a worker that did not register the target stream, causing a spurious 404.

Consider documenting this constraint prominently or, for production scale-out, replacing the in-memory store with a shared backend (Redis, a database table, or a shared-memory mechanism) so interrupt requests can reach the owning worker regardless of routing.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/stream_interrupts.py` around lines 97 - 106, The module-level
in-memory singleton stream_interrupt_registry (returned by
get_stream_interrupt_registry and typed as StreamInterruptRegistry) will be
isolated per OS process causing missed interrupts in multi-worker deployments;
fix by making the registry pluggable: add a factory (e.g.,
create_stream_interrupt_registry) that reads configuration/ENV and returns
either the current in-memory StreamInterruptRegistry or a
RedisBackedStreamInterruptRegistry/DB-backed implementation, replace the
module-level instantiation with a call to that factory, and update
get_stream_interrupt_registry to return the factory-created instance; if you
prefer not to implement a shared backend now, add a prominent module-level
docstring and README note explaining the multi-worker limitation and
recommending Redis/DB for production.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@src/utils/stream_interrupts.py`:
- Around line 20-21: The Task field is using a bare asyncio.Task which triggers
generic-type warnings; update the annotation for the streaming task to
asyncio.Task[None] in the relevant dataclass/structure (the task field in
src/utils/stream_interrupts.py) so the generic parameter is explicit—i.e.,
change the type of task to asyncio.Task[None] wherever it's declared (and adjust
imports if needed).
- Around line 45-73: The cancel_stream method conflates three distinct failure
modes into a single False return, so update cancel_stream to return a granular
result (an enum or typed tuple) instead of bool—e.g., CancelResult = Enum
{NOT_FOUND, FORBIDDEN, ALREADY_DONE, CANCELLED}—and adjust logic in
cancel_stream (use self._lock, self._streams.get(request_id), compare
stream.user_id, check stream.task.done(), call stream.task.cancel()) to return
the appropriate CancelResult in each branch; then update the caller in the
stream interrupt endpoint to map CancelResult.NOT_FOUND -> 404, .FORBIDDEN ->
404 (if you keep that security choice) or 403, .ALREADY_DONE -> 409, and
.CANCELLED -> 200/202 so the HTTP status codes reflect the precise outcome.

---

Nitpick comments:
In `@src/utils/stream_interrupts.py`:
- Around line 59-73: The current code uses a threading.Lock (self._lock) inside
the method that checks stream.task.done() and calls stream.task.cancel(); switch
to an asyncio.Lock to be async-idiomatic: initialize self._lock = asyncio.Lock()
in the class, change the method to async and replace the blocking with "async
with self._lock:" around the critical section (the stream lookup, user_id check,
task.done() and task.cancel() calls), and update all callers to await this
method (or add a thin sync wrapper that schedules it) so the lock is acquired
without blocking the event loop.
- Around line 97-106: The module-level in-memory singleton
stream_interrupt_registry (returned by get_stream_interrupt_registry and typed
as StreamInterruptRegistry) will be isolated per OS process causing missed
interrupts in multi-worker deployments; fix by making the registry pluggable:
add a factory (e.g., create_stream_interrupt_registry) that reads
configuration/ENV and returns either the current in-memory
StreamInterruptRegistry or a RedisBackedStreamInterruptRegistry/DB-backed
implementation, replace the module-level instantiation with a call to that
factory, and update get_stream_interrupt_registry to return the factory-created
instance; if you prefer not to implement a shared backend now, add a prominent
module-level docstring and README note explaining the multi-worker limitation
and recommending Redis/DB for production.

Comment on lines +45 to +73
def cancel_stream(self, request_id: str, user_id: str) -> bool:
"""Cancel an active stream owned by user.

The entire lookup-check-cancel sequence is performed under the
lock so that a concurrent ``deregister_stream`` cannot remove
the entry between the ownership check and the cancel call.

Parameters:
request_id: Unique streaming request identifier.
user_id: User identifier attempting the interruption.

Returns:
bool: True when cancellation was requested, otherwise False.
"""
with self._lock:
stream = self._streams.get(request_id)
if stream is None:
return False
if stream.user_id != user_id:
logger.warning(
"User %s attempted to interrupt request %s owned by another user",
user_id,
request_id,
)
return False
if stream.task.done():
return False
stream.task.cancel()
return True
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

rg -n "cancel_stream\|cancel_result\|404\|403\|409\|HTTPException" --type=py -C 5

Repository: lightspeed-core/lightspeed-stack

Length of output: 58


🏁 Script executed:

fd -type f -name "*.py" | head -20

Repository: lightspeed-core/lightspeed-stack

Length of output: 246


🏁 Script executed:

find . -name "stream_interrupts.py" -o -name "*interrupt*" -type f | head -20

Repository: lightspeed-core/lightspeed-stack

Length of output: 267


🏁 Script executed:

rg "cancel_stream" --type=py -l

Repository: lightspeed-core/lightspeed-stack

Length of output: 144


🏁 Script executed:

cat -n src/app/endpoints/stream_interrupt.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 3163


🏁 Script executed:

cat -n src/utils/stream_interrupts.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 4228


The cancel_stream method returns a single bool that prevents the endpoint from returning semantically correct HTTP status codes.

All three failure modes map to the same False return value, causing the endpoint to return 404 for all cases (see src/app/endpoints/stream_interrupt.py lines 65-69). This is problematic for case 3: when the task is already done, the endpoint returns 404 Not Found—but the stream was found and is owned by the user; only its execution has completed. This should return 409 Conflict or similar, not 404.

Case 2 (unauthorized user → 404) may be intentional for security, but case 3's semantics are definitively incorrect.

Consider refactoring cancel_stream to return a more granular result (e.g., an enum or tuple) so the endpoint can emit the appropriate HTTP status for each scenario.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/stream_interrupts.py` around lines 45 - 73, The cancel_stream
method conflates three distinct failure modes into a single False return, so
update cancel_stream to return a granular result (an enum or typed tuple)
instead of bool—e.g., CancelResult = Enum {NOT_FOUND, FORBIDDEN, ALREADY_DONE,
CANCELLED}—and adjust logic in cancel_stream (use self._lock,
self._streams.get(request_id), compare stream.user_id, check stream.task.done(),
call stream.task.cancel()) to return the appropriate CancelResult in each
branch; then update the caller in the stream interrupt endpoint to map
CancelResult.NOT_FOUND -> 404, .FORBIDDEN -> 404 (if you keep that security
choice) or 403, .ALREADY_DONE -> 409, and .CANCELLED -> 200/202 so the HTTP
status codes reflect the precise outcome.

Signed-off-by: Jordan Dubrick <jdubrick@redhat.com>
Signed-off-by: Jordan Dubrick <jdubrick@redhat.com>
Signed-off-by: Jordan Dubrick <jdubrick@redhat.com>
Signed-off-by: Jordan Dubrick <jdubrick@redhat.com>
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
docs/openapi.json (1)

4434-4460: ⚠️ Potential issue | 🟠 Major

Fix duplicate operationId for /a2a POST.

Line 4458 repeats the GET operationId, which makes operationIds non-unique and can break client code generation.

🔧 Proposed fix
-                "operationId": "handle_a2a_jsonrpc_a2a_get",
+                "operationId": "handle_a2a_jsonrpc_a2a_post",
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/openapi.json` around lines 4434 - 4460, The POST entry under the /a2a
endpoint has a duplicated operationId ("handle_a2a_jsonrpc_a2a_get") which must
be unique; update the POST operationId to a distinct value (for example
"handle_a2a_jsonrpc_a2a_post") in the OpenAPI JSON so GET and POST use different
operationIds and client code generation won't break—modify the "post" ->
"operationId" field accordingly.
🧹 Nitpick comments (3)
src/utils/stream_interrupts.py (1)

33-33: asyncio.Task parameter is missing its generic type argument.

register_stream's task parameter uses bare asyncio.Task while the ActiveStream.task field correctly uses asyncio.Task[None]. Strict type-checkers will flag the parameter annotation.

🔧 Proposed fix
-    def register_stream(
-        self, request_id: str, user_id: str, task: asyncio.Task
-    ) -> None:
+    def register_stream(
+        self, request_id: str, user_id: str, task: asyncio.Task[None]
+    ) -> None:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/utils/stream_interrupts.py` at line 33, The register_stream parameter
annotation uses a bare asyncio.Task which is inconsistent with
ActiveStream.task; update the register_stream signature to annotate task as
asyncio.Task[None] (matching ActiveStream.task) so strict type-checkers won't
complain; locate the register_stream definition and replace the bare
asyncio.Task type with asyncio.Task[None].
tests/unit/app/endpoints/test_streaming_query.py (1)

1360-1368: mock_context is missing vector_store_ids and rag_id_mapping compared to all other generate_response tests.

Every other test in TestGenerateResponse sets:

mock_context.vector_store_ids = []
mock_context.rag_id_mapping = {}

Because mocker.Mock(spec=ResponseGeneratorContext) auto-creates Mock objects for missing attributes, access to these fields won't raise AttributeError, but may behave silently differently from [] / {} if generate_response iterates them during the cancellation path.

🛠 Proposed fix
 mock_context = mocker.Mock(spec=ResponseGeneratorContext)
 mock_context.conversation_id = "conv_123"
 mock_context.user_id = "user_123"
+mock_context.vector_store_ids = []
+mock_context.rag_id_mapping = {}
 mock_context.query_request = QueryRequest(
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/app/endpoints/test_streaming_query.py` around lines 1360 - 1368,
The mock_context in this test lacks explicit vector_store_ids and rag_id_mapping
which other TestGenerateResponse cases set; update the test to assign
mock_context.vector_store_ids = [] and mock_context.rag_id_mapping = {} so the
mocked ResponseGeneratorContext behaves like real instances when
generate_response iterates or checks those fields (locate the mock_context
created with mocker.Mock(spec=ResponseGeneratorContext) in this test and add the
two assignments).
tests/unit/app/endpoints/test_stream_interrupt.py (1)

70-103: Consider adding an edge-case test for interrupting an already-completed task.

task.cancel() returns False when the task has already finished. If the endpoint handler calls cancel() on a done task without checking the return value, it could return interrupted=True even though no actual cancellation occurred — a misleading response to the caller. There is currently no test covering this race condition.

✅ Suggested additional test
`@pytest.mark.asyncio`
async def test_stream_interrupt_endpoint_already_completed(
    registry: StreamInterruptRegistry,
) -> None:
    """Interrupt endpoint handles an already-completed task gracefully."""
    request_id = "123e4567-e89b-12d3-a456-426614174003"
    user_id = "00000001-0001-0001-0001-000000000001"

    async def instant_stream() -> None:
        pass  # completes immediately

    task = asyncio.create_task(instant_stream())
    await task  # let the task finish naturally
    registry.register_stream(request_id, user_id, task)

    response = await stream_interrupt_endpoint_handler(
        interrupt_request=StreamingInterruptRequest(request_id=request_id),
        auth=(user_id, "mock_username", False, "mock_token"),
        registry=registry,
    )

    # Verify the response accurately reflects that cancel() had no effect
    assert isinstance(response, StreamingInterruptResponse)
    assert response.request_id == request_id
    # interrupted should be False (or the handler should document its semantics here)
    assert response.interrupted is False

Do you want me to open a new issue to track this test case?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/app/endpoints/test_stream_interrupt.py` around lines 70 - 103, Add
a unit test to cover the race where a stream task has already completed before
interruption: create an async test named
test_stream_interrupt_endpoint_already_completed that schedules an
instant-completing coroutine, awaits it so the Task is done, registers it with
StreamInterruptRegistry via register_stream(request_id, user_id, task), then
calls stream_interrupt_endpoint_handler with a StreamingInterruptRequest and the
correct auth tuple and assert the returned StreamingInterruptResponse has the
same request_id and that interrupted is False (verifying the handler does not
report a successful cancel when task.cancel() had no effect).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@tests/integration/test_openapi_json.py`:
- Around line 226-230: The test's expected response codes for the
"/v1/streaming_query/interrupt" POST endpoint omit "422", but FastAPI adds a 422
for endpoints with a Pydantic body (StreamingInterruptRequest) and
_check_paths_and_responses_exist performs subset checks; update the tuple for
"/v1/streaming_query/interrupt" to include "422" in its set of expected
responses and apply the same change to the other URL-based parametrization entry
for the interrupt endpoint so both parametrized lists include "422".

In `@tests/unit/app/endpoints/test_streaming_query.py`:
- Line 54: The tests import and use the global singleton
stream_interrupt_registry which creates fragile inter-test coupling; update
tests in test_streaming_query.py to either mock stream_interrupt_registry with
pytest-mock for each test or add a fixture that resets/clears the registry
before/after each test so generate_response's registration/deregistration
(including its finally cleanup) cannot leak between tests; reference
stream_interrupt_registry, generate_response, and the shared request_id value
when implementing the mock/fixture so each async test gets an isolated or
cleaned registry state.

---

Outside diff comments:
In `@docs/openapi.json`:
- Around line 4434-4460: The POST entry under the /a2a endpoint has a duplicated
operationId ("handle_a2a_jsonrpc_a2a_get") which must be unique; update the POST
operationId to a distinct value (for example "handle_a2a_jsonrpc_a2a_post") in
the OpenAPI JSON so GET and POST use different operationIds and client code
generation won't break—modify the "post" -> "operationId" field accordingly.

---

Duplicate comments:
In `@src/app/endpoints/stream_interrupt.py`:
- Around line 25-32: The 404 example label "streaming request" used in
stream_interrupt_responses is not defined in NotFoundResponse, so OpenAPI drops
the examples; either change the label in stream_interrupt_responses to one of
the existing NotFoundResponse example keys (e.g., "conversation", "provider",
"model", or "rag") when calling NotFoundResponse.openapi_response(...) or add a
new "streaming request" entry into NotFoundResponse's
model_config["json_schema_extra"]["examples"] (update the NotFoundResponse class
definition to include that key and an appropriate example payload) so
NotFoundResponse.openapi_response(examples=["streaming request"]) will render
correctly.

In `@src/app/endpoints/streaming_query.py`:
- Around line 392-399: The CancelledError handler in the except block for
asyncio.CancelledError (around the use of request_id, stream_interrupted_event,
and stream_interrupt_registry) swallows cancellations without clearing the
task's cancelled state; update the handler to (1) log a neutral message like
"Streaming request %s cancelled" instead of "interrupted by user", and (2) after
yielding stream_interrupted_event(request_id) call uncancel on the current task
to clear the cancellation flag (use asyncio.current_task().uncancel() when
available—guard for older Python versions or feature presence). Ensure
stream_interrupt_registry.deregister_stream(request_id) still runs in finally
and preserve behavior around stream_completed.

In `@src/utils/stream_interrupts.py`:
- Around line 45-73: Change cancel_stream to return a richer result enum instead
of a plain bool so callers can distinguish the three failure modes; introduce a
CancelResult enum (e.g., NOT_FOUND, FORBIDDEN, ALREADY_DONE, CANCELLED) and
update the cancel_stream signature and implementation to return the appropriate
enum value for each branch (stream missing -> NOT_FOUND, wrong owner ->
FORBIDDEN, task.done() -> ALREADY_DONE, task.cancel() -> CANCELLED); update any
callers (HTTP endpoint) to map CancelResult to the correct HTTP status (404,
403, 409, 200/202) accordingly.

---

Nitpick comments:
In `@src/utils/stream_interrupts.py`:
- Line 33: The register_stream parameter annotation uses a bare asyncio.Task
which is inconsistent with ActiveStream.task; update the register_stream
signature to annotate task as asyncio.Task[None] (matching ActiveStream.task) so
strict type-checkers won't complain; locate the register_stream definition and
replace the bare asyncio.Task type with asyncio.Task[None].

In `@tests/unit/app/endpoints/test_stream_interrupt.py`:
- Around line 70-103: Add a unit test to cover the race where a stream task has
already completed before interruption: create an async test named
test_stream_interrupt_endpoint_already_completed that schedules an
instant-completing coroutine, awaits it so the Task is done, registers it with
StreamInterruptRegistry via register_stream(request_id, user_id, task), then
calls stream_interrupt_endpoint_handler with a StreamingInterruptRequest and the
correct auth tuple and assert the returned StreamingInterruptResponse has the
same request_id and that interrupted is False (verifying the handler does not
report a successful cancel when task.cancel() had no effect).

In `@tests/unit/app/endpoints/test_streaming_query.py`:
- Around line 1360-1368: The mock_context in this test lacks explicit
vector_store_ids and rag_id_mapping which other TestGenerateResponse cases set;
update the test to assign mock_context.vector_store_ids = [] and
mock_context.rag_id_mapping = {} so the mocked ResponseGeneratorContext behaves
like real instances when generate_response iterates or checks those fields
(locate the mock_context created with mocker.Mock(spec=ResponseGeneratorContext)
in this test and add the two assignments).

Comment on lines +226 to +230
(
"/v1/streaming_query/interrupt",
"post",
{"200", "401", "403", "404"},
),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

Missing "422" in expected response codes for the interrupt endpoint.

Every other POST endpoint with a Pydantic body (e.g. /v1/query, /v1/streaming_query) includes "422" in its expected set. FastAPI unconditionally generates a 422 Unprocessable Entity entry in the OpenAPI spec for any endpoint that accepts a validated request body (StreamingInterruptRequest). Because _check_paths_and_responses_exist performs a subset check, the test still passes, but the omission means the 422 response is never validated.

🛠 Proposed fix
-        (
-            "/v1/streaming_query/interrupt",
-            "post",
-            {"200", "401", "403", "404"},
-        ),
+        (
+            "/v1/streaming_query/interrupt",
+            "post",
+            {"200", "401", "403", "404", "422"},
+        ),

Apply the same change in the URL-based parametrize list (lines 313-317).

Also applies to: 313-317

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/integration/test_openapi_json.py` around lines 226 - 230, The test's
expected response codes for the "/v1/streaming_query/interrupt" POST endpoint
omit "422", but FastAPI adds a 422 for endpoints with a Pydantic body
(StreamingInterruptRequest) and _check_paths_and_responses_exist performs subset
checks; update the tuple for "/v1/streaming_query/interrupt" to include "422" in
its set of expected responses and apply the same change to the other URL-based
parametrization entry for the interrupt endpoint so both parametrized lists
include "422".

from models.requests import Attachment, QueryRequest
from models.responses import InternalServerErrorResponse
from utils.token_counter import TokenCounter
from utils.stream_interrupts import stream_interrupt_registry
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify how many tests in this file call generate_response with the same request_id,
# and whether stream_interrupt_registry is mocked in any of them.

echo "=== Tests using the shared request_id with generate_response ==="
rg -n '123e4567-e89b-12d3-a456-426614174000' tests/unit/app/endpoints/test_streaming_query.py

echo ""
echo "=== Any usage/mocking of stream_interrupt_registry in this file ==="
rg -n 'stream_interrupt_registry' tests/unit/app/endpoints/test_streaming_query.py

echo ""
echo "=== Setup/fixture definitions in the test file ==="
rg -n '@pytest.fixture|def setup|def teardown' tests/unit/app/endpoints/test_streaming_query.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 1274


🏁 Script executed:

#!/bin/bash
# Find the streaming_query.py file to understand how generate_response interacts with stream_interrupt_registry
find . -name "streaming_query.py" -type f | grep -v __pycache__ | grep -v test

Repository: lightspeed-core/lightspeed-stack

Length of output: 114


🏁 Script executed:

#!/bin/bash
# Check if stream_interrupt_registry is reset/cleared in conftest or any fixture
find tests -name "conftest.py" | head -5 | xargs grep -l "stream_interrupt_registry" 2>/dev/null || echo "Not found in conftest files"

# Also check the actual stream_interrupts module
find . -path "*/utils/stream_interrupts*" -type f | grep -v __pycache__

Repository: lightspeed-core/lightspeed-stack

Length of output: 136


🏁 Script executed:

#!/bin/bash
# Check streaming_query.py for how generate_response uses stream_interrupt_registry
echo "=== Checking generate_response function ==="
rg -n "def generate_response" src/app/endpoints/streaming_query.py -A 30 | head -50

echo ""
echo "=== Checking stream_interrupt_registry usage in streaming_query.py ==="
rg -n "stream_interrupt_registry" src/app/endpoints/streaming_query.py -B 2 -A 2

Repository: lightspeed-core/lightspeed-stack

Length of output: 2082


🏁 Script executed:

#!/bin/bash
# Check stream_interrupts.py to understand the singleton behavior and registration
echo "=== Checking stream_interrupts.py structure ==="
cat -n src/utils/stream_interrupts.py | head -100

echo ""
echo "=== Check if there's a get_stream or deregister method ==="
rg -n "def (get_stream|deregister|register)" src/utils/stream_interrupts.py -A 3

Repository: lightspeed-core/lightspeed-stack

Length of output: 4800


🏁 Script executed:

#!/bin/bash
# Check if tests have any cleanup between test functions or fixtures
echo "=== Checking for teardown/cleanup in test file ==="
rg -n "yield|finally|cleanup|clear" tests/unit/app/endpoints/test_streaming_query.py | head -20

echo ""
echo "=== Check line 1382 and surrounding context (the cancellation test) ==="
sed -n '1370,1410p' tests/unit/app/endpoints/test_streaming_query.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 2487


🏁 Script executed:

#!/bin/bash
# Check the context around the 6 tests using the same request_id to confirm they call generate_response
echo "=== Test 1 at line 1109 (context) ==="
sed -n '1100,1130p' tests/unit/app/endpoints/test_streaming_query.py

echo ""
echo "=== Test 2 at line 1163 (context) ==="
sed -n '1155,1185p' tests/unit/app/endpoints/test_streaming_query.py

echo ""
echo "=== Check how many test functions exist in this file ==="
rg -n "^    async def test_|^    def test_" tests/unit/app/endpoints/test_streaming_query.py | wc -l

echo ""
echo "=== List all test function names ==="
rg -n "^    async def test_|^    def test_" tests/unit/app/endpoints/test_streaming_query.py | head -30

Repository: lightspeed-core/lightspeed-stack

Length of output: 4279


🏁 Script executed:

#!/bin/bash
# Check if there's a conftest that might be resetting the registry or if generate_response is actually being called
echo "=== Checking for asyncio.CancelledError handling in the tests ==="
rg -n "CancelledError|asyncio.Task|current_task" tests/unit/app/endpoints/test_streaming_query.py -B 2 -A 2 | head -50

echo ""
echo "=== Check where line 1109 with request_id is actually used (broader context) ==="
sed -n '1090,1140p' tests/unit/app/endpoints/test_streaming_query.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 2426


🏁 Script executed:

#!/bin/bash
# Check if tests mock asyncio.current_task or if it would return None
echo "=== Checking asyncio.current_task behavior in tests ==="
rg -n "current_task|monkeypatch" tests/unit/app/endpoints/test_streaming_query.py | head -10

# Also check the test class to see if there's any event loop setup
sed -n '1050,1080p' tests/unit/app/endpoints/test_streaming_query.py

Repository: lightspeed-core/lightspeed-stack

Length of output: 1264


Remove global singleton dependency from unit test or mock the registry with automatic cleanup.

The test file imports stream_interrupt_registry directly (line 54), a shared module-level singleton. Six tests call generate_response with the same request_id ("123e4567-e89b-12d3-a456-426614174000"), and the cancellation test asserts that the registry is empty afterward. While generate_response properly deregisters via its finally block (line 396), the lack of explicit cleanup or mocking creates an implicit, fragile inter-test dependency.

Since tests run with @pytest.mark.asyncio, asyncio.current_task() returns a task object and registration into the singleton occurs. If the finally block fails to execute or cleanup is delayed, subsequent tests could observe stale registry entries. Use pytest-mock to mock stream_interrupt_registry or provide a fixture that resets it before each test to eliminate this coupling and improve test isolation.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/unit/app/endpoints/test_streaming_query.py` at line 54, The tests
import and use the global singleton stream_interrupt_registry which creates
fragile inter-test coupling; update tests in test_streaming_query.py to either
mock stream_interrupt_registry with pytest-mock for each test or add a fixture
that resets/clears the registry before/after each test so generate_response's
registration/deregistration (including its finally cleanup) cannot leak between
tests; reference stream_interrupt_registry, generate_response, and the shared
request_id value when implementing the mock/fixture so each async test gets an
isolated or cleaned registry state.

@Jdubrick
Copy link
Contributor Author

@tisnik PTAL

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant