
Support for NATS dead letter queue#1175

Open
carlosgjs wants to merge 18 commits into RolnickLab:main from uw-ssec:carlos/natsdlq

Conversation


@carlosgjs carlosgjs commented Mar 4, 2026

Summary

This pull request introduces dead letter queue (DLQ) support for job tasks, allowing for better tracking and cleanup of failed tasks in the NATS orchestration layer.

Helps address #1168

Dead Letter Queue Management

  • Added a new management command check_dead_letter_queue.py to allow users to check for dead letter queue messages for a specific job via the command line.
  • Implemented get_dead_letter_task_ids in TaskQueueManager to retrieve task IDs that failed after max delivery attempts, using a durable advisory consumer for persistence.
  • Added _setup_advisory_stream to ensure advisory streams are created when a connection is opened, capturing max-delivery events for DLQ tracking.
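The advisory-based lookup can be illustrated with a small sketch. This is not the PR's code; it only shows the first step of the mechanism — a JetStream `MAX_DELIVERIES` advisory is a JSON message whose `stream_seq` field points at the message that exhausted its delivery attempts:

```python
import json

def extract_failed_seqs(advisory_payloads: list[bytes]) -> list[int]:
    """Collect the stream sequences of messages that hit max deliveries."""
    seqs = []
    for raw in advisory_payloads:
        data = json.loads(raw)
        # Advisories without a stream_seq can't be mapped back to a task.
        if "stream_seq" in data:
            seqs.append(data["stream_seq"])
    return seqs

payloads = [b'{"stream_seq": 7, "deliveries": 2}', b'{"deliveries": 2}']
print(extract_failed_seqs(payloads))  # [7]
```

Each recovered sequence is then looked up in the job stream with `get_msg` to read the task payload, as the implementation below does.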

Reliability Improvements

  • Ensured that acknowledgements (ACKs) for tasks and advisory messages are flushed to the NATS socket to avoid silent drops when subscriptions are torn down.
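The flush-after-ack pattern above can be sketched as follows. `ack_and_flush` is a hypothetical stand-in, demonstrated with mocks rather than a live NATS server:

```python
import asyncio
from unittest.mock import AsyncMock

async def ack_and_flush(msgs, nc):
    # msg.ack() only queues the ACK publish in the client's write buffer;
    # a single nc.flush() afterwards forces everything onto the socket
    # before the subscription is torn down.
    for msg in msgs:
        await msg.ack()
    await nc.flush()

# Demonstrate the call pattern with mocks (no NATS server needed).
nc, msgs = AsyncMock(), [AsyncMock(), AsyncMock()]
asyncio.run(ack_and_flush(msgs, nc))
print(nc.flush.await_count)  # 1
```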

How to Test the Changes

  • For easier testing, temporarily change TASK_TTR in nats_queue.py to a small value (e.g. 10 seconds) and set max_deliver=2.
  • Create an async job with a small number of images, but don't run any workers.
  • Using the code from PR #112, run python trapdata/antenna/benchmark.py --job-id <job id> --skip-acks. This drains the tasks without ACKing them, similar to a crashing worker. Wait ~10 seconds and run it again.
  • Run docker compose run --rm django python manage.py check_dead_letter_queue <job_id>

Example output:

INFO 2026-03-04 14:15:34,238 nats_queue 1 131574005512000 Advisory stream created
INFO 2026-03-04 14:15:34,241 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 1
INFO 2026-03-04 14:15:34,242 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 2
INFO 2026-03-04 14:15:34,242 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 3
INFO 2026-03-04 14:15:34,242 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 4
INFO 2026-03-04 14:15:34,243 nats_queue 1 131574005512000 Acknowledged advisory message for stream_seq 5
Found 5 dead letter task(s) for job 196:
  - Image ID: 28324
  - Image ID: 35223
  - Image ID: 22865
  - Image ID: 1528
  - Image ID: 3781

Once #1025 is in place, we can use this method to periodically check for failed images and then log a message to the job such as "Workers have failed to process Image 123 5 times, giving up", perhaps only up to a maximum number of errors.

Summary by CodeRabbit

  • New Features

    • Added a management command to retrieve and display dead letter queue messages for a specific job, enabling users to identify, monitor, and investigate failed task executions.
  • Tests

    • Updated test suite to cover dead letter queue advisory stream setup, consumer lifecycle management, and associated cleanup operations.

carlos-irreverentlabs and others added 17 commits January 16, 2026 11:25
* fix: prevent NATS connection flooding and stale job task fetching

- Add connect_timeout=5, allow_reconnect=False to NATS connections to
  prevent leaked reconnection loops from blocking Django's event loop
- Guard /tasks endpoint against terminal-status jobs (return empty tasks
  instead of attempting NATS reserve)
- IncompleteJobFilter now excludes jobs by top-level status in addition
  to progress JSON stages
- Add stale worker cleanup to integration test script

Found during PSv2 integration testing where stale ADC workers with
default DataLoader parallelism overwhelmed the single uvicorn worker
thread by flooding /tasks with concurrent NATS reserve requests.

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: PSv2 integration test session notes and NATS flooding findings

Session notes from 2026-02-16 integration test including root cause
analysis of stale worker task competition and NATS connection issues.
Findings doc tracks applied fixes and remaining TODOs with priorities.

Co-Authored-By: Claude <noreply@anthropic.com>

* docs: update session notes with successful test run #3

PSv2 integration test passed end-to-end (job 1380, 20/20 images).
Identified ack_wait=300s as cause of ~5min idle time when GPU
processes race for NATS tasks.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: batch NATS task fetch to prevent HTTP timeouts

Replace N×1 reserve_task() calls with single reserve_tasks() batch
fetch. The previous implementation created a new pull subscription per
message (320 NATS round trips for batch=64), causing the /tasks endpoint
to exceed HTTP client timeouts. The new approach uses one psub.fetch()
call for the entire batch.

Co-Authored-By: Claude <noreply@anthropic.com>
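The batched approach can be sketched like this; `reserve_tasks_batched` is a hypothetical stand-in for the real `reserve_tasks`, shown with a mocked pull subscription instead of a live server:

```python
import asyncio
from unittest.mock import AsyncMock

async def reserve_tasks_batched(psub, batch_size: int):
    # One psub.fetch() round trip for the whole batch, instead of creating
    # a new pull subscription per message (N round trips for batch=N).
    try:
        return await psub.fetch(batch_size, timeout=0.5)
    except asyncio.TimeoutError:
        return []  # empty queue

psub = AsyncMock()
psub.fetch.return_value = ["task-1", "task-2"]
msgs = asyncio.run(reserve_tasks_batched(psub, 64))
print(len(msgs), psub.fetch.await_count)  # 2 1
```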

* docs: add next session prompt

* feat: add pipeline__slug__in filter for multi-pipeline job queries

Workers that handle multiple pipelines can now fetch jobs for all of them
in a single request: ?pipeline__slug__in=slug1,slug2

Co-Authored-By: Claude <noreply@anthropic.com>
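As a rough illustration of what the `__in` lookup does with the comma-separated query value (a plain-Python stand-in, not the actual Django/django-filter implementation):

```python
def filter_jobs_by_pipeline_slugs(jobs, param: str):
    # Mirrors the pipeline__slug__in lookup on plain dicts: split the
    # comma-separated query value and keep jobs whose slug matches.
    slugs = {s for s in param.split(",") if s}
    return [j for j in jobs if j["pipeline_slug"] in slugs]

jobs = [{"id": 1, "pipeline_slug": "moths"}, {"id": 2, "pipeline_slug": "birds"}]
print([j["id"] for j in filter_jobs_by_pipeline_slugs(jobs, "moths,beetles")])  # [1]
```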

* chore: remove local-only docs and scripts from branch

These files are session notes, planning docs, and test scripts that
should stay local rather than be part of the PR.

Co-Authored-By: Claude <noreply@anthropic.com>

* feat: set job dispatch_mode at creation time based on project feature flags

ML jobs with a pipeline now get dispatch_mode set during setup() instead
of waiting until run() is called by the Celery worker. This lets the UI
show the correct mode immediately after job creation.

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: add timeouts to all JetStream operations and restore reconnect policy

Add NATS_JETSTREAM_TIMEOUT (10s) to all JetStream metadata operations
via asyncio.wait_for() so a hung NATS connection fails fast instead of
blocking the caller's thread indefinitely. Also restore the intended
reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.

Co-Authored-By: Claude <noreply@anthropic.com>
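The fail-fast pattern this commit describes can be sketched as below. The timeout is shortened for the demo (the real code uses NATS_JETSTREAM_TIMEOUT), and `hung_jetstream_call` is a stand-in for any JetStream metadata operation on a hung connection:

```python
import asyncio

TIMEOUT = 0.05  # demo value; the commit uses NATS_JETSTREAM_TIMEOUT (10s)

async def hung_jetstream_call():
    await asyncio.sleep(60)  # simulates a call that never completes

async def main():
    try:
        # wait_for cancels the inner call and raises instead of blocking
        # the caller's thread indefinitely.
        await asyncio.wait_for(hung_jetstream_call(), timeout=TIMEOUT)
    except asyncio.TimeoutError:
        return "timed out fast"

print(asyncio.run(main()))  # timed out fast
```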

* fix: propagate NATS timeouts as 503 instead of swallowing them

asyncio.TimeoutError from _ensure_stream() and _ensure_consumer() was
caught by the broad `except Exception` in reserve_tasks(), silently
returning [] and making NATS outages indistinguishable from empty queues.
Workers would then poll immediately, recreating the flooding problem.

- Add explicit `except asyncio.TimeoutError: raise` in reserve_tasks()
- Catch TimeoutError and OSError in the /tasks view, return 503
- Restore allow_reconnect=False (fail-fast on connection issues)
- Add return type annotation to get_connection()

Co-Authored-By: Claude <noreply@anthropic.com>
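The re-raise pattern can be sketched as follows (`reserve_tasks_sketch` and `outage` are hypothetical stand-ins, not the actual view code):

```python
import asyncio

async def reserve_tasks_sketch(fetch):
    try:
        return await fetch()
    except asyncio.TimeoutError:
        raise  # propagate so the HTTP layer can answer 503, not []
    except Exception:
        return []  # other errors still degrade to "no tasks"

async def outage():
    raise asyncio.TimeoutError  # simulates a NATS outage

try:
    asyncio.run(reserve_tasks_sketch(outage))
    result = "swallowed"
except asyncio.TimeoutError:
    result = "503"
print(result)  # 503
```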

* fix: address review comments (log level, fetch timeout, docstring)

- Downgrade reserve_tasks log to DEBUG when zero tasks reserved (avoid
  log spam from frequent polling)
- Pass timeout=0.5 from /tasks endpoint to avoid blocking the worker
  for 5s on empty queues
- Fix docstring examples using string 'job123' for int-typed job_id

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: catch nats.errors.Error in /tasks endpoint for proper 503 responses

NoServersError, ConnectionClosedError, and other NATS exceptions inherit
from nats.errors.Error (not OSError), so they escaped the handler and
returned 500 instead of 503.

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
…olnickLab#1142)

* feat: configurable NATS tuning and gunicorn worker management

Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to
this branch:

- Make TASK_TTR configurable via NATS_TASK_TTR Django setting (default 30s)
- Make max_ack_pending configurable via NATS_MAX_ACK_PENDING setting (default 100)
- Local dev: switch to gunicorn+UvicornWorker by default for production
  parity, with USE_UVICORN=1 escape hatch for raw uvicorn
- Production: auto-detect WEB_CONCURRENCY from CPU cores (capped at 8)
  when not explicitly set in the environment

Co-Authored-By: Claude <noreply@anthropic.com>

* fix: address PR review comments

- Fix max_ack_pending falsy-zero guard (use `is not None` instead of `or`)
- Update TaskQueueManager docstring with Args section
- Simplify production WEB_CONCURRENCY fallback (just use nproc)

Co-Authored-By: Claude <noreply@anthropic.com>
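The falsy-zero pitfall in one line (`resolve_max_ack_pending` is a hypothetical helper; the default of 100 comes from the earlier commit):

```python
DEFAULT_MAX_ACK_PENDING = 100

def resolve_max_ack_pending(setting):
    # `setting or DEFAULT` would silently replace an explicit 0 with 100;
    # `is not None` preserves the configured zero.
    return setting if setting is not None else DEFAULT_MAX_ACK_PENDING

print(resolve_max_ack_pending(0), resolve_max_ack_pending(None))  # 0 100
```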

---------

Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
* fix: include pipeline_slug in MinimalJobSerializer (ids_only response)

The ADC worker fetches jobs with ids_only=1 and expects pipeline_slug in
the response to know which pipeline to run. Without it, Pydantic
validation fails and the worker skips the job.

Co-Authored-By: Claude <noreply@anthropic.com>

* Update ami/jobs/serializers.py

Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>

netlify bot commented Mar 4, 2026

👷 Deploy request for antenna-ssec pending review.

Visit the deploys page to approve it

🔨 Latest commit db05526


netlify bot commented Mar 4, 2026

Deploy Preview for antenna-preview canceled.

🔨 Latest commit db05526
🔍 Latest deploy log https://app.netlify.com/projects/antenna-preview/deploys/69ab4b2b848505000844a236


coderabbitai bot commented Mar 4, 2026

📝 Walkthrough

Walkthrough

A new Django management command (check_dead_letter_queue) enables checking for failed tasks in a job's dead letter queue. The TaskQueueManager is enhanced with advisory stream creation, methods to retrieve and delete dead letter task IDs, and integrated DLQ cleanup into resource management. Tests are updated to validate the new advisory stream and consumer deletion behavior.

Changes

  • New Management Command — ami/ml/management/commands/check_dead_letter_queue.py: New Django management command that accepts a job ID and uses TaskQueueManager to retrieve and display dead letter task IDs via an async helper method.
  • NATS Queue Dead Letter Support — ami/ml/orchestration/nats_queue.py: Adds advisory stream initialization, a get_dead_letter_task_ids() method to fetch failed task IDs, delete_dlq_consumer() for cleanup, the helper _get_dlq_consumer_name(), and integrates DLQ consumer deletion into cleanup_job_resources().
  • Test Updates — ami/ml/orchestration/tests/test_nats_queue.py: Updates mocks and assertions to account for advisory stream creation during TaskQueueManager initialization and DLQ consumer deletion during cleanup.

Sequence Diagram

sequenceDiagram
    participant CLI as Management Command
    participant TQM as TaskQueueManager
    participant NATS as NATS Connection
    participant Stream as Advisory Stream

    CLI->>TQM: check_dead_letter_queue(job_id)
    TQM->>NATS: __aenter__() async context
    NATS->>Stream: _setup_advisory_stream()
    Stream-->>NATS: advisory stream created/validated
    NATS-->>TQM: context ready
    TQM->>NATS: get_dead_letter_task_ids(job_id)
    NATS->>Stream: read advisory events (max-delivery)
    Stream-->>NATS: retrieve failed task events
    NATS->>NATS: extract image_ids from messages
    NATS->>NATS: acknowledge processed advisories
    NATS->>NATS: flush connection
    NATS-->>TQM: list of failed task IDs
    TQM-->>CLI: display results (found/not found)

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~25 minutes

Suggested reviewers

  • mihow

Poem

🐰 A rabbit hops through queues of dread,
Seeking letters marked as dead,
With advisory streams held high,
Lost tasks found—no more goodbye! 📬✨

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
  • Title check — Passed: The title "Support for NATS dead letter queue" clearly and directly summarizes the main change introduced in this pull request.
  • Description check — Passed: The pull request description covers the Summary, List of Changes, Related Issues, Detailed Description, and How to Test the Changes sections with comprehensive information; deployment notes and checklist are absent but not critical.
  • Docstring coverage — Passed: Docstring coverage is 81.25%, which meets the required threshold of 80.00%.


@carlosgjs carlosgjs requested a review from mihow March 4, 2026 19:27
@carlosgjs carlosgjs changed the title from Carlos/natsdlq to Support for NATS dead letter queue on Mar 6, 2026
@carlosgjs carlosgjs marked this pull request as ready for review March 6, 2026 21:57
Copilot AI review requested due to automatic review settings March 6, 2026 21:57

Copilot AI left a comment


Pull request overview

Adds dead letter queue (DLQ) support to the NATS/JetStream-backed async job task queue by capturing max-delivery advisories, enabling operators to query failed tasks and improving ACK reliability during teardown.

Changes:

  • Create/use a shared JetStream “advisories” stream and a per-job durable advisory consumer to track max-delivery (DLQ) events.
  • Add TaskQueueManager.get_dead_letter_task_ids() and DLQ consumer cleanup hooks.
  • Introduce a Django management command to query DLQ entries for a job from the CLI.

Reviewed changes

Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.

  • ami/ml/orchestration/nats_queue.py — Adds advisory stream setup on connect, the DLQ query method, extra flushes, and DLQ consumer cleanup.
  • ami/ml/orchestration/tests/test_nats_queue.py — Updates unit tests for the new advisory stream creation and additional consumer deletion.
  • ami/ml/management/commands/check_dead_letter_queue.py — Adds a CLI command to print DLQ task/image IDs for a job.


Comment on lines 451 to +479
@@ -342,8 +471,9 @@ async def cleanup_job_resources(self, job_id: int) -> bool:
Returns:
bool: True if successful, False otherwise
"""
# Delete consumer first, then stream
# Delete consumer first, then stream, then the durable DLQ advisory consumer
consumer_deleted = await self.delete_consumer(job_id)
stream_deleted = await self.delete_stream(job_id)
dlq_consumer_deleted = await self.delete_dlq_consumer(job_id)

return consumer_deleted and stream_deleted
return consumer_deleted and stream_deleted and dlq_consumer_deleted

Copilot AI Mar 6, 2026


delete_dlq_consumer() returns False on any exception (including NotFoundError when the DLQ consumer was never created). Because cleanup_job_resources() now ANDs this result, normal job cleanup can incorrectly report failure even though the job stream/consumer were deleted successfully. Treat a missing DLQ consumer as a successful no-op (e.g., catch nats.js.errors.NotFoundError and return True), and consider whether cleanup_job_resources() should proceed even if DLQ cleanup fails.

# ACKs can be silently dropped when the subscription is torn down.
await self.nc.flush()

except asyncio.TimeoutError:

Copilot AI Mar 6, 2026


psub.fetch(...) timeouts are caught as asyncio.TimeoutError here, but elsewhere in this module (reserve_tasks) the same API raises nats.errors.TimeoutError. If fetch() raises nats.errors.TimeoutError, this block will fall into the outer except Exception and log an error instead of the intended "No advisory messages" path. Catch nats.errors.TimeoutError (or both exception types) for consistency.

Suggested change
except asyncio.TimeoutError:
except (asyncio.TimeoutError, nats.errors.TimeoutError):

Comment on lines +361 to +374
async def get_dead_letter_task_ids(self, job_id: int, n: int = 10) -> list[str]:
"""
Get task IDs from dead letter queue (messages that exceeded max delivery attempts).

Pulls from persistent advisory stream to find failed messages, then looks up task IDs.
Uses a durable consumer so acknowledged advisories are not re-delivered on subsequent calls.

Args:
job_id: The job ID (integer primary key)
n: Maximum number of task IDs to return (default: 10)

Returns:
List of task IDs that failed to process after max retry attempts
"""

Copilot AI Mar 6, 2026


get_dead_letter_task_ids/dead_letter_ids are described as returning "task IDs", but the implementation actually extracts and returns image_id from the task payload (and the management command prints "Image ID"). This mismatch is confusing for callers; either rename the method/variables/docstring to reflect image IDs, or change the extraction to return the task's actual identifier field.

Comment on lines +336 to +355
async def _setup_advisory_stream(self):
"""Ensure the shared advisory stream exists to capture max-delivery events.

Called on every __aenter__ so that advisories are captured from the moment
any TaskQueueManager connection is opened, not just when the DLQ is first read.
"""
try:
await asyncio.wait_for(
self.js.add_stream(
name="advisories",
subjects=["$JS.EVENT.ADVISORY.>"],
max_age=3600, # Keep advisories for 1 hour
),
timeout=NATS_JETSTREAM_TIMEOUT,
)
logger.info("Advisory stream created")
except asyncio.TimeoutError:
raise # NATS unreachable — propagate so __aenter__ fails fast
except nats.js.errors.BadRequestError:
pass # Stream already exists

Copilot AI Mar 6, 2026


_setup_advisory_stream suppresses all BadRequestErrors as "stream already exists". BadRequestError can also indicate invalid subjects/config, so this can hide real misconfiguration. Prefer checking existence via stream_info("advisories") and only creating on NotFoundError, or inspect the error code/message and only ignore the specific "already exists" case.

Comment on lines +336 to +436
async def _setup_advisory_stream(self):
"""Ensure the shared advisory stream exists to capture max-delivery events.

Called on every __aenter__ so that advisories are captured from the moment
any TaskQueueManager connection is opened, not just when the DLQ is first read.
"""
try:
await asyncio.wait_for(
self.js.add_stream(
name="advisories",
subjects=["$JS.EVENT.ADVISORY.>"],
max_age=3600, # Keep advisories for 1 hour
),
timeout=NATS_JETSTREAM_TIMEOUT,
)
logger.info("Advisory stream created")
except asyncio.TimeoutError:
raise # NATS unreachable — propagate so __aenter__ fails fast
except nats.js.errors.BadRequestError:
pass # Stream already exists

def _get_dlq_consumer_name(self, job_id: int) -> str:
"""Get the durable consumer name for dead letter queue advisory tracking."""
return f"job-{job_id}-dlq"

async def get_dead_letter_task_ids(self, job_id: int, n: int = 10) -> list[str]:
"""
Get task IDs from dead letter queue (messages that exceeded max delivery attempts).

Pulls from persistent advisory stream to find failed messages, then looks up task IDs.
Uses a durable consumer so acknowledged advisories are not re-delivered on subsequent calls.

Args:
job_id: The job ID (integer primary key)
n: Maximum number of task IDs to return (default: 10)

Returns:
List of task IDs that failed to process after max retry attempts
"""
if self.nc is None or self.js is None:
raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

stream_name = self._get_stream_name(job_id)
consumer_name = self._get_consumer_name(job_id)
dlq_consumer_name = self._get_dlq_consumer_name(job_id)
dead_letter_ids = []

try:
subject_filter = f"$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.{stream_name}.{consumer_name}"

# Use a durable consumer so ACKs persist across calls — ephemeral consumers
# are deleted on unsubscribe, discarding all ACK tracking and causing every
# advisory to be re-delivered on the next call.
psub = await self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name)

try:
msgs = await psub.fetch(n, timeout=1.0)

for msg in msgs:
advisory_data = json.loads(msg.data.decode())

# Get the stream sequence of the failed message
if "stream_seq" in advisory_data:
stream_seq = advisory_data["stream_seq"]

# Look up the actual message by sequence to get task ID
try:
job_msg = await self.js.get_msg(stream_name, stream_seq)

if job_msg and job_msg.data:
task_data = json.loads(job_msg.data.decode())

if "image_id" in task_data:
dead_letter_ids.append(str(task_data["image_id"]))
else:
logger.warning(f"No image_id found in task data: {task_data}")
except Exception as e:
logger.warning(f"Could not retrieve message {stream_seq} from {stream_name}: {e}")
# The message might have been discarded after max_deliver exceeded

# Acknowledge the advisory message so the durable consumer won't re-deliver it
await msg.ack()
logger.info(
f"Acknowledged advisory message for stream_seq {advisory_data.get('stream_seq', 'unknown')}"
)

# Flush to ensure all ACKs are written to the socket before unsubscribing.
# msg.ack() only queues a publish in the client buffer; without flush() the
# ACKs can be silently dropped when the subscription is torn down.
await self.nc.flush()

except asyncio.TimeoutError:
logger.info(f"No advisory messages found for job {job_id}")
finally:
await psub.unsubscribe()

except Exception as e:
logger.error(f"Failed to get dead letter task IDs for job '{job_id}': {e}")

return dead_letter_ids[:n]


Copilot AI Mar 6, 2026


New DLQ behavior (_setup_advisory_stream, get_dead_letter_task_ids, delete_dlq_consumer) isn’t covered by unit tests in this module, while the rest of TaskQueueManager is. Add tests that (1) verify the advisory stream/consumer interactions, (2) exercise the no-advisory timeout path, and (3) ensure cleanup succeeds when the DLQ consumer does not exist.

Comment on lines +28 to +33
def handle(self, *args, **options):
job_id = options["job_id"]

try:
dead_letter_ids = asyncio.run(self._check_dead_letter_queue(job_id))


Copilot AI Mar 6, 2026


This command uses asyncio.run(...) inside handle(). Elsewhere in this repo (e.g. ami/jobs/management/commands/chaos_monkey.py) async work in management commands is bridged with asgiref.sync.async_to_sync, which avoids RuntimeError: asyncio.run() cannot be called from a running event loop in environments that already have a loop. Consider switching to async_to_sync for consistency and robustness.
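The failure mode described above is easy to reproduce with a minimal stand-alone sketch (unrelated to the actual command code):

```python
import asyncio

async def inner():
    return 42

async def outer():
    coro = inner()
    try:
        # Nesting asyncio.run inside a running loop is refused outright,
        # which is why loop-bridging helpers like async_to_sync exist.
        asyncio.run(coro)
    except RuntimeError as e:
        coro.close()  # avoid a "coroutine was never awaited" warning
        return str(e)

print(asyncio.run(outer()))  # mentions "running event loop"
```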


@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

🧹 Nitpick comments (4)
ami/ml/orchestration/nats_queue.py (1)

437-460: DLQ consumer deletion logs warning on any failure, including "not found".

If a DLQ consumer was never created (no tasks exceeded max deliveries), delete_dlq_consumer will log a warning at line 459. Consider catching NotFoundError separately and logging at debug level to reduce noise for the common case where no DLQ consumer exists.

♻️ Proposed refinement
     async def delete_dlq_consumer(self, job_id: int) -> bool:
         ...
         dlq_consumer_name = self._get_dlq_consumer_name(job_id)
         try:
             await asyncio.wait_for(
                 self.js.delete_consumer("advisories", dlq_consumer_name),
                 timeout=NATS_JETSTREAM_TIMEOUT,
             )
             logger.info(f"Deleted DLQ consumer {dlq_consumer_name} for job '{job_id}'")
             return True
+        except nats.js.errors.NotFoundError:
+            logger.debug(f"DLQ consumer {dlq_consumer_name} not found for job '{job_id}' (never created)")
+            return True  # Not an error - consumer simply never existed
         except Exception as e:
             logger.warning(f"Failed to delete DLQ consumer for job '{job_id}': {e}")
             return False
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/nats_queue.py` around lines 437 - 460, In
delete_dlq_consumer, handle the "consumer not found" case separately: when
calling self.js.delete_consumer("advisories", dlq_consumer_name) catch the
NotFoundError (the specific JetStream not-found exception class) and log it at
debug level instead of warning, while keeping other exceptions falling back to
the existing warning and returning False; update references to
js.delete_consumer, dlq_consumer_name, and the logger.warning call to implement
the separate except NotFoundError: logger.debug(...) branch.
ami/ml/orchestration/tests/test_nats_queue.py (1)

149-160: Consider adding dedicated tests for new DLQ methods.

The existing test verifies cleanup_job_resources calls delete_consumer twice, but there are no dedicated tests for:

  • get_dead_letter_task_ids (the core new functionality)
  • delete_dlq_consumer
  • _setup_advisory_stream

These methods have non-trivial logic (JSON parsing, message lookups, error handling) that would benefit from unit test coverage.

Would you like me to help generate test cases for these new methods?

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/orchestration/tests/test_nats_queue.py` around lines 149 - 160, Add
dedicated unit tests for the new DLQ-related methods: create tests that call
get_dead_letter_task_ids (mocking js.api.get_msg and simulating messages with
valid/invalid JSON and missing task_id) to assert it returns correct IDs and
handles parsing errors; test delete_dlq_consumer by mocking js.delete_consumer
to verify it is called with the DLQ consumer name and that errors are
handled/propagated as expected; and test _setup_advisory_stream by mocking
js.add_stream/js.add_consumer (or js.streams.add/js.consumers.add depending on
usage) to verify stream/consumer creation logic and error paths. Use the same
pattern as existing tests (patch get_connection to return mocked nc, js) and
assert call counts and returned values for get_dead_letter_task_ids,
delete_dlq_consumer, and _setup_advisory_stream.
ami/ml/management/commands/check_dead_letter_queue.py (2)

21-26: Consider adding optional arguments for count and verbosity.

The command currently retrieves only 10 DLQ entries (the default in get_dead_letter_task_ids). For jobs with many failed tasks, operators may want to retrieve more. Consider adding:

  • --count / -n to control how many entries to fetch
  • --all flag to fetch all available entries
♻️ Example enhancement
     def add_arguments(self, parser):
         parser.add_argument(
             "job_id",
             type=int,
             help="Job ID to check for dead letter queue messages",
         )
+        parser.add_argument(
+            "-n", "--count",
+            type=int,
+            default=10,
+            help="Maximum number of dead letter tasks to retrieve (default: 10)",
+        )

     async def _check_dead_letter_queue(self, job_id: int) -> list[str]:
+    async def _check_dead_letter_queue(self, job_id: int, count: int = 10) -> list[str]:
         """Check for dead letter queue messages using TaskQueueManager."""
         async with TaskQueueManager() as manager:
-            return await manager.get_dead_letter_task_ids(job_id)
+            return await manager.get_dead_letter_task_ids(job_id, n=count)

Also applies to: 46-49

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/management/commands/check_dead_letter_queue.py` around lines 21 - 26,
The management command add_arguments should be extended to accept optional
--count/-n (int) and --all (store_true) flags, make them mutually exclusive or
validate in handle, and pass the chosen value into get_dead_letter_task_ids when
invoked in handle; specifically, update add_arguments to add
parser.add_argument('--count', '-n', type=int, default=10, help=...) and
parser.add_argument('--all', action='store_true', help=...), then in handle read
options['count'] and options['all'], validate that both aren't set, convert
--all into an appropriate sentinel (e.g., None or a high limit) and call
get_dead_letter_task_ids(job_id, count=...) or get_dead_letter_task_ids(job_id,
fetch_all=True) depending on that function's signature, and update help
text/usage accordingly so operators can request more or all DLQ entries.

43-44: Preserve exception chain for better debugging.

Using raise ... from e preserves the original traceback, making it easier to debug issues when the command fails.

♻️ Proposed fix
         except Exception as e:
-            raise CommandError(f"Failed to check dead letter queue: {e}")
+            raise CommandError(f"Failed to check dead letter queue: {e}") from e
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@ami/ml/management/commands/check_dead_letter_queue.py` around lines 43 - 44,
The except block in check_dead_letter_queue.py currently re-raises a
CommandError without preserving the original traceback; update the handler
inside the Command class (the handle method where you catch "except Exception as
e") to re-raise the CommandError using "raise CommandError(... ) from e" so the
original exception chain is preserved; keep the existing message string but
attach the original exception as the cause by using "from e" with the
CommandError symbol.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@ami/ml/orchestration/nats_queue.py`:
- Around line 474-479: The current cleanup aggregates delete_consumer,
delete_stream and delete_dlq_consumer results so a DLQ deletion failure makes
the whole cleanup fail; change this so delete_dlq_consumer is attempted but
treated best-effort: keep calling await self.delete_dlq_consumer(job_id) (and
optionally log a warning on failure) but return only the combined result of
await self.delete_consumer(job_id) and await self.delete_stream(job_id) (i.e.,
use consumer_deleted and stream_deleted for the final boolean), leaving
delete_dlq_consumer failure from affecting the overall return; refer to the
identifiers delete_consumer, delete_stream and delete_dlq_consumer in
nats_queue.py.
- Line 389: The pull_subscribe call using durable=dlq_consumer_name is missing
the stream parameter, so ensure the durable consumer is created on the correct
stream by adding stream="advisories" to the self.js.pull_subscribe call (the
call that assigns psub). Update the invocation of
self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name) to include
stream="advisories" so the durable consumer is associated with the "advisories"
stream.
- Around line 361-435: Change the error logging in get_dead_letter_task_ids to
include a stack trace: replace the logger.error call in the outer exception
handler with logger.exception (on the same logger) so the exception traceback
is logged automatically instead of being formatted manually; keep the same
message text but drop the manual "{e}" formatting when calling
logger.exception to avoid duplication.
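The logger.error to logger.exception swap can be demonstrated with a throwaway logger writing into a buffer (the logger name and message below are illustrative, not from the PR):

```python
import io
import logging

# Attach a handler to a scratch logger so the output can be inspected.
logger = logging.getLogger("dlq_demo")
buf = io.StringIO()
logger.addHandler(logging.StreamHandler(buf))
logger.setLevel(logging.ERROR)

try:
    raise ValueError("bad advisory payload")
except Exception:
    # logger.exception logs at ERROR level and appends the active
    # traceback automatically; no manual "{e}" formatting is needed.
    logger.exception("Error fetching dead letter task IDs")

output = buf.getvalue()
```

The captured output contains both the message and the full traceback, which is the behavior the review asks for.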

---

Nitpick comments:
In `@ami/ml/management/commands/check_dead_letter_queue.py`:
- Around line 21-26: The management command add_arguments should be extended to
accept optional --count/-n (int) and --all (store_true) flags, make them
mutually exclusive or validate in handle, and pass the chosen value into
get_dead_letter_task_ids when invoked in handle; specifically, update
add_arguments to add parser.add_argument('--count', '-n', type=int, default=10,
help=...) and parser.add_argument('--all', action='store_true', help=...), then
in handle read options['count'] and options['all'], validate that both aren't
set, convert --all into an appropriate sentinel (e.g., None or a high limit) and
call get_dead_letter_task_ids(job_id, count=...) or
get_dead_letter_task_ids(job_id, fetch_all=True) depending on that function's
signature, and update help text/usage accordingly so operators can request more
or all DLQ entries.
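The suggested flag shape can be sketched with a bare argparse parser; the flag names, the default of 10, and using None as the fetch-all sentinel are the reviewer's proposal, not code from the PR:

```python
import argparse

parser = argparse.ArgumentParser(prog="check_dead_letter_queue")
parser.add_argument("job_id", type=int)
# A mutually exclusive group enforces "--count or --all, not both" at
# parse time instead of validating by hand in handle().
group = parser.add_mutually_exclusive_group()
group.add_argument("--count", "-n", type=int, default=10,
                   help="Maximum number of DLQ entries to fetch")
group.add_argument("--all", action="store_true",
                   help="Fetch every DLQ entry for the job")

opts = parser.parse_args(["42", "--count", "25"])
# None acts as the hypothetical fetch-all sentinel passed downstream.
count = None if opts.all else opts.count
```

In a Django management command the same `group.add_argument` calls would go inside `add_arguments(self, parser)`, since Django hands the command a standard argparse parser.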

In `@ami/ml/orchestration/nats_queue.py`:
- Around line 437-460: In delete_dlq_consumer, handle the "consumer not found"
case separately: when calling self.js.delete_consumer("advisories",
dlq_consumer_name) catch the NotFoundError (the specific JetStream not-found
exception class) and log it at debug level instead of warning, while keeping
other exceptions falling back to the existing warning and returning False;
update references to js.delete_consumer, dlq_consumer_name, and the
logger.warning call to implement the separate except NotFoundError:
logger.debug(...) branch.
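The suggested not-found branch can be sketched as follows. `NotFoundError` here is a local stand-in for `nats.js.errors.NotFoundError`, the function is a simplified stand-alone version of `delete_dlq_consumer`, and returning True on not-found is one reasonable reading of "expected state":

```python
import asyncio
import logging
from unittest.mock import AsyncMock

logger = logging.getLogger("dlq_cleanup_demo")

class NotFoundError(Exception):
    """Stand-in for nats.js.errors.NotFoundError."""

async def delete_dlq_consumer(js, dlq_consumer_name: str) -> bool:
    try:
        await js.delete_consumer("advisories", dlq_consumer_name)
        return True
    except NotFoundError:
        # Missing consumer is expected when the job never produced
        # max-delivery advisories, so log at debug, not warning.
        logger.debug("DLQ consumer %s not found", dlq_consumer_name)
        return True
    except Exception as e:
        logger.warning("Failed to delete DLQ consumer %s: %s",
                       dlq_consumer_name, e)
        return False

# Simulate the not-found case with a mocked JetStream context.
js = AsyncMock()
js.delete_consumer.side_effect = NotFoundError()
missing_ok = asyncio.run(delete_dlq_consumer(js, "dlq-job-42"))
```

Unexpected errors still take the warning-and-False path, matching the review's ask.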

In `@ami/ml/orchestration/tests/test_nats_queue.py`:
- Around line 149-160: Add dedicated unit tests for the new DLQ-related methods:
create tests that call get_dead_letter_task_ids (mocking js.api.get_msg and
simulating messages with valid/invalid JSON and missing task_id) to assert it
returns correct IDs and handles parsing errors; test delete_dlq_consumer by
mocking js.delete_consumer to verify it is called with the DLQ consumer name and
that errors are handled/propagated as expected; and test _setup_advisory_stream
by mocking js.add_stream/js.add_consumer (or js.streams.add/js.consumers.add
depending on usage) to verify stream/consumer creation logic and error paths.
Use the same pattern as existing tests (patch get_connection to return mocked
nc, js) and assert call counts and returned values for get_dead_letter_task_ids,
delete_dlq_consumer, and _setup_advisory_stream.
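The mocking pattern those tests would use can be sketched without a NATS server. `fetch_task_ids` below is a hypothetical reduction of `get_dead_letter_task_ids` to the parts the review asks the tests to cover (valid JSON, invalid JSON, and a payload missing task_id):

```python
import asyncio
import json
from types import SimpleNamespace
from unittest.mock import AsyncMock

async def fetch_task_ids(js, subject: str, durable: str) -> list[str]:
    psub = await js.pull_subscribe(subject, durable=durable)
    msgs = await psub.fetch(batch=10, timeout=1)
    ids = []
    for msg in msgs:
        try:
            data = json.loads(msg.data)
        except json.JSONDecodeError:
            continue  # skip malformed advisory payloads
        if "task_id" in data:
            ids.append(data["task_id"])
    return ids

# Mock the JetStream context: pull_subscribe returns a mocked
# subscription whose fetch() yields one good and two bad messages.
psub = AsyncMock()
psub.fetch.return_value = [
    SimpleNamespace(data=b'{"task_id": "t1"}'),
    SimpleNamespace(data=b"not json"),
    SimpleNamespace(data=b'{"other": 1}'),
]
js = AsyncMock()
js.pull_subscribe.return_value = psub
ids = asyncio.run(fetch_task_ids(js, "$JS.EVENT.ADVISORY.>", "dlq-job-1"))
```

The real tests would patch get_connection as the existing tests do, but the assertion surface is the same: parsed IDs and mock call counts.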

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 5c3ca6a5-cb22-4bb1-bfc5-0c9809717a0b

📥 Commits

Reviewing files that changed from the base of the PR and between dafb83a and db05526.

📒 Files selected for processing (3)
  • ami/ml/management/commands/check_dead_letter_queue.py
  • ami/ml/orchestration/nats_queue.py
  • ami/ml/orchestration/tests/test_nats_queue.py

# Use a durable consumer so ACKs persist across calls — ephemeral consumers
# are deleted on unsubscribe, discarding all ACK tracking and causing every
# advisory to be re-delivered on the next call.
psub = await self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name)
⚠️ Potential issue | 🟠 Major

Missing stream parameter for durable consumer creation.

When calling pull_subscribe with a durable name, you should also specify the stream parameter to ensure the consumer is created on the correct stream ("advisories"). Without it, the library may fail to locate or create the consumer correctly.

🔧 Proposed fix
-            psub = await self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name)
+            psub = await self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name, stream="advisories")

Comment on lines +474 to +479
# Delete consumer first, then stream, then the durable DLQ advisory consumer
consumer_deleted = await self.delete_consumer(job_id)
stream_deleted = await self.delete_stream(job_id)
dlq_consumer_deleted = await self.delete_dlq_consumer(job_id)

return consumer_deleted and stream_deleted
return consumer_deleted and stream_deleted and dlq_consumer_deleted
⚠️ Potential issue | 🟡 Minor

DLQ consumer deletion failure now causes overall cleanup to return False.

Per context snippet 2, cleanup_async_job_resources in jobs.py logs "Failed to clean up NATS resources" when cleanup_job_resources returns False. With this change, if the DLQ consumer deletion fails (e.g., consumer never existed because get_dead_letter_task_ids was never called), the entire cleanup will be reported as failed even though the main consumer and stream were successfully deleted.

Consider whether DLQ consumer deletion should be best-effort (not affecting the return value) since the DLQ consumer may not exist for jobs that completed successfully without any max-delivery failures.

💡 Alternative: Make DLQ deletion best-effort
         consumer_deleted = await self.delete_consumer(job_id)
         stream_deleted = await self.delete_stream(job_id)
-        dlq_consumer_deleted = await self.delete_dlq_consumer(job_id)
+        # DLQ consumer may not exist if no tasks exceeded max deliveries
+        await self.delete_dlq_consumer(job_id)  # Best-effort, don't affect return
 
-        return consumer_deleted and stream_deleted and dlq_consumer_deleted
+        return consumer_deleted and stream_deleted
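The best-effort variant proposed above can be exercised with mocks. `cleanup_job_resources` below is a stand-alone stand-in taking the three delete coroutines as parameters instead of methods on TaskQueueManager:

```python
import asyncio
from unittest.mock import AsyncMock

async def cleanup_job_resources(delete_consumer, delete_stream,
                                delete_dlq_consumer, job_id: int) -> bool:
    consumer_deleted = await delete_consumer(job_id)
    stream_deleted = await delete_stream(job_id)
    # DLQ consumer may not exist if no tasks exceeded max deliveries;
    # its result is intentionally ignored so a failure here cannot mark
    # the whole cleanup as failed.
    await delete_dlq_consumer(job_id)
    return consumer_deleted and stream_deleted

ok = asyncio.run(cleanup_job_resources(
    AsyncMock(return_value=True),
    AsyncMock(return_value=True),
    AsyncMock(return_value=False),  # DLQ deletion "fails"
    job_id=42,
))
```

Even with the DLQ deletion returning False, the cleanup reports success, which is the behavior the reviewer argues for.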
