Skip to content
1 change: 1 addition & 0 deletions inlong-manager/manager-schedule/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
</parent>

<artifactId>manager-schedule</artifactId>
<name>Apache InLong - Manager Schedule</name>

<properties>
<quartz.version>2.3.2</quartz.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,10 @@ default void deleteQueueForStream(InlongGroupInfo groupInfo, InlongStreamInfo st
* @param groupInfo inlong group info
* @param streamInfo inlong stream info
* @param request query message request
* @throws Exception any exception if occurred
* @return query brief mq message info
*/
default List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
QueryMessageRequest request) throws Exception {
QueryMessageRequest request) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,17 +44,26 @@
import org.apache.inlong.manager.service.stream.InlongStreamService;

import com.google.common.base.Objects;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -80,7 +89,59 @@ public class PulsarQueueResourceOperator implements QueueResourceOperator {
@Autowired
private PulsarOperator pulsarOperator;

private ScheduledExecutorService executor = Executors.newScheduledThreadPool(10);
@Value("${pulsar.query.poolSize:10}")
private int poolSize;

@Value("${pulsar.query.keepAliveSeconds:60}")
private long keepAliveSeconds;

@Value("${pulsar.query.queueCapacity:100}")
private int queueCapacity;

@Value("${pulsar.query.queryTimeoutSeconds:10}")
private int queryTimeoutSeconds;

/**
* Thread pool for querying messages from multiple Pulsar clusters concurrently.
* Configuration is loaded from application properties with prefix 'pulsar.query'.
*/
private ExecutorService messageQueryExecutor;

/**
 * Initialize the executor service after bean creation.
 * <p>
 * The pool is created with identical core and maximum sizes ({@code poolSize}) and a bounded
 * work queue of {@code queueCapacity} entries. When the queue is full, further submissions are
 * rejected with a {@link java.util.concurrent.RejectedExecutionException}.
 * <p>
 * Because core and maximum sizes are equal, the keep-alive time would never apply to any thread
 * by default. Core-thread timeout is therefore enabled so that {@code keepAliveSeconds} actually
 * lets idle query threads terminate.
 */
@PostConstruct
public void init() {
    // Initialize the executor service with same core pool size and max core pool size
    ThreadPoolExecutor queryPool = new ThreadPoolExecutor(
            poolSize,
            poolSize,
            keepAliveSeconds,
            TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(queueCapacity),
            new ThreadFactoryBuilder().setNameFormat("pulsar-message-query-%d").build(),
            // Use AbortPolicy to throw exception when the queue is full
            new ThreadPoolExecutor.AbortPolicy());
    // allowCoreThreadTimeOut(true) requires a positive keep-alive time, otherwise it throws
    if (keepAliveSeconds > 0) {
        queryPool.allowCoreThreadTimeOut(true);
    }
    this.messageQueryExecutor = queryPool;
    log.info("Init message query executor, poolSize={}, keepAliveSeconds={}, queueCapacity={}",
            poolSize, keepAliveSeconds, queueCapacity);
}

/**
 * Stops the message query executor when the bean is destroyed.
 * <p>
 * An orderly shutdown is requested first; if the pool has not terminated within 5 seconds,
 * or the waiting thread is interrupted, the remaining tasks are stopped forcibly.
 */
@PreDestroy
public void shutdown() {
    log.info("Shutting down pulsar message query executor");
    final ExecutorService pool = messageQueryExecutor;
    pool.shutdown();

    final boolean terminated;
    try {
        terminated = pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
        // Stop waiting, force the shutdown and keep the interrupt visible to the caller
        pool.shutdownNow();
        Thread.currentThread().interrupt();
        return;
    }

    if (!terminated) {
        pool.shutdownNow();
    }
}

@Override
public boolean accept(String mqType) {
Expand Down Expand Up @@ -307,34 +368,117 @@ private void deletePulsarTopic(InlongPulsarInfo pulsarInfo, PulsarClusterInfo pu
* Query latest message from pulsar
*/
public List<BriefMQMessage> queryLatestMessages(InlongGroupInfo groupInfo, InlongStreamInfo streamInfo,
QueryMessageRequest request) throws Exception {
List<ClusterInfo> pulsarClusterList = clusterService.listByTagAndType(groupInfo.getInlongClusterTag(),
ClusterType.PULSAR);
List<BriefMQMessage> briefMQMessages = Collections.synchronizedList(new ArrayList<>());
QueryCountDownLatch queryLatch = new QueryCountDownLatch(request.getMessageCount(), pulsarClusterList.size());
QueryMessageRequest request) {
String groupId = streamInfo.getInlongGroupId();
String clusterTag = groupInfo.getInlongClusterTag();
List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(clusterTag, ClusterType.PULSAR);
if (CollectionUtils.isEmpty(clusterInfos)) {
log.warn("No pulsar cluster found for clusterTag={} for groupId={}", clusterTag, groupId);
return Collections.emptyList();
}

// Select clusters and calculate per-cluster query count
Integer requestCount = request.getMessageCount();
int clusterSize = clusterInfos.size();
QueryCountDownLatch queryLatch = new QueryCountDownLatch(requestCount, clusterSize);
log.debug("Query pulsar message in {} clusters, each cluster query {} messages", clusterSize, requestCount);

// Extract common parameters
InlongPulsarInfo inlongPulsarInfo = ((InlongPulsarInfo) groupInfo);
for (ClusterInfo clusterInfo : pulsarClusterList) {
QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(inlongPulsarInfo, streamInfo,
(PulsarClusterInfo) clusterInfo, pulsarOperator, request, briefMQMessages, queryLatch);
this.executor.execute(task);
String tenant = inlongPulsarInfo.getPulsarTenant();
String namespace = inlongPulsarInfo.getMqResource();
String topicName = streamInfo.getMqResource();
boolean serialQueue = InlongConstants.PULSAR_QUEUE_TYPE_SERIAL.equals(inlongPulsarInfo.getQueueModule());

// Submit query tasks to thread pool, each task queries from one cluster
// Use submit() instead of execute() to get Future for cancellation support
List<Future<?>> submittedTasks = new ArrayList<>();
// Use ConcurrentLinkedQueue for thread-safe message collection,
// its performance is better than Collections.synchronizedList
ConcurrentLinkedQueue<BriefMQMessage> messageResultQueue = new ConcurrentLinkedQueue<>();
for (ClusterInfo clusterInfo : clusterInfos) {
PulsarClusterInfo pulsarCluster = (PulsarClusterInfo) clusterInfo;
if (StringUtils.isBlank(tenant)) {
tenant = pulsarCluster.getPulsarTenant();
}
String fullTopicName = tenant + "/" + namespace + "/" + topicName;
// Create a copy of request with adjusted message count for this cluster
QueryMessageRequest currentRequest = buildRequestForSingleCluster(request, requestCount);
QueryLatestMessagesRunnable task = new QueryLatestMessagesRunnable(pulsarOperator, streamInfo,
pulsarCluster, serialQueue, fullTopicName, currentRequest, messageResultQueue, queryLatch);
try {
Future<?> future = this.messageQueryExecutor.submit(task);
submittedTasks.add(future);
} catch (RejectedExecutionException e) {
// Cancel all previously submitted tasks before throwing exception
log.error("Failed to submit query task for groupId={}, cancelling {} submitted tasks",
groupId, submittedTasks.size(), e);
cancelSubmittedTasks(submittedTasks);
throw new BusinessException("Query messages task rejected: too many concurrent requests");
}
}

// Wait for tasks to complete with a configurable timeout
String streamId = streamInfo.getInlongStreamId();
try {
boolean completed = queryLatch.await(queryTimeoutSeconds, TimeUnit.SECONDS);
if (!completed) {
log.warn("Query messages timeout for groupId={}, streamId={}, collected {} messages",
groupId, streamId, messageResultQueue.size());
}
} catch (InterruptedException e) {
throw new BusinessException(String.format("Query messages task interrupted for groupId=%s, streamId=%s",
groupId, streamId));
}

log.info("Success query pulsar message for groupId={}, streamId={}", groupId, streamId);
List<BriefMQMessage> messageResultList = new ArrayList<>(messageResultQueue);

// if query result size is less than request count, return all, otherwise truncate to request count
if (messageResultList.isEmpty() || messageResultList.size() <= requestCount) {
return messageResultList;
}
queryLatch.await(30, TimeUnit.SECONDS);
log.info("success query pulsar message for groupId={}, streamId={}", streamInfo.getInlongGroupId(),
streamInfo.getInlongStreamId());

int finalMsgCount = Math.min(request.getMessageCount(), briefMQMessages.size());
if (finalMsgCount > 0) {
return new ArrayList<>(briefMQMessages.subList(0, finalMsgCount));
} else {
return new ArrayList<>();

return new ArrayList<>(messageResultList.subList(0, requestCount));
}

/**
 * Copies a query request for use against a single cluster, overriding only the message count.
 *
 * @param original the request whose group id, stream id, field name, operation type and
 *        target value are copied
 * @param messageCount the message count to set on the copy
 * @return a new request built from the copied values and the given message count
 */
private QueryMessageRequest buildRequestForSingleCluster(QueryMessageRequest original, int messageCount) {
    return QueryMessageRequest.builder()
            // the only value that is not taken from the original request
            .messageCount(messageCount)
            .groupId(original.getGroupId())
            .streamId(original.getStreamId())
            .targetValue(original.getTargetValue())
            .operationType(original.getOperationType())
            .fieldName(original.getFieldName())
            .build();
}

/**
 * Requests cancellation of every given task and logs how many requests succeeded.
 * <p>
 * Each task is cancelled with {@code mayInterruptIfRunning=true}, so a task that is already
 * running gets its thread interrupted. {@code cancel} returns {@code false} for tasks that
 * could not be cancelled, which are not counted.
 *
 * @param submittedTasks list of Future objects representing submitted tasks
 */
private void cancelSubmittedTasks(List<Future<?>> submittedTasks) {
    final int total = submittedTasks.size();
    int cancelled = 0;
    for (Future<?> task : submittedTasks) {
        cancelled += task.cancel(true) ? 1 : 0;
    }
    log.info("Cancelled {}/{} submitted tasks", cancelled, total);
}

/**
* Reset cursor for consumer group
*/
public void resetCursor(InlongGroupInfo groupInfo, InlongStreamEntity streamEntity, StreamSinkEntity sinkEntity,
Long resetTime) throws Exception {
Long resetTime) {
log.info("begin to reset cursor for sinkId={}", sinkEntity.getId());
InlongPulsarInfo pulsarInfo = (InlongPulsarInfo) groupInfo;
List<ClusterInfo> clusterInfos = clusterService.listByTagAndType(pulsarInfo.getInlongClusterTag(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,55 @@
import java.util.concurrent.TimeUnit;

/**
* QueryCountDownLatch
* QueryCountDownLatch for managing query task and data completion.
* <p>
* This class provides two types of countdown:
* <ul>
* <li>Task countdown: tracks the number of tasks completed (regardless of success or failure)</li>
* <li>Data countdown: tracks the number of data items retrieved</li>
* </ul>
* The flagLatch is released when either all tasks complete or enough data is collected.
*/
public class QueryCountDownLatch {

private CountDownLatch dataLatch;
private CountDownLatch taskLatch;
private CountDownLatch flagLatch;
private final CountDownLatch dataLatch;
private final CountDownLatch taskLatch;
private final CountDownLatch flagLatch;

public QueryCountDownLatch(int dataSize, int taskSize) {
this.dataLatch = new CountDownLatch(dataSize);
this.taskLatch = new CountDownLatch(taskSize);
this.flagLatch = new CountDownLatch(1);
}

public void countDown(int dataDownSize) {
/**
 * Records the completion of one task, whether it succeeded or failed, and then re-evaluates
 * the release condition.
 * <p>
 * Intended to be invoked from a {@code finally} block so that a failing task is still counted.
 */
public void taskCountDown() {
    taskLatch.countDown();
    checkAndRelease();
}

/**
* Called when data items are successfully retrieved.
*
* @param dataDownSize the number of data items retrieved
*/
public void dataCountDown(int dataDownSize) {
for (int i = 0; i < dataDownSize; i++) {
this.dataLatch.countDown();
}
if (this.taskLatch.getCount() == 0 || this.dataLatch.getCount() == 0) {
checkAndRelease();
}

/**
 * Releases the flag latch once all tasks have completed or enough data has been collected.
 * <p>
 * Does nothing if the flag latch has already been released.
 */
private synchronized void checkAndRelease() {
    if (flagLatch.getCount() <= 0) {
        // already released
        return;
    }
    boolean allTasksDone = taskLatch.getCount() == 0;
    if (allTasksDone || dataLatch.getCount() == 0) {
        flagLatch.countDown();
    }
}
Expand Down
Loading
Loading