Skip to content

Comments

[INLONG-12073][Manager] Optimize Pulsar message query with thread pool and proper error handling#12074

Merged
healchow merged 8 commits intoapache:masterfrom
healchow:heal-12703
Feb 25, 2026
Merged

[INLONG-12073][Manager] Optimize Pulsar message query with thread pool and proper error handling#12074
healchow merged 8 commits intoapache:masterfrom
healchow:heal-12703

Conversation

@healchow
Copy link
Member

@healchow healchow commented Feb 16, 2026

[INLONG-12073][Manager] Optimize Pulsar message query with thread pool and proper error handling

Fixes #12073

Motivation

Currently, when querying MQ messages from multiple Pulsar clusters, there are several issues:

  1. No thread pool management: Message queries are executed without proper thread pool management, which may lead to resource exhaustion under high concurrency.

  2. No graceful handling for task rejection: When too many concurrent requests come in, the system doesn't properly handle the RejectedExecutionException and doesn't provide a user-friendly error response.

  3. No task cancellation mechanism: When task submission fails, previously submitted tasks continue to run unnecessarily.

  4. No interruption support: Long-running IO operations cannot be cancelled when the request is aborted.

Modifications

  1. Add a dedicated thread pool: Use ThreadPoolTaskExecutor with configurable core/max pool size and queue capacity for message query tasks.

  2. Implement task cancellation: When RejectedExecutionException occurs, cancel all previously submitted tasks to free up resources.

  3. Add interruption checks: Check Thread.currentThread().isInterrupted() before and after IO operations to support task cancellation.

Verifying this change

  • This change is a trivial rework/code cleanup without any test coverage.

  • This change is already covered by existing tests, such as:
    (please describe tests)

  • This change added tests and can be verified as follows:

    • Added unit tests QueryLatestMessagesRunnableTest to verified this change

Documentation

  • Does this pull request introduce a new feature? (yes / no)
  • If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
  • If a feature is not applicable for documentation, explain why?
  • If a feature is not documented yet in this PR, please create a follow-up issue for adding the documentation

@healchow healchow requested review from fuweng11 and luchunliang and removed request for fuweng11 February 16, 2026 05:49
@healchow healchow self-assigned this Feb 16, 2026
@healchow healchow force-pushed the heal-12703 branch 2 times, most recently from 80bb259 to 57cd7f4 Compare February 24, 2026 11:52
@healchow healchow force-pushed the heal-12703 branch 2 times, most recently from 98382c6 to 7322cde Compare February 25, 2026 05:04
fuweng11
fuweng11 previously approved these changes Feb 25, 2026
luchunliang
luchunliang previously approved these changes Feb 25, 2026
vernedeng
vernedeng previously approved these changes Feb 25, 2026
@healchow healchow dismissed stale reviews from vernedeng, luchunliang, and fuweng11 via 2bc74b8 February 25, 2026 06:50
@healchow healchow merged commit b2876ad into apache:master Feb 25, 2026
8 checks passed
@healchow healchow deleted the heal-12703 branch February 25, 2026 07:29
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug][Manager] No proper error handling when MQ message query thread pool is exhausted

4 participants