feat(coordinator): limit parallelism during chunk assignment #1797
Conversation
📝 Walkthrough

This PR bumps the version tag from v4.7.12 to v4.7.13 in the version file and introduces a global throttle mechanism with a maximum parallelism of 2 for debug_executionWitness processing in the prover task coordinator, using semaphores to limit concurrent universal-witness operations across two code paths.

Changes
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~12 minutes

Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks: ✅ Passed checks (3 passed)
Actionable comments posted: 1
🧹 Nitpick comments (2)
coordinator/internal/logic/provertask/chunk_prover_task.go (2)
27-32: Consider making universal parallelism configurable.

Hard-coding `2` at package scope makes tuning production behavior require code changes. A config-backed value (with a sane default) would be easier to operate under varying prover capacity and API latency.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/internal/logic/provertask/chunk_prover_task.go` around lines 27 - 32, The package currently hard-codes applyUniversalMaxParallelism = 2 and constructs witnessSemaphore at package init; make this configurable by reading a config/env/flag value at startup and initializing applyUniversalMaxParallelism and witnessSemaphore there instead of at package scope. Specifically: add a config key (or env var/flag) for universal max parallelism with a sane default, validate it (ensure >=1), and create witnessSemaphore = make(chan struct{}, max) during application initialization (e.g., in an init hook or the coordinator startup/new constructor) so the semaphore capacity reflects the configured value; update any code that references applyUniversalMaxParallelism to use the configured variable.
212-215: Release the semaphore immediately after `applyUniversal` completes.

Line 215 defers release until `Assign` returns, so post-processing (DB insert/metrics/logging) still occupies a slot. Scope the acquire/release around `cp.applyUniversal` only to keep throughput predictable.

💡 Proposed refactor

```diff
 	if getTaskParameter.Universal {
 		var metadata []byte
-
-		select {
-		case witnessSemaphore <- struct{}{}:
-			// Released when Assign returns (defer).
-			defer func() { <-witnessSemaphore }()
-		case <-ctx.Done():
-			log.Warn("context canceled waiting for witness semaphore", "task_id", chunkTask.Hash, "err", ctx.Err())
-			cp.recoverActiveAttempts(ctx, chunkTask)
-			return nil, ctx.Err()
-		}
-
-		taskMsg, metadata, err = cp.applyUniversal(taskMsg)
+		taskMsg, metadata, err = func() (*coordinatorType.GetTaskSchema, []byte, error) {
+			select {
+			case witnessSemaphore <- struct{}{}:
+				defer func() { <-witnessSemaphore }()
+			case <-ctx.Done():
+				log.Warn("context canceled waiting for witness semaphore", "task_id", chunkTask.Hash, "err", ctx.Err())
+				if taskCtx.hasAssignedTask == nil {
+					cp.recoverActiveAttempts(ctx, chunkTask)
+				}
+				return nil, nil, ctx.Err()
+			}
+			return cp.applyUniversal(taskMsg)
+		}()
 		if err != nil {
 			cp.recoverActiveAttempts(ctx, chunkTask)
 			log.Error("Generate universal prover task failure", "task_id", chunkTask.Hash, "type", "chunk", "err", err)
 			return nil, ErrCoordinatorInternalFailure
 		}
 		proverTask.Metadata = metadata
 	}
```

Also applies to: 222-223
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/internal/logic/provertask/chunk_prover_task.go` around lines 212 - 215, The semaphore acquire around the witness work currently holds the slot until Assign returns because the release is deferred; change the scope so witnessSemaphore is acquired before calling cp.applyUniversal and released immediately after cp.applyUniversal completes (before any DB inserts/metrics/logging or before calling Assign) instead of deferring until Assign returns; update both places that acquire witnessSemaphore (the block around cp.applyUniversal and the similar block later) so the defer is removed and a direct <-witnessSemaphore is executed right after cp.applyUniversal returns to limit semaphore occupancy to only the applyUniversal duration.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@coordinator/internal/logic/provertask/chunk_prover_task.go`:
- Around line 217-219: The early return on context cancellation is
unconditionally calling cp.recoverActiveAttempts, which must only run when this
request performed the increment (the increment happens in the
taskCtx.hasAssignedTask == nil path). Update the context-cancel branch in
chunk_prover_task.go to check taskCtx.hasAssignedTask == nil (or another flag
that indicates this request incremented attempts) before calling
cp.recoverActiveAttempts(ctx, chunkTask) and before returning; leave the return
of ctx.Err() unchanged when no increment occurred. Ensure the guard uses the
same condition used where attempts are incremented so accounting stays
consistent.
---
Nitpick comments:
In `@coordinator/internal/logic/provertask/chunk_prover_task.go`:
- Around line 27-32: The package currently hard-codes
applyUniversalMaxParallelism = 2 and constructs witnessSemaphore at package
init; make this configurable by reading a config/env/flag value at startup and
initializing applyUniversalMaxParallelism and witnessSemaphore there instead of
at package scope. Specifically: add a config key (or env var/flag) for universal
max parallelism with a sane default, validate it (ensure >=1), and create
witnessSemaphore = make(chan struct{}, max) during application initialization
(e.g., in an init hook or the coordinator startup/new constructor) so the
semaphore capacity reflects the configured value; update any code that
references applyUniversalMaxParallelism to use the configured variable.
- Around line 212-215: The semaphore acquire around the witness work currently
holds the slot until Assign returns because the release is deferred; change the
scope so witnessSemaphore is acquired before calling cp.applyUniversal and
released immediately after cp.applyUniversal completes (before any DB
inserts/metrics/logging or before calling Assign) instead of deferring until
Assign returns; update both places that acquire witnessSemaphore (the block
around cp.applyUniversal and the similar block later) so the defer is removed
and a direct <-witnessSemaphore is executed right after cp.applyUniversal
returns to limit semaphore occupancy to only the applyUniversal duration.
```go
log.Warn("context canceled waiting for witness semaphore", "task_id", chunkTask.Hash, "err", ctx.Err())
cp.recoverActiveAttempts(ctx, chunkTask)
return nil, ctx.Err()
```
Guard active-attempt recovery to only run when this request incremented attempts.
Line 218 decrements active attempts unconditionally, but increments only happen in the `taskCtx.hasAssignedTask == nil` path. This can corrupt attempt accounting for already-assigned tasks.
💡 Proposed fix

```diff
 	case <-ctx.Done():
 		log.Warn("context canceled waiting for witness semaphore", "task_id", chunkTask.Hash, "err", ctx.Err())
-		cp.recoverActiveAttempts(ctx, chunkTask)
+		if taskCtx.hasAssignedTask == nil {
+			cp.recoverActiveAttempts(ctx, chunkTask)
+		}
 		return nil, ctx.Err()
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@coordinator/internal/logic/provertask/chunk_prover_task.go` around lines 217
- 219, The early return on context cancellation is unconditionally calling
cp.recoverActiveAttempts, which must only run when this request performed the
increment (the increment happens in the taskCtx.hasAssignedTask == nil path).
Update the context-cancel branch in chunk_prover_task.go to check
taskCtx.hasAssignedTask == nil (or another flag that indicates this request
incremented attempts) before calling cp.recoverActiveAttempts(ctx, chunkTask)
and before returning; leave the return of ctx.Err() unchanged when no increment
occurred. Ensure the guard uses the same condition used where attempts are
incremented so accounting stays consistent.
This seems to be a pre-existing issue on other code paths as well. Might this be a problem in practice? @georgehao
Seems like the humans are having a chat. I'll hop back into my burrow for now. If you need me again, just tag @coderabbitai in a new comment, and I'll come hopping out!
Codecov Report

❌ Patch coverage is
Additional details and impacted files

```
@@            Coverage Diff            @@
##           develop    #1797    +/-  ##
========================================
  Coverage    36.41%   36.42%
========================================
  Files          248      248
  Lines        21321    21330      +9
========================================
+ Hits          7765     7770      +5
- Misses       12733    12737      +4
  Partials       823      823
```

Flags with carried forward coverage won't be shown.

☔ View full report in Codecov by Sentry.
Purpose or design rationale of this PR

`applyUniversal` calls `debug_executionWitness`, which degrades significantly under concurrent load. When multiple chunk proof assignments arrive simultaneously, the unthrottled parallel calls cause severe slowdowns.

This PR adds a semaphore (capacity 2) that limits concurrent `applyUniversal` invocations. Requests that arrive while both slots are occupied block until a slot opens or the request context is cancelled, preventing goroutine accumulation.

PR title
Your PR title must follow conventional commits (as we are doing squash merge for each PR), so it must start with one of the following types:
Deployment tag versioning
Has `tag` in `common/version.go` been updated, or have you added the `bump-version` label to this PR?
Does this PR have the `breaking-change` label?

Summary by CodeRabbit
Bug Fixes
Chores