diff --git a/inlong-manager/manager-schedule/pom.xml b/inlong-manager/manager-schedule/pom.xml index b975dedbaf4..444d314ab4f 100644 --- a/inlong-manager/manager-schedule/pom.xml +++ b/inlong-manager/manager-schedule/pom.xml @@ -25,6 +25,7 @@ manager-schedule + Apache InLong - Manager Schedule 2.3.2 diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java index 528884c9968..389cb241675 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/QueueResourceOperator.java @@ -83,11 +83,10 @@ default void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo st * @param groupInfo inlong group info * @param streamInfo inlong stream info * @param request query message request - * @throws Exception any exception if occurred * @return query brief mq message info */ default List queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, - QueryMessageRequest request) throws Exception { + QueryMessageRequest request) { return null; } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java index 67a1b39f7a5..66cbee4fc38 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/PulsarQueueResourceOperator.java @@ -44,17 +44,26 @@ import org.apache.inlong.manager.service.stream.InlongStreamService; import com.google.common.base.Objects; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import lombok.extern.slf4j.Slf4j; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang3.StringUtils; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; + import java.util.ArrayList; import java.util.Collections; import java.util.List; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** @@ -80,7 +89,59 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator { @Autowired private PulsarOperator pulsarOperator; - private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); + @Value("${pulsar.query.poolSize:10}") + private int poolSize; + + @Value("${pulsar.query.keepAliveSeconds:60}") + private long keepAliveSeconds; + + @Value("${pulsar.query.queueCapacity:100}") + private int queueCapacity; + + @Value("${pulsar.query.queryTimeoutSeconds:10}") + private int queryTimeoutSeconds; + + /** + * Thread pool for querying messages from multiple Pulsar clusters concurrently. + * Configuration is loaded from application properties with prefix 'pulsar.query'. + */ + private ExecutorService messageQueryExecutor; + + /** + * Initialize the executor service after bean creation. + */ + @PostConstruct + public void init() { + // Initialize the executor service with same core pool size and max core pool size + this.messageQueryExecutor = new ThreadPoolExecutor( + poolSize, + poolSize, + keepAliveSeconds, + TimeUnit.SECONDS, + new LinkedBlockingQueue<>(queueCapacity), + new ThreadFactoryBuilder().setNameFormat("pulsar-message-query-%d").build(), + // Use AbortPolicy to throw exception when the queue is full + new ThreadPoolExecutor.AbortPolicy()); + log.info("Init message query executor, poolSize={}, keepAliveSeconds={}, queueCapacity={}", + poolSize, keepAliveSeconds, queueCapacity); + } + + /** + * Shutdown the executor service when the bean is destroyed. + */ + @PreDestroy + public void shutdown() { + log.info("Shutting down pulsar message query executor"); + messageQueryExecutor.shutdown(); + try { + if (!messageQueryExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + messageQueryExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + messageQueryExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } @Override public boolean accept(String mqType) { @@ -307,34 +368,117 @@ private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pu * Query latest message from pulsar */ public List queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo, - QueryMessageRequest request) throws Exception { - List pulsarClusterList = clusterService.listByTagAndType(groupInfo.getInlongClusterTag(), - ClusterType.PULSAR); - List briefMQMessages = Collections.synchronizedList(new ArrayList<>()); - QueryCountDownLatch queryLatch = new QueryCountDownLatch(request.getMessageCount(), pulsarClusterList.size()); + QueryMessageRequest request) { + String groupId = streamInfo.getInlongGroupId(); + String clusterTag = groupInfo.getInlongClusterTag(); + List clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR); + if (CollectionUtils.isEmpty(clusterInfos)) { + log.warn("No pulsar cluster found for clusterTag={} for groupId={}", clusterTag, groupId); + return Collections.emptyList(); + } + + // Select clusters and calculate per-cluster query count + Integer requestCount = request.getMessageCount(); + int clusterSize = clusterInfos.size(); + QueryCountDownLatch queryLatch = new QueryCountDownLatch(requestCount, clusterSize); + log.debug("Query pulsar message in {} clusters, each cluster query {} messages", clusterSize, requestCount); + + // Extract common parameters InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo); - for (ClusterInfo clusterInfo : pulsarClusterList) { - QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo, - (PulsarClusterInfo) clusterInfo, pulsarOperator, request, briefMQMessages, queryLatch); - this.executor.execute(task); + String tenant = inlongPulsarInfo.getPulsarTenant(); + String namespace = inlongPulsarInfo.getMqResource(); + String topicName = streamInfo.getMqResource(); + boolean serialQueue = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule()); + + // Submit query tasks to thread pool, each task queries from one cluster + // Use submit() instead of execute() to get Future for cancellation support + List> submittedTasks = new ArrayList<>(); + // Use ConcurrentLinkedQueue for thread-safe message collection, + // its performance is better than Collections.synchronizedList + ConcurrentLinkedQueue messageResultQueue = new ConcurrentLinkedQueue<>(); + for (ClusterInfo clusterInfo : clusterInfos) { + PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo; + if (StringUtils.isBlank(tenant)) { + tenant = pulsarCluster.getPulsarTenant(); + } + String fullTopicName = tenant + "/" + namespace + "/" + topicName; + // Create a copy of request with adjusted message count for this cluster + QueryMessageRequest currentRequest = buildRequestForSingleCluster(request, requestCount); + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(pulsarOperator, streamInfo, + pulsarCluster, serialQueue, fullTopicName, currentRequest, messageResultQueue, queryLatch); + try { + Future future = this.messageQueryExecutor.submit(task); + submittedTasks.add(future); + } catch (RejectedExecutionException e) { + // Cancel all previously submitted tasks before throwing exception + log.error("Failed to submit query task for groupId={}, cancelling {} submitted tasks", + groupId, submittedTasks.size(), e); + cancelSubmittedTasks(submittedTasks); + throw new BusinessException("Query messages task rejected: too many concurrent requests"); + } + } + + // Wait for tasks to complete with a configurable timeout + String streamId = streamInfo.getInlongStreamId(); + try { + boolean completed = queryLatch.await(queryTimeoutSeconds, TimeUnit.SECONDS); + if (!completed) { + log.warn("Query messages timeout for groupId={}, streamId={}, collected {} messages", + groupId, streamId, messageResultQueue.size()); + } + } catch (InterruptedException e) { + throw new BusinessException(String.format("Query messages task interrupted for groupId=%s, streamId=%s", + groupId, streamId)); + } + + log.info("Success query pulsar message for groupId={}, streamId={}", groupId, streamId); + List messageResultList = new ArrayList<>(messageResultQueue); + + // if query result size is less than request count, return all, otherwise truncate to request count + if (messageResultList.isEmpty() || messageResultList.size() <= requestCount) { + return messageResultList; } - queryLatch.await(30, TimeUnit.SECONDS); - log.info("success query pulsar message for groupId={}, streamId={}", streamInfo.getInlongGroupId(), - streamInfo.getInlongStreamId()); - - int finalMsgCount = Math.min(request.getMessageCount(), briefMQMessages.size()); - if (finalMsgCount > 0) { - return new ArrayList<>(briefMQMessages.subList(0, finalMsgCount)); - } else { - return new ArrayList<>(); + + return new ArrayList<>(messageResultList.subList(0, requestCount)); + } + + /** + * Build a new QueryMessageRequest with adjusted message count for a specific cluster. + */ + private QueryMessageRequest buildRequestForSingleCluster(QueryMessageRequest original, int messageCount) { + return QueryMessageRequest.builder() + .groupId(original.getGroupId()) + .streamId(original.getStreamId()) + .messageCount(messageCount) + .fieldName(original.getFieldName()) + .operationType(original.getOperationType()) + .targetValue(original.getTargetValue()) + .build(); + } + + /** + * Cancel all submitted tasks when an error occurs. + * This method attempts to cancel tasks with interrupt flag set to true, + * allowing running tasks to be interrupted if they check for interruption. + * + * @param submittedTasks list of Future objects representing submitted tasks + */ + private void cancelSubmittedTasks(List> submittedTasks) { + int cancelledCount = 0; + for (java.util.concurrent.Future future : submittedTasks) { + // mayInterruptIfRunning=true allows interrupting running tasks + if (future.cancel(true)) { + cancelledCount++; + } } + log.info("Cancelled {}/{} submitted tasks", cancelledCount, submittedTasks.size()); } /** * Reset cursor for consumer group */ public void resetCursor(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity, StreamSinkEntity sinkEntity, - Long resetTime) throws Exception { + Long resetTime) { log.info("begin to reset cursor for sinkId={}", sinkEntity.getId()); InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo; List clusterInfos = clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(), diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java index 08970b39d74..a76dc14f327 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryCountDownLatch.java @@ -21,13 +21,20 @@ import java.util.concurrent.TimeUnit; /** - * QueryCountDownLatch + * QueryCountDownLatch for managing query task and data completion. + *

+ * This class provides two types of countdown: + *

    + *
  • Task countdown: tracks the number of tasks completed (regardless of success or failure)
  • + *
  • Data countdown: tracks the number of data items retrieved
  • + *
+ * The flagLatch is released when either all tasks complete or enough data is collected. */ public class QueryCountDownLatch { - private CountDownLatch dataLatch; - private CountDownLatch taskLatch; - private CountDownLatch flagLatch; + private final CountDownLatch dataLatch; + private final CountDownLatch taskLatch; + private final CountDownLatch flagLatch; public QueryCountDownLatch(int dataSize, int taskSize) { this.dataLatch = new CountDownLatch(dataSize); @@ -35,12 +42,34 @@ public QueryCountDownLatch(int dataSize, int taskSize) { this.flagLatch = new CountDownLatch(1); } - public void countDown(int dataDownSize) { + /** + * Called when a task completes (regardless of success or failure). + * This should be called in a finally block to ensure it's always executed. + */ + public void taskCountDown() { this.taskLatch.countDown(); + checkAndRelease(); + } + + /** + * Called when data items are successfully retrieved. + * + * @param dataDownSize the number of data items retrieved + */ + public void dataCountDown(int dataDownSize) { for (int i = 0; i < dataDownSize; i++) { this.dataLatch.countDown(); } - if (this.taskLatch.getCount() == 0 || this.dataLatch.getCount() == 0) { + checkAndRelease(); + } + + /** + * Check if the flagLatch should be released. + * Release when all tasks complete or enough data is collected. + */ + private synchronized void checkAndRelease() { + if (this.flagLatch.getCount() > 0 + && (this.taskLatch.getCount() == 0 || this.dataLatch.getCount() == 0)) { this.flagLatch.countDown(); } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java index 4fb6b58e492..37eabf65f3e 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/queue/pulsar/QueryLatestMessagesRunnable.java @@ -17,63 +17,89 @@ package org.apache.inlong.manager.service.resource.queue.pulsar; -import org.apache.inlong.manager.common.consts.InlongConstants; import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; import org.apache.inlong.manager.pojo.consume.BriefMQMessage; -import org.apache.inlong.manager.pojo.group.pulsar.InlongPulsarInfo; import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; /** - * QueryLatestMessagesRunnable + * Runnable task for querying latest messages from a Pulsar cluster. */ public class QueryLatestMessagesRunnable implements Runnable { - private InlongPulsarInfo inlongPulsarInfo; - private InlongStreamInfo streamInfo; - private PulsarClusterInfo clusterInfo; - private PulsarOperator pulsarOperator; - private QueryMessageRequest queryMessageRequest; - private List briefMQMessages; - private QueryCountDownLatch latch; + private static final Logger LOG = LoggerFactory.getLogger(QueryLatestMessagesRunnable.class); - public QueryLatestMessagesRunnable(InlongPulsarInfo inlongPulsarInfo, + private final PulsarOperator pulsarOperator; + private final InlongStreamInfo streamInfo; + private final PulsarClusterInfo clusterInfo; + private final boolean serialQueue; + private final String fullTopicName; + private final QueryMessageRequest queryMessageRequest; + private final ConcurrentLinkedQueue messageResultQueue; + private final QueryCountDownLatch latch; + + public QueryLatestMessagesRunnable( + PulsarOperator pulsarOperator, InlongStreamInfo streamInfo, PulsarClusterInfo clusterInfo, - PulsarOperator pulsarOperator, + boolean serialQueue, + String fullTopicName, QueryMessageRequest queryMessageRequest, - List briefMQMessages, + ConcurrentLinkedQueue messageResultQueue, QueryCountDownLatch latch) { - this.inlongPulsarInfo = inlongPulsarInfo; + this.pulsarOperator = pulsarOperator; this.streamInfo = streamInfo; this.clusterInfo = clusterInfo; - this.pulsarOperator = pulsarOperator; + this.serialQueue = serialQueue; + this.fullTopicName = fullTopicName; this.queryMessageRequest = queryMessageRequest; - this.briefMQMessages = briefMQMessages; + this.messageResultQueue = messageResultQueue; this.latch = latch; } @Override public void run() { - String tenant = inlongPulsarInfo.getPulsarTenant(); - if (StringUtils.isBlank(tenant)) { - tenant = clusterInfo.getPulsarTenant(); - } + String clusterName = clusterInfo.getName(); + LOG.debug("Begin to query messages from cluster={}, topic={}", clusterName, fullTopicName); + try { + // Check for interruption before starting the query + if (Thread.currentThread().isInterrupted()) { + LOG.info("Task interrupted before query, cluster={}, topic={}", clusterName, fullTopicName); + return; + } + + List messages = pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, + queryMessageRequest, streamInfo, serialQueue); + + // Check for interruption after query completes + // (IO operations not support interruption, so we check the flag manually after the blocking call returns) + if (Thread.currentThread().isInterrupted()) { + LOG.info("Task interrupted after query, discarding results, cluster={}, topic={}", + clusterName, fullTopicName); + return; + } - String namespace = inlongPulsarInfo.getMqResource(); - String topicName = streamInfo.getMqResource(); - String fullTopicName = tenant + "/" + namespace + "/" + topicName; - boolean serial = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule()); - List messages = - pulsarOperator.queryLatestMessage(clusterInfo, fullTopicName, queryMessageRequest, streamInfo, serial); - if (CollectionUtils.isNotEmpty(messages)) { - briefMQMessages.addAll(messages); - this.latch.countDown(messages.size()); + if (CollectionUtils.isNotEmpty(messages)) { + messageResultQueue.addAll(messages); + this.latch.dataCountDown(messages.size()); + LOG.debug("Successfully queried {} messages from cluster={}, topic={}", + messages.size(), clusterName, fullTopicName); + } else { + LOG.debug("No messages found from cluster={}, topic={}", clusterName, fullTopicName); + } + } catch (Exception e) { + LOG.error("Failed to query messages from cluster={}, groupId={}, streamId={}", + clusterName, queryMessageRequest.getGroupId(), queryMessageRequest.getStreamId(), e); + } finally { + // Ensure taskCountDown is always called, regardless of success or failure + this.latch.taskCountDown(); } } } diff --git a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java index 46f8fe5e946..8b11651d2af 100644 --- a/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java +++ b/inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/stream/InlongStreamServiceImpl.java @@ -981,13 +981,8 @@ public List listMessages(QueryMessageRequest request, String ope InlongGroupOperator instance = groupOperatorFactory.getInstance(groupEntity.getMqType()); InlongGroupInfo groupInfo = instance.getFromEntity(groupEntity); InlongStreamInfo inlongStreamInfo = get(request.getGroupId(), request.getStreamId()); - List messageList = new ArrayList<>(); QueueResourceOperator queueOperator = queueOperatorFactory.getInstance(groupEntity.getMqType()); - try { - messageList = queueOperator.queryLatestMessages(groupInfo, inlongStreamInfo, request); - } catch (Exception e) { - LOGGER.error("query message error ", e); - } - return messageList; + // Do not catch exception, throws it to caller + return queueOperator.queryLatestMessages(groupInfo, inlongStreamInfo, request); } } diff --git a/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java new file mode 100644 index 00000000000..fe90083ba38 --- /dev/null +++ b/inlong-manager/manager-service/src/test/java/org/apache/inlong/manager/service/queue/QueryLatestMessagesRunnableTest.java @@ -0,0 +1,439 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.manager.service.queue; + +import org.apache.inlong.manager.common.exceptions.BusinessException; +import org.apache.inlong.manager.pojo.cluster.pulsar.PulsarClusterInfo; +import org.apache.inlong.manager.pojo.consume.BriefMQMessage; +import org.apache.inlong.manager.pojo.stream.InlongStreamInfo; +import org.apache.inlong.manager.pojo.stream.QueryMessageRequest; +import org.apache.inlong.manager.service.resource.queue.pulsar.PulsarOperator; +import org.apache.inlong.manager.service.resource.queue.pulsar.QueryCountDownLatch; +import org.apache.inlong.manager.service.resource.queue.pulsar.QueryLatestMessagesRunnable; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Test class for {@link QueryLatestMessagesRunnable}. + * Tests focus on interrupt handling behavior. + */ +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class QueryLatestMessagesRunnableTest { + + @Mock + private PulsarOperator pulsarOperator; + + @Mock + private InlongStreamInfo streamInfo; + + @Mock + private PulsarClusterInfo clusterInfo; + + private QueryMessageRequest queryMessageRequest; + private ConcurrentLinkedQueue messageResultQueue; + private QueryCountDownLatch queryLatch; + + private static final String CLUSTER_NAME = "test-cluster"; + private static final String FULL_TOPIC_NAME = "public/test-namespace/test-topic"; + private static final String GROUP_ID = "test-group"; + private static final String STREAM_ID = "test-stream"; + + @BeforeEach + public void setUp() { + when(clusterInfo.getName()).thenReturn(CLUSTER_NAME); + + queryMessageRequest = QueryMessageRequest.builder() + .groupId(GROUP_ID) + .streamId(STREAM_ID) + .messageCount(10) + .build(); + + messageResultQueue = new ConcurrentLinkedQueue<>(); + queryLatch = new QueryCountDownLatch(10, 1); + } + + /** + * Test: Task executes successfully when not interrupted. + * Verifies that messages are added to result list and latch is counted down. + */ + @Test + public void testSuccessfulQueryWithoutInterruption() { + // 准备 mock 返回数据 + List mockMessages = new ArrayList<>(); + BriefMQMessage message = new BriefMQMessage(); + message.setBody("test message"); + mockMessages.add(message); + + when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean())) + .thenReturn(mockMessages); + + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, messageResultQueue, queryLatch); + + // 同步执行任务 + task.run(); + + // 验证:查询被执行 + verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()); + // 验证:结果被添加到列表 + assertEquals(1, messageResultQueue.size()); + } + + /** + * Test: Task exits immediately when thread is interrupted before query starts. + * Verifies that no query is performed and latch is still counted down. + */ + @Test + public void testInterruptionBeforeQuery() throws InterruptedException { + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, messageResultQueue, queryLatch); + + // 创建一个在执行任务前就被中断的线程 + Thread testThread = new Thread(() -> { + // 在执行任务前中断线程 + Thread.currentThread().interrupt(); + task.run(); + }); + + testThread.start(); + testThread.join(5000); + + // 验证:查询未被执行(因为在查询前就检测到中断) + verify(pulsarOperator, never()).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()); + // 验证:结果列表为空 + assertTrue(messageResultQueue.isEmpty()); + } + + /** + * Test: Task discards results when interrupted after query completes. + * Verifies that results are not added to the list even if query returned data. + */ + @Test + public void testInterruptionAfterQuery() throws InterruptedException { + // 模拟查询操作会检查并设置中断标志 + List mockMessages = new ArrayList<>(); + BriefMQMessage message = new BriefMQMessage(); + message.setBody("test message"); + mockMessages.add(message); + + CountDownLatch queryStartedLatch = new CountDownLatch(1); + CountDownLatch interruptSetLatch = new CountDownLatch(1); + + when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean())) + .thenAnswer(invocation -> { + // 通知主线程查询已开始 + queryStartedLatch.countDown(); + // 等待主线程设置中断标志 + interruptSetLatch.await(5, TimeUnit.SECONDS); + // 返回结果(模拟 IO 操作不响应中断) + return mockMessages; + }); + + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, messageResultQueue, queryLatch); + + // 在线程池中执行任务 + ExecutorService executor = Executors.newSingleThreadExecutor(); + Future future = executor.submit(task); + + // 等待查询开始 + assertTrue(queryStartedLatch.await(5, TimeUnit.SECONDS)); + + // 取消任务(设置中断标志) + future.cancel(true); + // 通知任务可以继续 + interruptSetLatch.countDown(); + + // 等待任务完成 + Thread.sleep(500); + + // 验证:查询被执行 + verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()); + // 验证:结果被丢弃(因为查询后检测到中断标志) + assertTrue(messageResultQueue.isEmpty()); + + executor.shutdownNow(); + } + + /** + * Test: Task handles empty query results correctly. + * Verifies that no exception is thrown and latch is counted down. + */ + @Test + public void testEmptyQueryResults() { + when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean())) + .thenReturn(Collections.emptyList()); + + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, messageResultQueue, queryLatch); + + task.run(); + + // 验证:查询被执行 + verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()); + // 验证:结果列表为空 + assertTrue(messageResultQueue.isEmpty()); + } + + /** + * Test: Task handles null query results correctly. + * Verifies that no exception is thrown and latch is counted down. + */ + @Test + public void testNullQueryResults() { + when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean())) + .thenReturn(null); + + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, messageResultQueue, queryLatch); + + task.run(); + + // 验证:查询被执行 + verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()); + // 验证:结果列表为空 + assertTrue(messageResultQueue.isEmpty()); + } + + /** + * Test: Task handles query exception gracefully. + * Verifies that exception is caught and latch is still counted down. + */ + @Test + public void testQueryException() { + when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean())) + .thenThrow(new RuntimeException("Simulated query failure")); + + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, messageResultQueue, queryLatch); + + // 不应抛出异常 + task.run(); + + // 验证:查询被执行 + verify(pulsarOperator, times(1)).queryLatestMessage(any(), anyString(), any(), any(), anyBoolean()); + // 验证:结果列表为空 + assertTrue(messageResultQueue.isEmpty()); + } + + /** + * Test: Multiple tasks can be cancelled together. + * Simulates the scenario where RejectedExecutionException occurs and all submitted tasks need to be cancelled. + */ + @Test + public void testMultipleTaskCancellation() throws InterruptedException { + int taskCount = 5; + ConcurrentLinkedQueue sharedResultList = new ConcurrentLinkedQueue<>(); + QueryCountDownLatch sharedLatch = new QueryCountDownLatch(50, taskCount); + + // 模拟慢查询 + CountDownLatch allTasksStarted = new CountDownLatch(taskCount); + CountDownLatch proceedLatch = new CountDownLatch(1); + + List mockMessages = new ArrayList<>(); + BriefMQMessage message = new BriefMQMessage(); + message.setBody("test message"); + mockMessages.add(message); + + when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean())) + .thenAnswer(invocation -> { + allTasksStarted.countDown(); + // 等待信号继续 + proceedLatch.await(10, TimeUnit.SECONDS); + return mockMessages; + }); + + ExecutorService executor = Executors.newFixedThreadPool(taskCount); + List> futures = new ArrayList<>(); + + // 提交多个任务 + for (int i = 0; i < taskCount; i++) { + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, sharedResultList, sharedLatch); + futures.add(executor.submit(task)); + } + + // 等待所有任务开始执行 + assertTrue(allTasksStarted.await(5, TimeUnit.SECONDS)); + + // 取消所有任务 + int cancelledCount = 0; + for (Future future : futures) { + if (future.cancel(true)) { + cancelledCount++; + } + } + + // 允许任务继续 + proceedLatch.countDown(); + + // 关闭线程池 + executor.shutdown(); + executor.awaitTermination(5, TimeUnit.SECONDS); + + // 验证:结果被丢弃(因为中断标志已设置) + assertTrue(sharedResultList.isEmpty()); + } + + /** + * Test: Verifies interrupt flag is checked at the right points. + * This test ensures the interrupt check happens both before and after the query. + */ + @Test + public void testInterruptCheckPoints() throws InterruptedException { + List mockMessages = new ArrayList<>(); + BriefMQMessage message = new BriefMQMessage(); + message.setBody("test message"); + mockMessages.add(message); + + // 记录查询被调用的次数 + final int[] queryCallCount = {0}; + + when(pulsarOperator.queryLatestMessage(any(), anyString(), any(), any(), anyBoolean())) + .thenAnswer(invocation -> { + queryCallCount[0]++; + return mockMessages; + }); + + // 场景1: 正常执行(不中断) + QueryLatestMessagesRunnable normalTask = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, messageResultQueue, queryLatch); + normalTask.run(); + + assertEquals(1, queryCallCount[0], "Query should be called once in normal execution"); + assertEquals(1, messageResultQueue.size(), "Result should be added in normal execution"); + + // 重置 + queryCallCount[0] = 0; + messageResultQueue.clear(); + queryLatch = new QueryCountDownLatch(10, 1); + + // 场景2: 查询前中断 + Thread preInterruptThread = new Thread(() -> { + Thread.currentThread().interrupt(); + QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable( + pulsarOperator, streamInfo, clusterInfo, false, FULL_TOPIC_NAME, + queryMessageRequest, messageResultQueue, queryLatch); + task.run(); + }); + preInterruptThread.start(); + preInterruptThread.join(5000); + + assertEquals(0, queryCallCount[0], "Query should not be called when interrupted before"); + assertTrue(messageResultQueue.isEmpty(), "Result should not be added when interrupted before"); + } + + /** + * Test: Verifies that RejectedExecutionException message contains 'rejected' keyword. + * This ensures the exception can be properly identified when queue is full. + */ + @Test + public void testRejectedExecutionExceptionContainsRejectKeyword() { + // Create a thread pool with minimal capacity to trigger rejection + ExecutorService tinyExecutor = new java.util.concurrent.ThreadPoolExecutor( + 1, 1, 0L, TimeUnit.MILLISECONDS, + new java.util.concurrent.LinkedBlockingQueue<>(1), + new java.util.concurrent.ThreadPoolExecutor.AbortPolicy()); + + CountDownLatch blockingLatch = new CountDownLatch(1); + try { + // Submit blocking tasks to fill the pool and queue + tinyExecutor.submit(() -> { + try { + blockingLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + tinyExecutor.submit(() -> { + try { + blockingLatch.await(30, TimeUnit.SECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // This submission should be rejected (pool full, queue full) + tinyExecutor.submit(() -> { + // This task should never execute + }); + + // If we reach here, the test failed + org.junit.jupiter.api.Assertions.fail("Expected RejectedExecutionException was not thrown"); + } catch (java.util.concurrent.RejectedExecutionException e) { + // Verify the exception message contains 'reject' keyword (case-insensitive) + String message = e.getMessage(); + assertTrue(message != null && message.toLowerCase().contains("reject"), + "RejectedExecutionException message should contain 'reject' keyword, but was: " + message); + } finally { + blockingLatch.countDown(); + tinyExecutor.shutdownNow(); + } + } + + /** + * Test: Verifies that BusinessException thrown by PulsarQueueResourceOperator contains 'reject' keyword. + * This simulates the scenario where the queue is full and task submission is rejected. + */ + @Test + public void testBusinessExceptionContainsRejectKeywordWhenQueueFull() { + // Simulate the BusinessException that would be thrown when RejectedExecutionException occurs + String expectedMessage = "Query messages task rejected: too many concurrent requests"; + BusinessException exception = new BusinessException(expectedMessage); + + // Verify the exception message contains 'reject' keyword + assertTrue(exception.getMessage().contains("reject"), + "BusinessException message should contain 'reject' keyword"); + } +} diff --git a/inlong-manager/manager-web/src/main/resources/application.properties b/inlong-manager/manager-web/src/main/resources/application.properties index a498ac4d15c..07a1e53806d 100644 --- a/inlong-manager/manager-web/src/main/resources/application.properties +++ b/inlong-manager/manager-web/src/main/resources/application.properties @@ -64,5 +64,11 @@ openapi.auth.enabled=false audit.admin.ids=3,4,5,6 audit.user.ids=3,4,5,6 +# Pulsar message query thread pool configuration +pulsar.query.poolSize=10 +pulsar.query.keepAliveSeconds=60 +pulsar.query.queueCapacity=100 +pulsar.query.queryTimeoutSeconds=10 + # tencent cloud log service endpoint, The Operator cls resource by it cls.manager.endpoint=127.0.0.1 \ No newline at end of file