Support for NATS dead letter queue #1175
Conversation
* fix: prevent NATS connection flooding and stale job task fetching
  - Add `connect_timeout=5`, `allow_reconnect=False` to NATS connections to prevent leaked reconnection loops from blocking Django's event loop
  - Guard the `/tasks` endpoint against terminal-status jobs (return empty tasks instead of attempting a NATS reserve)
  - `IncompleteJobFilter` now excludes jobs by top-level status in addition to progress JSON stages
  - Add stale-worker cleanup to the integration test script

  Found during PSv2 integration testing, where stale ADC workers with default DataLoader parallelism overwhelmed the single uvicorn worker thread by flooding `/tasks` with concurrent NATS reserve requests.
* docs: PSv2 integration test session notes and NATS flooding findings
  Session notes from the 2026-02-16 integration test, including root-cause analysis of stale-worker task competition and NATS connection issues. The findings doc tracks applied fixes and remaining TODOs with priorities.
* docs: update session notes with successful test run #3
  PSv2 integration test passed end-to-end (job 1380, 20/20 images). Identified `ack_wait=300s` as the cause of ~5 min of idle time when GPU processes race for NATS tasks.
* fix: batch NATS task fetch to prevent HTTP timeouts
  Replace N×1 `reserve_task()` calls with a single `reserve_tasks()` batch fetch. The previous implementation created a new pull subscription per message (320 NATS round trips for batch=64), causing the `/tasks` endpoint to exceed HTTP client timeouts. The new approach uses one `psub.fetch()` call for the entire batch.
* docs: add next session prompt
* feat: add `pipeline__slug__in` filter for multi-pipeline job queries
  Workers that handle multiple pipelines can now fetch jobs for all of them in a single request: `?pipeline__slug__in=slug1,slug2`
* chore: remove local-only docs and scripts from branch
  These files are session notes, planning docs, and test scripts that should stay local rather than be part of the PR.
* feat: set job dispatch_mode at creation time based on project feature flags
  ML jobs with a pipeline now get `dispatch_mode` set during `setup()` instead of waiting until `run()` is called by the Celery worker. This lets the UI show the correct mode immediately after job creation.
* fix: add timeouts to all JetStream operations and restore reconnect policy
  Add `NATS_JETSTREAM_TIMEOUT` (10s) to all JetStream metadata operations via `asyncio.wait_for()` so a hung NATS connection fails fast instead of blocking the caller's thread indefinitely. Also restore the intended reconnect policy (2 attempts, 1s wait) that was lost in a prior force push.
* fix: propagate NATS timeouts as 503 instead of swallowing them
  `asyncio.TimeoutError` from `_ensure_stream()` and `_ensure_consumer()` was caught by the broad `except Exception` in `reserve_tasks()`, silently returning `[]` and making NATS outages indistinguishable from empty queues. Workers would then poll immediately, recreating the flooding problem.
  - Add explicit `except asyncio.TimeoutError: raise` in `reserve_tasks()`
  - Catch `TimeoutError` and `OSError` in the `/tasks` view, return 503
  - Restore `allow_reconnect=False` (fail fast on connection issues)
  - Add a return type annotation to `get_connection()`
* fix: address review comments (log level, fetch timeout, docstring)
  - Downgrade the `reserve_tasks` log to DEBUG when zero tasks are reserved (avoid log spam from frequent polling)
  - Pass `timeout=0.5` from the `/tasks` endpoint to avoid blocking the worker for 5s on empty queues
  - Fix docstring examples using the string `'job123'` for the int-typed `job_id`
* fix: catch `nats.errors.Error` in `/tasks` endpoint for proper 503 responses
  `NoServersError`, `ConnectionClosedError`, and other NATS exceptions inherit from `nats.errors.Error` (not `OSError`), so they escaped the handler and returned 500 instead of 503.

---------
Co-authored-by: Claude <noreply@anthropic.com>
…olnickLab#1142)
* feat: configurable NATS tuning and gunicorn worker management
  Rebase onto main after RolnickLab#1135 merge. Keep only the additions unique to this branch:
  - Make `TASK_TTR` configurable via the `NATS_TASK_TTR` Django setting (default 30s)
  - Make `max_ack_pending` configurable via the `NATS_MAX_ACK_PENDING` setting (default 100)
  - Local dev: switch to gunicorn+UvicornWorker by default for production parity, with a `USE_UVICORN=1` escape hatch for raw uvicorn
  - Production: auto-detect `WEB_CONCURRENCY` from CPU cores (capped at 8) when not explicitly set in the environment
* fix: address PR review comments
  - Fix the `max_ack_pending` falsy-zero guard (use `is not None` instead of `or`)
  - Update the `TaskQueueManager` docstring with an Args section
  - Simplify the production `WEB_CONCURRENCY` fallback (just use `nproc`)

---------
Co-authored-by: Michael Bunsen <notbot@gmail.com>
Co-authored-by: Claude <noreply@anthropic.com>
* fix: include pipeline_slug in MinimalJobSerializer (ids_only response)
  The ADC worker fetches jobs with `ids_only=1` and expects `pipeline_slug` in the response to know which pipeline to run. Without it, Pydantic validation fails and the worker skips the job.
* Update ami/jobs/serializers.py

---------
Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
👷 Deploy request for antenna-ssec pending review.

✅ Deploy Preview for antenna-preview canceled.
📝 Walkthrough: A new Django management command (`check_dead_letter_queue`) queries dead letter queue entries for a job.
Sequence Diagram

```mermaid
sequenceDiagram
    participant CLI as Management Command
    participant TQM as TaskQueueManager
    participant NATS as NATS Connection
    participant Stream as Advisory Stream

    CLI->>TQM: check_dead_letter_queue(job_id)
    TQM->>NATS: __aenter__() async context
    NATS->>Stream: _setup_advisory_stream()
    Stream-->>NATS: advisory stream created/validated
    NATS-->>TQM: context ready
    TQM->>NATS: get_dead_letter_task_ids(job_id)
    NATS->>Stream: read advisory events (max-delivery)
    Stream-->>NATS: retrieve failed task events
    NATS->>NATS: extract image_ids from messages
    NATS->>NATS: acknowledge processed advisories
    NATS->>NATS: flush connection
    NATS-->>TQM: list of failed task IDs
    TQM-->>CLI: display results (found/not found)
```
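The "read advisory events" step in the flow above hinges on the shape of JetStream's max-delivery advisory payload. A minimal, self-contained sketch of extracting the stream sequence numbers from such advisories (the exact payload shape is an illustrative assumption; only the `stream_seq` field mirrors this PR's code):

```python
import json

def extract_failed_seqs(advisory_payloads: list[bytes]) -> list[int]:
    """Pull the stream sequence numbers of messages that exceeded max deliveries.

    Assumes each payload is a JSON advisory containing a `stream_seq` field,
    as consumed by get_dead_letter_task_ids() in this PR.
    """
    seqs = []
    for raw in advisory_payloads:
        data = json.loads(raw.decode())
        if "stream_seq" in data:  # advisories without it are skipped
            seqs.append(int(data["stream_seq"]))
    return seqs

# Example advisory bodies (shape is a hypothetical illustration)
payloads = [
    b'{"type": "io.nats.jetstream.advisory.v1.max_deliver", "stream_seq": 7, "deliveries": 2}',
    b'{"type": "io.nats.jetstream.advisory.v1.max_deliver"}',  # no stream_seq: skipped
]
print(extract_failed_seqs(payloads))  # [7]
```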
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
🚥 Pre-merge checks: ✅ 3 passed
Pull request overview
Adds dead letter queue (DLQ) support to the NATS/JetStream-backed async job task queue by capturing max-delivery advisories, enabling operators to query failed tasks and improving ACK reliability during teardown.
Changes:
- Create/use a shared JetStream “advisories” stream and a per-job durable advisory consumer to track max-delivery (DLQ) events.
- Add `TaskQueueManager.get_dead_letter_task_ids()` and DLQ consumer cleanup hooks.
- Introduce a Django management command to query DLQ entries for a job from the CLI.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| `ami/ml/orchestration/nats_queue.py` | Adds advisory stream setup on connect, DLQ query method, extra flushes, and DLQ consumer cleanup. |
| `ami/ml/orchestration/tests/test_nats_queue.py` | Updates unit tests for new advisory stream creation and additional consumer deletion. |
| `ami/ml/management/commands/check_dead_letter_queue.py` | Adds CLI command to print DLQ task/image IDs for a job. |
```diff
@@ -342,8 +471,9 @@ async def cleanup_job_resources(self, job_id: int) -> bool:
         Returns:
             bool: True if successful, False otherwise
         """
-        # Delete consumer first, then stream
+        # Delete consumer first, then stream, then the durable DLQ advisory consumer
         consumer_deleted = await self.delete_consumer(job_id)
         stream_deleted = await self.delete_stream(job_id)
+        dlq_consumer_deleted = await self.delete_dlq_consumer(job_id)

-        return consumer_deleted and stream_deleted
+        return consumer_deleted and stream_deleted and dlq_consumer_deleted
```
delete_dlq_consumer() returns False on any exception (including NotFoundError when the DLQ consumer was never created). Because cleanup_job_resources() now ANDs this result, normal job cleanup can incorrectly report failure even though the job stream/consumer were deleted successfully. Treat a missing DLQ consumer as a successful no-op (e.g., catch nats.js.errors.NotFoundError and return True), and consider whether cleanup_job_resources() should proceed even if DLQ cleanup fails.
```python
            # ACKs can be silently dropped when the subscription is torn down.
            await self.nc.flush()

        except asyncio.TimeoutError:
```

`psub.fetch(...)` timeouts are caught as `asyncio.TimeoutError` here, but elsewhere in this module (`reserve_tasks`) the same API raises `nats.errors.TimeoutError`. If `fetch()` raises `nats.errors.TimeoutError`, this block will fall into the outer `except Exception` and log an error instead of taking the intended "No advisory messages" path. Catch `nats.errors.TimeoutError` (or both exception types) for consistency:

```diff
-        except asyncio.TimeoutError:
+        except (asyncio.TimeoutError, nats.errors.TimeoutError):
```
```python
    async def get_dead_letter_task_ids(self, job_id: int, n: int = 10) -> list[str]:
        """
        Get task IDs from dead letter queue (messages that exceeded max delivery attempts).

        Pulls from persistent advisory stream to find failed messages, then looks up task IDs.
        Uses a durable consumer so acknowledged advisories are not re-delivered on subsequent calls.

        Args:
            job_id: The job ID (integer primary key)
            n: Maximum number of task IDs to return (default: 10)

        Returns:
            List of task IDs that failed to process after max retry attempts
        """
```
get_dead_letter_task_ids/dead_letter_ids are described as returning "task IDs", but the implementation actually extracts and returns image_id from the task payload (and the management command prints "Image ID"). This mismatch is confusing for callers; either rename the method/variables/docstring to reflect image IDs, or change the extraction to return the task's actual identifier field.
```python
    async def _setup_advisory_stream(self):
        """Ensure the shared advisory stream exists to capture max-delivery events.

        Called on every __aenter__ so that advisories are captured from the moment
        any TaskQueueManager connection is opened, not just when the DLQ is first read.
        """
        try:
            await asyncio.wait_for(
                self.js.add_stream(
                    name="advisories",
                    subjects=["$JS.EVENT.ADVISORY.>"],
                    max_age=3600,  # Keep advisories for 1 hour
                ),
                timeout=NATS_JETSTREAM_TIMEOUT,
            )
            logger.info("Advisory stream created")
        except asyncio.TimeoutError:
            raise  # NATS unreachable: propagate so __aenter__ fails fast
        except nats.js.errors.BadRequestError:
            pass  # Stream already exists
```
_setup_advisory_stream suppresses all BadRequestErrors as "stream already exists". BadRequestError can also indicate invalid subjects/config, so this can hide real misconfiguration. Prefer checking existence via stream_info("advisories") and only creating on NotFoundError, or inspect the error code/message and only ignore the specific "already exists" case.
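The check-then-create pattern suggested here can be sketched with an injected client stub (a hypothetical sketch: `js` stands in for nats-py's `JetStreamContext`, whose real `stream_info`/`add_stream` coroutines have this general shape, and `NotFoundError` stands in for `nats.js.errors.NotFoundError`):

```python
import asyncio

class NotFoundError(Exception):
    """Stand-in for nats.js.errors.NotFoundError."""

async def ensure_advisory_stream(js) -> bool:
    """Create the advisories stream only when it is genuinely missing.

    Returns True if the stream was created, False if it already existed.
    Any other BadRequestError from add_stream would now surface instead
    of being silently swallowed.
    """
    try:
        await js.stream_info("advisories")
        return False  # already exists; nothing to do
    except NotFoundError:
        await js.add_stream(name="advisories", subjects=["$JS.EVENT.ADVISORY.>"])
        return True

class FakeJS:
    """Minimal in-memory stand-in for a JetStream context."""
    def __init__(self):
        self.streams = set()
    async def stream_info(self, name):
        if name not in self.streams:
            raise NotFoundError(name)
    async def add_stream(self, name, subjects):
        self.streams.add(name)

js = FakeJS()
print(asyncio.run(ensure_advisory_stream(js)))  # True  (created)
print(asyncio.run(ensure_advisory_stream(js)))  # False (already there)
```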
```python
    async def _setup_advisory_stream(self):
        """Ensure the shared advisory stream exists to capture max-delivery events.

        Called on every __aenter__ so that advisories are captured from the moment
        any TaskQueueManager connection is opened, not just when the DLQ is first read.
        """
        try:
            await asyncio.wait_for(
                self.js.add_stream(
                    name="advisories",
                    subjects=["$JS.EVENT.ADVISORY.>"],
                    max_age=3600,  # Keep advisories for 1 hour
                ),
                timeout=NATS_JETSTREAM_TIMEOUT,
            )
            logger.info("Advisory stream created")
        except asyncio.TimeoutError:
            raise  # NATS unreachable: propagate so __aenter__ fails fast
        except nats.js.errors.BadRequestError:
            pass  # Stream already exists

    def _get_dlq_consumer_name(self, job_id: int) -> str:
        """Get the durable consumer name for dead letter queue advisory tracking."""
        return f"job-{job_id}-dlq"

    async def get_dead_letter_task_ids(self, job_id: int, n: int = 10) -> list[str]:
        """
        Get task IDs from dead letter queue (messages that exceeded max delivery attempts).

        Pulls from persistent advisory stream to find failed messages, then looks up task IDs.
        Uses a durable consumer so acknowledged advisories are not re-delivered on subsequent calls.

        Args:
            job_id: The job ID (integer primary key)
            n: Maximum number of task IDs to return (default: 10)

        Returns:
            List of task IDs that failed to process after max retry attempts
        """
        if self.nc is None or self.js is None:
            raise RuntimeError("Connection is not open. Use TaskQueueManager as an async context manager.")

        stream_name = self._get_stream_name(job_id)
        consumer_name = self._get_consumer_name(job_id)
        dlq_consumer_name = self._get_dlq_consumer_name(job_id)
        dead_letter_ids = []

        try:
            subject_filter = f"$JS.EVENT.ADVISORY.CONSUMER.MAX_DELIVERIES.{stream_name}.{consumer_name}"

            # Use a durable consumer so ACKs persist across calls -- ephemeral consumers
            # are deleted on unsubscribe, discarding all ACK tracking and causing every
            # advisory to be re-delivered on the next call.
            psub = await self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name)

            try:
                msgs = await psub.fetch(n, timeout=1.0)

                for msg in msgs:
                    advisory_data = json.loads(msg.data.decode())

                    # Get the stream sequence of the failed message
                    if "stream_seq" in advisory_data:
                        stream_seq = advisory_data["stream_seq"]

                        # Look up the actual message by sequence to get task ID
                        try:
                            job_msg = await self.js.get_msg(stream_name, stream_seq)

                            if job_msg and job_msg.data:
                                task_data = json.loads(job_msg.data.decode())

                                if "image_id" in task_data:
                                    dead_letter_ids.append(str(task_data["image_id"]))
                                else:
                                    logger.warning(f"No image_id found in task data: {task_data}")
                        except Exception as e:
                            logger.warning(f"Could not retrieve message {stream_seq} from {stream_name}: {e}")
                            # The message might have been discarded after max_deliver exceeded

                    # Acknowledge the advisory message so the durable consumer won't re-deliver it
                    await msg.ack()
                    logger.info(
                        f"Acknowledged advisory message for stream_seq {advisory_data.get('stream_seq', 'unknown')}"
                    )

                # Flush to ensure all ACKs are written to the socket before unsubscribing.
                # msg.ack() only queues a publish in the client buffer; without flush() the
                # ACKs can be silently dropped when the subscription is torn down.
                await self.nc.flush()

            except asyncio.TimeoutError:
                logger.info(f"No advisory messages found for job {job_id}")
            finally:
                await psub.unsubscribe()

        except Exception as e:
            logger.error(f"Failed to get dead letter task IDs for job '{job_id}': {e}")

        return dead_letter_ids[:n]
```
New DLQ behavior (_setup_advisory_stream, get_dead_letter_task_ids, delete_dlq_consumer) isn’t covered by unit tests in this module, while the rest of TaskQueueManager is. Add tests that (1) verify the advisory stream/consumer interactions, (2) exercise the no-advisory timeout path, and (3) ensure cleanup succeeds when the DLQ consumer does not exist.
```python
    def handle(self, *args, **options):
        job_id = options["job_id"]

        try:
            dead_letter_ids = asyncio.run(self._check_dead_letter_queue(job_id))
```
This command uses asyncio.run(...) inside handle(). Elsewhere in this repo (e.g. ami/jobs/management/commands/chaos_monkey.py) async work in management commands is bridged with asgiref.sync.async_to_sync, which avoids RuntimeError: asyncio.run() cannot be called from a running event loop in environments that already have a loop. Consider switching to async_to_sync for consistency and robustness.
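The underlying hazard can be shown with a stdlib-only sketch: `asyncio.run()` refuses to start when a loop is already running in the thread, which is exactly the case `async_to_sync` handles. This is a hypothetical helper for illustration, not the repo's actual bridge:

```python
import asyncio

def run_coro_safely(coro):
    """Run a coroutine from synchronous code without assuming loop state.

    asyncio.run() raises RuntimeError if called while a loop is running in
    this thread; asgiref's async_to_sync (the repo convention per the review)
    handles that case properly. This sketch only demonstrates the check.
    """
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        return asyncio.run(coro)  # no running loop: safe to own one
    raise RuntimeError("already inside an event loop; use async_to_sync here")

async def fetch_ids():
    return ["img-1", "img-2"]

print(run_coro_safely(fetch_ids()))  # ['img-1', 'img-2']
```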
Actionable comments posted: 3
🧹 Nitpick comments (4)
ami/ml/orchestration/nats_queue.py (1)
**437-460**: DLQ consumer deletion logs a warning on any failure, including "not found". If a DLQ consumer was never created (no tasks exceeded max deliveries), `delete_dlq_consumer` will log a warning at line 459. Consider catching `NotFoundError` separately and logging at debug level to reduce noise for the common case where no DLQ consumer exists.

♻️ Proposed refinement

```diff
 async def delete_dlq_consumer(self, job_id: int) -> bool:
     ...
     dlq_consumer_name = self._get_dlq_consumer_name(job_id)
     try:
         await asyncio.wait_for(
             self.js.delete_consumer("advisories", dlq_consumer_name),
             timeout=NATS_JETSTREAM_TIMEOUT,
         )
         logger.info(f"Deleted DLQ consumer {dlq_consumer_name} for job '{job_id}'")
         return True
+    except nats.js.errors.NotFoundError:
+        logger.debug(f"DLQ consumer {dlq_consumer_name} not found for job '{job_id}' (never created)")
+        return True  # Not an error - consumer simply never existed
     except Exception as e:
         logger.warning(f"Failed to delete DLQ consumer for job '{job_id}': {e}")
         return False
```
**149-160**: Consider adding dedicated tests for the new DLQ methods. The existing test verifies `cleanup_job_resources` calls `delete_consumer` twice, but there are no dedicated tests for:

- `get_dead_letter_task_ids` (the core new functionality)
- `delete_dlq_consumer`
- `_setup_advisory_stream`

These methods have non-trivial logic (JSON parsing, message lookups, error handling) that would benefit from unit test coverage.
**21-26**: Consider adding optional arguments for count and verbosity. The command currently retrieves only 10 DLQ entries (the default in `get_dead_letter_task_ids`). For jobs with many failed tasks, operators may want to retrieve more. Consider adding:

- `--count`/`-n` to control how many entries to fetch
- an `--all` flag to fetch all available entries

♻️ Example enhancement

```diff
 def add_arguments(self, parser):
     parser.add_argument(
         "job_id",
         type=int,
         help="Job ID to check for dead letter queue messages",
     )
+    parser.add_argument(
+        "-n", "--count",
+        type=int,
+        default=10,
+        help="Maximum number of dead letter tasks to retrieve (default: 10)",
+    )

-async def _check_dead_letter_queue(self, job_id: int) -> list[str]:
+async def _check_dead_letter_queue(self, job_id: int, count: int = 10) -> list[str]:
     """Check for dead letter queue messages using TaskQueueManager."""
     async with TaskQueueManager() as manager:
-        return await manager.get_dead_letter_task_ids(job_id)
+        return await manager.get_dead_letter_task_ids(job_id, n=count)
```

Also applies to: 46-49
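A runnable mirror of the suggested flags, using plain `argparse` (Django's `add_arguments` receives a compatible parser; the mutual exclusion of `--count` and `--all` is an added assumption, since fetching "all" makes a count meaningless):

```python
import argparse

parser = argparse.ArgumentParser(prog="check_dead_letter_queue")
parser.add_argument("job_id", type=int)
group = parser.add_mutually_exclusive_group()
group.add_argument("-n", "--count", type=int, default=10,
                   help="maximum number of DLQ entries to fetch (default: 10)")
group.add_argument("--all", action="store_true",
                   help="fetch every available DLQ entry")

opts = parser.parse_args(["42", "--count", "25"])
limit = None if opts.all else opts.count  # None as the 'no limit' sentinel
print(opts.job_id, limit)  # 42 25
```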
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@ami/ml/management/commands/check_dead_letter_queue.py` around lines 21 - 26, The management command add_arguments should be extended to accept optional --count/-n (int) and --all (store_true) flags, make them mutually exclusive or validate in handle, and pass the chosen value into get_dead_letter_task_ids when invoked in handle; specifically, update add_arguments to add parser.add_argument('--count', '-n', type=int, default=10, help=...) and parser.add_argument('--all', action='store_true', help=...), then in handle read options['count'] and options['all'], validate that both aren't set, convert --all into an appropriate sentinel (e.g., None or a high limit) and call get_dead_letter_task_ids(job_id, count=...) or get_dead_letter_task_ids(job_id, fetch_all=True) depending on that function's signature, and update help text/usage accordingly so operators can request more or all DLQ entries.
**43-44**: Preserve the exception chain for better debugging. Using `raise ... from e` preserves the original traceback, making it easier to debug issues when the command fails.

♻️ Proposed fix

```diff
 except Exception as e:
-    raise CommandError(f"Failed to check dead letter queue: {e}")
+    raise CommandError(f"Failed to check dead letter queue: {e}") from e
```
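The effect of `from e` is observable on the raised exception's `__cause__`. A self-contained demonstration (the `CommandError` class here is a stand-in for `django.core.management.CommandError`):

```python
class CommandError(Exception):
    """Stand-in for django.core.management.CommandError."""

def check():
    try:
        raise ConnectionError("NATS unreachable")
    except Exception as e:
        # `from e` links the original error as __cause__, keeping its traceback
        raise CommandError(f"Failed to check dead letter queue: {e}") from e

try:
    check()
except CommandError as err:
    print(type(err.__cause__).__name__)  # ConnectionError
```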
Additional inline comment (on `ami/ml/orchestration/nats_queue.py`, lines 361-435): in the outer exception handler of `get_dead_letter_task_ids`, use `logger.exception` instead of `logger.error` so the traceback is logged automatically, rather than manually formatting the exception string.
```python
            # Use a durable consumer so ACKs persist across calls -- ephemeral consumers
            # are deleted on unsubscribe, discarding all ACK tracking and causing every
            # advisory to be re-delivered on the next call.
            psub = await self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name)
```

Missing `stream` parameter for durable consumer creation.

When calling `pull_subscribe` with a durable name, you should also specify the `stream` parameter to ensure the consumer is created on the correct stream ("advisories"). Without it, the library may fail to locate or create the consumer correctly.

🔧 Proposed fix

```diff
-psub = await self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name)
+psub = await self.js.pull_subscribe(subject_filter, durable=dlq_consumer_name, stream="advisories")
```
```diff
-        # Delete consumer first, then stream
+        # Delete consumer first, then stream, then the durable DLQ advisory consumer
         consumer_deleted = await self.delete_consumer(job_id)
         stream_deleted = await self.delete_stream(job_id)
+        dlq_consumer_deleted = await self.delete_dlq_consumer(job_id)

-        return consumer_deleted and stream_deleted
+        return consumer_deleted and stream_deleted and dlq_consumer_deleted
```

DLQ consumer deletion failure now causes the overall cleanup to return False.

`cleanup_async_job_resources` in jobs.py logs "Failed to clean up NATS resources" when `cleanup_job_resources` returns False. With this change, if the DLQ consumer deletion fails (e.g., the consumer never existed because `get_dead_letter_task_ids` was never called), the entire cleanup will be reported as failed even though the main consumer and stream were successfully deleted.

Consider whether DLQ consumer deletion should be best-effort (not affecting the return value), since the DLQ consumer may not exist for jobs that completed successfully without any max-delivery failures.

💡 Alternative: make DLQ deletion best-effort

```diff
 consumer_deleted = await self.delete_consumer(job_id)
 stream_deleted = await self.delete_stream(job_id)
-dlq_consumer_deleted = await self.delete_dlq_consumer(job_id)
+# DLQ consumer may not exist if no tasks exceeded max deliveries
+await self.delete_dlq_consumer(job_id)  # Best-effort, don't affect return

-return consumer_deleted and stream_deleted and dlq_consumer_deleted
+return consumer_deleted and stream_deleted
```
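The best-effort aggregation suggested here can be exercised with injected stubs. A hypothetical sketch (the callables stand in for `TaskQueueManager`'s real delete methods):

```python
import asyncio

async def cleanup_job_resources(delete_consumer, delete_stream, delete_dlq_consumer) -> bool:
    """Best-effort cleanup variant: the DLQ consumer deletion is attempted
    but cannot fail the overall cleanup, since it may never have been created."""
    consumer_deleted = await delete_consumer()
    stream_deleted = await delete_stream()
    try:
        await delete_dlq_consumer()
    except Exception:
        pass  # real code would log a warning; a missing consumer is normal
    return consumer_deleted and stream_deleted

async def ok():
    return True

async def missing():
    raise LookupError("consumer not found")

print(asyncio.run(cleanup_job_resources(ok, ok, missing)))  # True
```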
Summary
This pull request introduces dead letter queue (DLQ) support for job tasks, allowing for better tracking and cleanup of failed tasks in the NATS orchestration layer.
Helps address #1168
Dead Letter Queue Management

- Added a management command, `check_dead_letter_queue.py`, to allow users to check for dead letter queue messages for a specific job via the command line.
- Added `get_dead_letter_task_ids` in `TaskQueueManager` to retrieve task IDs that failed after max delivery attempts, using a durable advisory consumer for persistence.
- Added `_setup_advisory_stream` to ensure advisory streams are created when a connection is opened, capturing max-delivery events for DLQ tracking.

Reliability Improvements

- Ensured acknowledgements (ACKs) for tasks and advisory messages are flushed to the NATS socket to avoid silent drops when subscriptions are torn down.

How to Test the Changes
1. Set `TASK_TTR` in `nats_queue.py` to a small value (e.g. 10 seconds) for easier testing. Also change `max_deliver=2`.
2. Run `python trapdata/antenna/benchmark.py --job-id <job id> --skip-acks`. This will drain the tasks without ACKs, similar to a crashing worker. Wait ~10 seconds and run it again.
3. Run `docker compose run --rm django python manage.py check_dead_letter_queue <job_id>`

Example output:
Once #1025 is in place, we can use this method to periodically check for failed images and then log to the job something like "Workers have failed to process Image 123 5 times, giving up", perhaps only up to a max number of errors.
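The "give up after N failures" idea sketched above boils down to a per-image failure counter. A hypothetical, self-contained sketch (the image IDs and threshold are illustrative; nothing here is tied to the PR's actual models):

```python
from collections import Counter

class FailureTracker:
    """Count DLQ failures per image and flag when to stop retrying."""

    def __init__(self, max_failures: int = 5):
        self.max_failures = max_failures
        self.counts: Counter[str] = Counter()

    def record(self, image_id: str) -> bool:
        """Record one failure; return True once the image should be given up on."""
        self.counts[image_id] += 1
        return self.counts[image_id] >= self.max_failures

tracker = FailureTracker(max_failures=3)
results = [tracker.record("img-123") for _ in range(3)]
print(results)  # [False, False, True]
```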