ref(inc-2150): handle future TimeoutError#7919
Conversation
There was a problem hiding this comment.
Cursor Bugbot has reviewed your changes and found 1 potential issue.
❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, enable autofix in the Cursor dashboard.
Reviewed by Cursor Bugbot for commit 3dab492. Configure here.
|
|
||
| transformed_message = self.__result_encoder.encode( | ||
| SubscriptionTaskResult(result_future.task, result_future.future.result(remaining)) | ||
| SubscriptionTaskResult(result_future.task, result) |
There was a problem hiding this comment.
Shutdown blocks indefinitely after catching future timeout
Medium Severity
Catching FutureTimeoutError and breaking out of the loop creates a new code path where self.__executor.shutdown() (with default wait=True) is reached after a future has timed out. Previously, the TimeoutError would propagate and skip shutdown() entirely. Now, shutdown() blocks until the still-running timed-out future completes — which can far exceed the 5-second join_timeout, effectively turning "crash on timeout" into "hang on timeout."
Additional Locations (1)
Reviewed by Cursor Bugbot for commit 3dab492. Configure here.


When closing a strategy we call
strategy.joinhttps://github.com/getsentry/arroyo/blob/3cb0b65d110eb9aa5efb6db88820792e6b3542ab/arroyo/processing/processor.py#L296 which for the subscriptions tries to finishes the subscriptions tasks. We havejointimeout which handles dropping tasks left in the queue, but if an individual task timeout the consumer crashes.This PR avoids crashing the consumer on shutdown when a future raises a
TimeoutError.