-
Notifications
You must be signed in to change notification settings - Fork 11
Support for NATS dead letter queue #1175
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
carlosgjs
wants to merge
24
commits into
RolnickLab:main
Choose a base branch
from
uw-ssec:carlos/natsdlq
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
24 commits
Select commit
Hold shift + click to select a range
b60eab0
merge
carlos-irreverentlabs 644927f
Merge remote-tracking branch 'upstream/main'
carlosgjs 218f7aa
Merge remote-tracking branch 'upstream/main'
carlosgjs 90da389
Merge remote-tracking branch 'upstream/main'
carlosgjs 8618d3c
Merge remote-tracking branch 'upstream/main'
carlosgjs bd1be5f
Merge remote-tracking branch 'upstream/main'
carlosgjs b102ae1
Merge remote-tracking branch 'upstream/main'
carlosgjs bc908aa
fix: PSv2 follow-up fixes from integration tests (#1135)
mihow 4c3802a
PSv2: Improve task fetching & web worker concurrency configuration (#…
carlosgjs b717e80
fix: include pipeline_slug in MinimalJobSerializer (#1148)
mihow 883c4f8
Merge remote-tracking branch 'upstream/main'
carlosgjs e26f3c6
Merge remote-tracking branch 'upstream/main'
carlosgjs 4ef7a24
Merge branch 'RolnickLab:main' into main
mihow c389e90
Merge remote-tracking branch 'upstream/main'
carlosgjs 33a6425
Merge branch 'main' of github.com:uw-ssec/antenna
carlosgjs bf80824
Merge remote-tracking branch 'upstream/main'
carlosgjs a2e68a0
WIP: Add support for NATS dead-letter-queue
carlosgjs db05526
Update tests
carlosgjs 602f825
Add tests, cleanup naming and error handling
carlosgjs 0102ee7
More CR feedback
carlosgjs b44f5b0
Use constant
carlosgjs 5c3a47b
CR
carlosgjs e09fd9a
let exception propagate
carlosgjs e4564fb
Use async_to_sync
carlosgjs File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,48 @@ | ||
| """ | ||
| Management command to check dead letter queue messages for a job. | ||
|
|
||
| Usage: | ||
| python manage.py check_dead_letter_queue <job_id> | ||
|
|
||
| Example: | ||
| python manage.py check_dead_letter_queue 123 | ||
| """ | ||
|
|
||
| from asgiref.sync import async_to_sync | ||
| from django.core.management.base import BaseCommand, CommandError | ||
|
|
||
| from ami.ml.orchestration.nats_queue import TaskQueueManager | ||
|
|
||
|
|
||
| class Command(BaseCommand): | ||
| help = "Check dead letter queue messages for a job ID" | ||
|
|
||
| def add_arguments(self, parser): | ||
| parser.add_argument( | ||
| "job_id", | ||
| type=int, | ||
| help="Job ID to check for dead letter queue messages", | ||
| ) | ||
|
|
||
| def handle(self, *args, **options): | ||
| job_id = options["job_id"] | ||
|
|
||
| try: | ||
| dead_letter_ids = async_to_sync(self._check_dead_letter_queue)(job_id) | ||
|
|
||
| if dead_letter_ids: | ||
| self.stdout.write( | ||
| self.style.WARNING(f"Found {len(dead_letter_ids)} dead letter image(s) for job {job_id}:") | ||
| ) | ||
| for image_id in dead_letter_ids: | ||
| self.stdout.write(f" - Image ID: {image_id}") | ||
| else: | ||
| self.stdout.write(self.style.SUCCESS(f"No dead letter images found for job {job_id}")) | ||
|
|
||
| except Exception as e: | ||
| raise CommandError(f"Failed to check dead letter queue: {e}") | ||
|
|
||
| async def _check_dead_letter_queue(self, job_id: int) -> list[str]: | ||
| """Check for dead letter queue messages using TaskQueueManager.""" | ||
| async with TaskQueueManager() as manager: | ||
| return await manager.get_dead_letter_image_ids(job_id) | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.