diff --git a/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java b/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
index 586ab11a7..884b78a2e 100644
--- a/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
+++ b/extras/queue-manager-replicated/core/src/main/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManager.java
@@ -13,6 +13,7 @@
import io.a2a.server.events.EventQueueFactory;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.InMemoryQueueManager;
+import io.a2a.server.events.MainEventBus;
import io.a2a.server.events.QueueManager;
import io.a2a.server.tasks.TaskStateProvider;
import org.slf4j.Logger;
@@ -45,10 +46,12 @@ protected ReplicatedQueueManager() {
}
@Inject
- public ReplicatedQueueManager(ReplicationStrategy replicationStrategy, TaskStateProvider taskStateProvider) {
+ public ReplicatedQueueManager(ReplicationStrategy replicationStrategy,
+ TaskStateProvider taskStateProvider,
+ MainEventBus mainEventBus) {
this.replicationStrategy = replicationStrategy;
this.taskStateProvider = taskStateProvider;
- this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider);
+ this.delegate = new InMemoryQueueManager(new ReplicatingEventQueueFactory(), taskStateProvider, mainEventBus);
}
@@ -152,12 +155,11 @@ public EventQueue.EventQueueBuilder builder(String taskId) {
// which sends the QueueClosedEvent after the database transaction commits.
// This ensures proper ordering and transactional guarantees.
- // Return the builder with callbacks
- return delegate.getEventQueueBuilder(taskId)
- .taskId(taskId)
- .hook(new ReplicationHook(taskId))
- .addOnCloseCallback(delegate.getCleanupCallback(taskId))
- .taskStateProvider(taskStateProvider);
+ // Call createBaseEventQueueBuilder() directly to avoid infinite recursion
+ // (getEventQueueBuilder() would delegate back to this factory, creating a loop)
+ // The base builder already includes: taskId, cleanup callback, taskStateProvider, mainEventBus
+ return delegate.createBaseEventQueueBuilder(taskId)
+ .hook(new ReplicationHook(taskId));
}
}
diff --git a/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java b/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
index 43571cd30..a339be543 100644
--- a/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
+++ b/extras/queue-manager-replicated/core/src/test/java/io/a2a/extras/queuemanager/replicated/core/ReplicatedQueueManagerTest.java
@@ -22,12 +22,18 @@
import io.a2a.server.events.EventQueueClosedException;
import io.a2a.server.events.EventQueueItem;
import io.a2a.server.events.EventQueueTestHelper;
+import io.a2a.server.events.EventQueueUtil;
+import io.a2a.server.events.MainEventBus;
+import io.a2a.server.events.MainEventBusProcessor;
import io.a2a.server.events.QueueClosedEvent;
+import io.a2a.server.tasks.InMemoryTaskStore;
+import io.a2a.server.tasks.PushNotificationSender;
import io.a2a.spec.Event;
import io.a2a.spec.StreamingEventKind;
import io.a2a.spec.TaskState;
import io.a2a.spec.TaskStatus;
import io.a2a.spec.TaskStatusUpdateEvent;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -35,10 +41,24 @@ class ReplicatedQueueManagerTest {
private ReplicatedQueueManager queueManager;
private StreamingEventKind testEvent;
+ private MainEventBus mainEventBus;
+ private MainEventBusProcessor mainEventBusProcessor;
+ private static final PushNotificationSender NOOP_PUSHNOTIFICATION_SENDER = task -> {};
@BeforeEach
void setUp() {
- queueManager = new ReplicatedQueueManager(new NoOpReplicationStrategy(), new MockTaskStateProvider(true));
+ // Create MainEventBus and MainEventBusProcessor for tests
+ InMemoryTaskStore taskStore = new InMemoryTaskStore();
+ mainEventBus = new MainEventBus();
+ mainEventBusProcessor = new MainEventBusProcessor(mainEventBus, taskStore, NOOP_PUSHNOTIFICATION_SENDER);
+ EventQueueUtil.start(mainEventBusProcessor);
+
+ queueManager = new ReplicatedQueueManager(
+ new NoOpReplicationStrategy(),
+ new MockTaskStateProvider(true),
+ mainEventBus
+ );
+
testEvent = TaskStatusUpdateEvent.builder()
.taskId("test-task")
.contextId("test-context")
@@ -47,10 +67,65 @@ void setUp() {
.build();
}
+ /**
+ * Helper to create a test event with the specified taskId.
+ * This ensures taskId consistency between queue creation and event creation.
+ */
+ private TaskStatusUpdateEvent createEventForTask(String taskId) {
+ return TaskStatusUpdateEvent.builder()
+ .taskId(taskId)
+ .contextId("test-context")
+ .status(new TaskStatus(TaskState.SUBMITTED))
+ .isFinal(false)
+ .build();
+ }
+
+ @AfterEach
+ void tearDown() {
+ if (mainEventBusProcessor != null) {
+ mainEventBusProcessor.setCallback(null); // Clear any test callbacks
+ EventQueueUtil.stop(mainEventBusProcessor);
+ }
+ mainEventBusProcessor = null;
+ mainEventBus = null;
+ queueManager = null;
+ }
+
+ /**
+ * Helper to wait for MainEventBusProcessor to process an event.
+ * Replaces polling patterns with deterministic callback-based waiting.
+ *
+ * @param action the action that triggers event processing
+ * @throws InterruptedException if waiting is interrupted
+ * @throws AssertionError if processing doesn't complete within timeout
+ */
+ private void waitForEventProcessing(Runnable action) throws InterruptedException {
+ CountDownLatch processingLatch = new CountDownLatch(1);
+ mainEventBusProcessor.setCallback(new io.a2a.server.events.MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, io.a2a.spec.Event event) {
+ processingLatch.countDown();
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // Not needed for basic event processing wait
+ }
+ });
+
+ try {
+ action.run();
+ assertTrue(processingLatch.await(5, TimeUnit.SECONDS),
+ "MainEventBusProcessor should have processed the event within timeout");
+ } finally {
+ mainEventBusProcessor.setCallback(null);
+ }
+ }
+
@Test
void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedException {
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "test-task-1";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -65,7 +140,7 @@ void testReplicationStrategyTriggeredOnNormalEnqueue() throws InterruptedExcepti
@Test
void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedException {
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "test-task-2";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -79,7 +154,7 @@ void testReplicationStrategyNotTriggeredOnReplicatedEvent() throws InterruptedEx
@Test
void testReplicationStrategyWithCountingImplementation() throws InterruptedException {
CountingReplicationStrategy countingStrategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "test-task-3";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -100,46 +175,45 @@ void testReplicationStrategyWithCountingImplementation() throws InterruptedExcep
@Test
void testReplicatedEventDeliveredToCorrectQueue() throws InterruptedException {
String taskId = "test-task-4";
+ TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
EventQueue queue = queueManager.createOrTap(taskId);
- ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
- queueManager.onReplicatedEvent(replicatedEvent);
+ ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
- Event dequeuedEvent;
- try {
- dequeuedEvent = queue.dequeueEventItem(100).getEvent();
- } catch (EventQueueClosedException e) {
- fail("Queue should not be closed");
- return;
- }
- assertEquals(testEvent, dequeuedEvent);
+ // Use callback to wait for event processing
+ EventQueueItem item = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedEvent));
+ assertNotNull(item, "Event should be available in queue");
+ Event dequeuedEvent = item.getEvent();
+ assertEquals(eventForTask, dequeuedEvent);
}
@Test
void testReplicatedEventCreatesQueueIfNeeded() throws InterruptedException {
String taskId = "non-existent-task";
+ TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
// Verify no queue exists initially
assertNull(queueManager.get(taskId));
- ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
-
- // Process the replicated event
- assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
-
- // Verify that a queue was created and the event was enqueued
- EventQueue queue = queueManager.get(taskId);
- assertNotNull(queue, "Queue should be created when processing replicated event for non-existent task");
-
- // Verify the event was enqueued by dequeuing it
- Event dequeuedEvent;
- try {
- dequeuedEvent = queue.dequeueEventItem(100).getEvent();
- } catch (EventQueueClosedException e) {
- fail("Queue should not be closed");
- return;
- }
- assertEquals(testEvent, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
+ // Create a ChildQueue BEFORE processing the replicated event
+ // This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
+ EventQueue childQueue = queueManager.createOrTap(taskId);
+ assertNotNull(childQueue, "ChildQueue should be created");
+
+ // Verify MainQueue was created
+ EventQueue mainQueue = queueManager.get(taskId);
+ assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
+
+ ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
+
+ // Process the replicated event and wait for distribution
+ // Use callback to wait for event processing
+ EventQueueItem item = dequeueEventWithRetry(childQueue, () -> {
+ assertDoesNotThrow(() -> queueManager.onReplicatedEvent(replicatedEvent));
+ });
+ assertNotNull(item, "Event should be available in queue");
+ Event dequeuedEvent = item.getEvent();
+ assertEquals(eventForTask, dequeuedEvent, "The replicated event should be enqueued in the newly created queue");
}
@Test
@@ -170,7 +244,7 @@ void testBasicQueueManagerFunctionality() throws InterruptedException {
void testQueueToTaskIdMappingMaintained() throws InterruptedException {
String taskId = "test-task-6";
CountingReplicationStrategy countingStrategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(countingStrategy, new MockTaskStateProvider(true), mainEventBus);
EventQueue queue = queueManager.createOrTap(taskId);
queue.enqueueEvent(testEvent);
@@ -217,7 +291,7 @@ void testReplicatedEventJsonSerialization() throws Exception {
@Test
void testParallelReplicationBehavior() throws InterruptedException {
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "parallel-test-task";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -297,7 +371,7 @@ void testParallelReplicationBehavior() throws InterruptedException {
void testReplicatedEventSkippedWhenTaskInactive() throws InterruptedException {
// Create a task state provider that returns false (task is inactive)
MockTaskStateProvider stateProvider = new MockTaskStateProvider(false);
- queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
+ queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
String taskId = "inactive-task";
@@ -316,30 +390,32 @@ void testReplicatedEventSkippedWhenTaskInactive() throws InterruptedException {
void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
// Create a task state provider that returns true (task is active)
MockTaskStateProvider stateProvider = new MockTaskStateProvider(true);
- queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
+ queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
String taskId = "active-task";
+ TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
// Verify no queue exists initially
assertNull(queueManager.get(taskId));
- // Process a replicated event for an active task
- ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, testEvent);
- queueManager.onReplicatedEvent(replicatedEvent);
+ // Create a ChildQueue BEFORE processing the replicated event
+ // This ensures the ChildQueue exists when MainEventBusProcessor distributes the event
+ EventQueue childQueue = queueManager.createOrTap(taskId);
+ assertNotNull(childQueue, "ChildQueue should be created");
- // Queue should be created and event should be enqueued
- EventQueue queue = queueManager.get(taskId);
- assertNotNull(queue, "Queue should be created for active task");
+ // Verify MainQueue was created
+ EventQueue mainQueue = queueManager.get(taskId);
+ assertNotNull(mainQueue, "MainQueue should exist after createOrTap");
- // Verify the event was enqueued
- Event dequeuedEvent;
- try {
- dequeuedEvent = queue.dequeueEventItem(100).getEvent();
- } catch (EventQueueClosedException e) {
- fail("Queue should not be closed");
- return;
- }
- assertEquals(testEvent, dequeuedEvent, "Event should be enqueued for active task");
+ // Process a replicated event for an active task
+ ReplicatedEventQueueItem replicatedEvent = new ReplicatedEventQueueItem(taskId, eventForTask);
+
+ // Verify the event was enqueued and distributed to our ChildQueue
+ // Use callback to wait for event processing
+ EventQueueItem item = dequeueEventWithRetry(childQueue, () -> queueManager.onReplicatedEvent(replicatedEvent));
+ assertNotNull(item, "Event should be available in queue");
+ Event dequeuedEvent = item.getEvent();
+ assertEquals(eventForTask, dequeuedEvent, "Event should be enqueued for active task");
}
@@ -347,7 +423,7 @@ void testReplicatedEventProcessedWhenTaskActive() throws InterruptedException {
void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws InterruptedException {
// Create a task state provider that returns true initially
MockTaskStateProvider stateProvider = new MockTaskStateProvider(true);
- queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider);
+ queueManager = new ReplicatedQueueManager(new CountingReplicationStrategy(), stateProvider, mainEventBus);
String taskId = "task-becomes-inactive";
@@ -387,7 +463,7 @@ void testReplicatedEventToExistingQueueWhenTaskBecomesInactive() throws Interrup
@Test
void testPoisonPillSentViaTransactionAwareEvent() throws InterruptedException {
CountingReplicationStrategy strategy = new CountingReplicationStrategy();
- queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true));
+ queueManager = new ReplicatedQueueManager(strategy, new MockTaskStateProvider(true), mainEventBus);
String taskId = "poison-pill-test";
EventQueue queue = queueManager.createOrTap(taskId);
@@ -451,36 +527,21 @@ void testQueueClosedEventJsonSerialization() throws Exception {
@Test
void testReplicatedQueueClosedEventTerminatesConsumer() throws InterruptedException {
String taskId = "remote-close-test";
+ TaskStatusUpdateEvent eventForTask = createEventForTask(taskId); // Use matching taskId
EventQueue queue = queueManager.createOrTap(taskId);
- // Enqueue a normal event
- queue.enqueueEvent(testEvent);
-
// Simulate receiving QueueClosedEvent from remote node
QueueClosedEvent closedEvent = new QueueClosedEvent(taskId);
ReplicatedEventQueueItem replicatedClosedEvent = new ReplicatedEventQueueItem(taskId, closedEvent);
- queueManager.onReplicatedEvent(replicatedClosedEvent);
- // Dequeue the normal event first
- EventQueueItem item1;
- try {
- item1 = queue.dequeueEventItem(100);
- } catch (EventQueueClosedException e) {
- fail("Should not throw on first dequeue");
- return;
- }
- assertNotNull(item1);
- assertEquals(testEvent, item1.getEvent());
+ // Dequeue the normal event first (use callback to wait for async processing)
+ EventQueueItem item1 = dequeueEventWithRetry(queue, () -> queue.enqueueEvent(eventForTask));
+ assertNotNull(item1, "First event should be available");
+ assertEquals(eventForTask, item1.getEvent());
- // Next dequeue should get the QueueClosedEvent
- EventQueueItem item2;
- try {
- item2 = queue.dequeueEventItem(100);
- } catch (EventQueueClosedException e) {
- fail("Should not throw on second dequeue, should return the event");
- return;
- }
- assertNotNull(item2);
+ // Next dequeue should get the QueueClosedEvent (use callback to wait for async processing)
+ EventQueueItem item2 = dequeueEventWithRetry(queue, () -> queueManager.onReplicatedEvent(replicatedClosedEvent));
+ assertNotNull(item2, "QueueClosedEvent should be available");
assertTrue(item2.getEvent() instanceof QueueClosedEvent,
"Second event should be QueueClosedEvent");
}
@@ -539,4 +600,25 @@ public void setActive(boolean active) {
this.active = active;
}
}
+
+ /**
+ * Helper method to dequeue an event after waiting for MainEventBusProcessor distribution.
+ * Uses callback-based waiting instead of polling for deterministic synchronization.
+ *
+ * @param queue the queue to dequeue from
+ * @param enqueueAction the action that enqueues the event (triggers event processing)
+ * @return the dequeued EventQueueItem, or null if queue is closed
+ */
+ private EventQueueItem dequeueEventWithRetry(EventQueue queue, Runnable enqueueAction) throws InterruptedException {
+ // Wait for event to be processed and distributed
+ waitForEventProcessing(enqueueAction);
+
+ // Event is now available, dequeue directly
+ try {
+ return queue.dequeueEventItem(100);
+ } catch (EventQueueClosedException e) {
+ // Queue closed, return null
+ return null;
+ }
+ }
}
\ No newline at end of file
diff --git a/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java b/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
new file mode 100644
index 000000000..a91575aaa
--- /dev/null
+++ b/extras/queue-manager-replicated/core/src/test/java/io/a2a/server/events/EventQueueUtil.java
@@ -0,0 +1,11 @@
+package io.a2a.server.events;
+
+public class EventQueueUtil {
+ public static void start(MainEventBusProcessor processor) {
+ processor.start();
+ }
+
+ public static void stop(MainEventBusProcessor processor) {
+ processor.stop();
+ }
+}
diff --git a/fix_tests.py b/fix_tests.py
new file mode 100644
index 000000000..c87d57d59
--- /dev/null
+++ b/fix_tests.py
@@ -0,0 +1,28 @@
+#!/usr/bin/env python3
+import re
+import sys
+import glob
+
+test_files = glob.glob('**/src/test/java/**/*Test.java', recursive=True)
+
+for path in test_files:
+ try:
+ with open(path, 'r') as f:
+ content = f.read()
+
+ # Pattern to match create() calls with 5 arguments ending with executor or internalExecutor
+ pattern = r'DefaultRequestHandler\.create\(\s*([^,]+),\s*([^,]+),\s*([^,]+),\s*([^,]+),\s*((?:executor|internalExecutor))\s*\)'
+
+ def replacer(match):
+ args = [match.group(i).strip() for i in range(1, 6)]
+ executor = args[4]
+ return f'DefaultRequestHandler.create({args[0]}, {args[1]}, {args[2]}, {args[3]}, {executor}, {executor})'
+
+ new_content = re.sub(pattern, replacer, content, flags=re.MULTILINE | re.DOTALL)
+
+ if new_content != content:
+ with open(path, 'w') as f:
+ f.write(new_content)
+ print(f'Fixed: {path}')
+ except Exception as e:
+ print(f'Error processing {path}: {e}')
diff --git a/main-rest-tck.log b/main-rest-tck.log
new file mode 100644
index 000000000..43cf5158e
--- /dev/null
+++ b/main-rest-tck.log
@@ -0,0 +1,101 @@
+2026-01-23 13:20:23,852 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] WARN [io.qua.config] (Quarkus Main Thread) The "quarkus.log.file.enable" config property is deprecated and should not be used anymore.
+2026-01-23 13:20:24,000 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.con.DefaultValuesConfigProvider] (Quarkus Main Thread) Loaded 5 A2A default configuration values from 1 resource(s)
+2026-01-23 13:20:24,002 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.uti.asy.AsyncExecutorProducer] (Quarkus Main Thread) Initializing async executor: corePoolSize=5, maxPoolSize=50, keepAliveSeconds=60
+2026-01-23 13:20:24,124 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.qua.grp.run.GrpcServerRecorder] (Quarkus Main Thread) Starting new Quarkus gRPC server (using Vert.x transport)...
+2026-01-23 13:20:24,258 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.quarkus] (Quarkus Main Thread) a2a-tck-server 1.0.0.Alpha2-SNAPSHOT on JVM (powered by Quarkus 3.30.6) started in 3.008s. Listening on: http://localhost:9999
+2026-01-23 13:20:24,260 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.quarkus] (Quarkus Main Thread) Profile dev activated. Live Coding activated.
+2026-01-23 13:20:24,260 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.quarkus] (Quarkus Main Thread) Installed features: [cdi, grpc-client, grpc-server, reactive-routes, rest, rest-jackson, smallrye-context-propagation, vertx]
+2026-01-23 13:20:25,496 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) onMessageSendStream START - task: null; context: null; runningAgents: 0; backgroundTasks: 0
+2026-01-23 13:20:25,497 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.InMemoryQueueManager] (vert.x-worker-thread-1) createOrTap called for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b, current map size: 0
+2026-01-23 13:20:25,499 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (vert.x-worker-thread-1) Created MainQueue for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b with 1 onClose callbacks and TaskStateProvider: true
+2026-01-23 13:20:25,500 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.InMemoryQueueManager] (vert.x-worker-thread-1) Created new MainQueue 1795784187 for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b, returning ChildQueue 1575838821 (map size: 1)
+2026-01-23 13:20:25,500 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Created/tapped queue for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b: io.a2a.server.events.EventQueue$ChildQueue@5ded6465
+2026-01-23 13:20:25,500 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Registering agent execution for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b, runningAgents.size() before: 0
+2026-01-23 13:20:25,500 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) === THREAD STATS: AGENT START ===
+2026-01-23 13:20:25,500 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Active threads: 28
+2026-01-23 13:20:25,501 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Running agents: 0
+2026-01-23 13:20:25,502 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Background tasks: 0
+2026-01-23 13:20:25,502 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Queue manager active queues: InMemoryQueueManager_ClientProxy
+2026-01-23 13:20:25,502 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) === END THREAD STATS ===
+2026-01-23 13:20:25,503 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Agent execution starting for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,503 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Registered agent for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b, runningAgents.size() after: 1
+2026-01-23 13:20:25,504 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventConsumer] (vert.x-worker-thread-1) EventConsumer created with queue 1575838821
+2026-01-23 13:20:25,507 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) onMessageSendStream FINALLY - task: 8e618ff9-11d7-4434-99ae-cfb7eb19264b; runningAgents: 1; backgroundTasks: 0
+2026-01-23 13:20:25,508 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Removed agent for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b from runningAgents in finally block, size after: 0
+2026-01-23 13:20:25,508 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Starting cleanup for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b (streaming=true)
+2026-01-23 13:20:25,508 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) === THREAD STATS: CLEANUP START ===
+2026-01-23 13:20:25,508 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Active threads: 29
+2026-01-23 13:20:25,508 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Running agents: 0
+2026-01-23 13:20:25,508 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Background tasks: 0
+2026-01-23 13:20:25,508 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Queue manager active queues: InMemoryQueueManager_ClientProxy
+2026-01-23 13:20:25,508 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) === END THREAD STATS ===
+2026-01-23 13:20:25,509 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (vert.x-worker-thread-1) Tracking background task (total: 1): java.util.concurrent.CompletableFuture@51d79fdc[Not completed]
+2026-01-23 13:20:25,512 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-2) MAIN: subscribeObject() called
+2026-01-23 13:20:25,512 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-1) Enqueued event Task[id=8e618ff9-11d7-4434-99ae-cfb7eb19264b, contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, status=TaskStatus[state=SUBMITTED, message=null, timestamp=2026-01-23T13:20:25.504012Z], artifacts=[], history=[Message[role=USER, parts=[TextPart[text=Stream test message]], messageId=stream-message-id-d68a9e3c-e8d6-4ee1-9231-c935c3e58e0c, contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, taskId=8e618ff9-11d7-4434-99ae-cfb7eb19264b, referenceTaskIds=null, metadata=null, extensions=null]], metadata=null] io.a2a.server.events.EventQueue$MainQueue@6b097dfb
+2026-01-23 13:20:25,513 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-1) Enqueued event Task[id=8e618ff9-11d7-4434-99ae-cfb7eb19264b, contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, status=TaskStatus[state=SUBMITTED, message=null, timestamp=2026-01-23T13:20:25.504012Z], artifacts=[], history=[Message[role=USER, parts=[TextPart[text=Stream test message]], messageId=stream-message-id-d68a9e3c-e8d6-4ee1-9231-c935c3e58e0c, contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, taskId=8e618ff9-11d7-4434-99ae-cfb7eb19264b, referenceTaskIds=null, metadata=null, extensions=null]], metadata=null] io.a2a.server.events.EventQueue$ChildQueue@5ded6465
+2026-01-23 13:20:25,513 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-2) MAIN: write() called
+2026-01-23 13:20:25,515 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-2) MAIN: onSubscribe() called
+2026-01-23 13:20:25,517 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-1) Enqueued event TaskStatusUpdateEvent[taskId=8e618ff9-11d7-4434-99ae-cfb7eb19264b, status=TaskStatus[state=WORKING, message=null, timestamp=2026-01-23T13:20:25.514666Z], contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, isFinal=false, metadata=null] io.a2a.server.events.EventQueue$MainQueue@6b097dfb
+2026-01-23 13:20:25,517 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-1) Enqueued event TaskStatusUpdateEvent[taskId=8e618ff9-11d7-4434-99ae-cfb7eb19264b, status=TaskStatus[state=WORKING, message=null, timestamp=2026-01-23T13:20:25.514666Z], contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, isFinal=false, metadata=null] io.a2a.server.events.EventQueue$ChildQueue@5ded6465
+2026-01-23 13:20:25,517 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-2) MAIN: onSubscribe() - requested first buffer
+2026-01-23 13:20:25,517 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Agent execution completed for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,517 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) === THREAD STATS: AGENT COMPLETE END ===
+2026-01-23 13:20:25,517 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Active threads: 31
+2026-01-23 13:20:25,517 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Running agents: 0
+2026-01-23 13:20:25,518 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-3) Creating subscription wrapper for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,518 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Background tasks: 1
+2026-01-23 13:20:25,518 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Queue manager active queues: InMemoryQueueManager_ClientProxy
+2026-01-23 13:20:25,518 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Background tasks:
+2026-01-23 13:20:25,518 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) - java.util.concurrent.CompletableFuture@51d79fdc[Not completed, 1 dependents]: RUNNING
+2026-01-23 13:20:25,518 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) === END THREAD STATS ===
+2026-01-23 13:20:25,519 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Agent and consumption both completed successfully for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,519 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Streaming call, closing ChildQueue for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b (immediate=false, notifyParent=true)
+2026-01-23 13:20:25,519 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-1) Closing io.a2a.server.events.EventQueue$ChildQueue@5ded6465 (immediate=false)
+2026-01-23 13:20:25,519 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-3) onSubscribe called for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-1) MainQueue for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b has no children, but task is not finalized - keeping queue open for potential resubscriptions
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Queue cleanup completed for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) === THREAD STATS: CLEANUP END ===
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Active threads: 31
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.tra.res.han.RestHandler] (a2a-agent-executor-3) MAIN RestHandler: onSubscribe called
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Running agents: 0
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-3) Subscription.request(1) for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Background tasks: 1
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Queue manager active queues: InMemoryQueueManager_ClientProxy
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Background tasks:
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.tra.res.han.RestHandler] (a2a-agent-executor-3) MAIN RestHandler: requested first event
+2026-01-23 13:20:25,520 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) - java.util.concurrent.CompletableFuture@51d79fdc[Not completed, 1 dependents]: RUNNING
+2026-01-23 13:20:25,521 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) === END THREAD STATS ===
+2026-01-23 13:20:25,521 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-1) Removed background task (remaining: 0): java.util.concurrent.CompletableFuture@51d79fdc[Completed normally]
+2026-01-23 13:20:25,522 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-3) Dequeued event item (waiting) io.a2a.server.events.EventQueue$ChildQueue@5ded6465 Task[id=8e618ff9-11d7-4434-99ae-cfb7eb19264b, contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, status=TaskStatus[state=SUBMITTED, message=null, timestamp=2026-01-23T13:20:25.504012Z], artifacts=[], history=[Message[role=USER, parts=[TextPart[text=Stream test message]], messageId=stream-message-id-d68a9e3c-e8d6-4ee1-9231-c935c3e58e0c, contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, taskId=8e618ff9-11d7-4434-99ae-cfb7eb19264b, referenceTaskIds=null, metadata=null, extensions=null]], metadata=null]
+2026-01-23 13:20:25,523 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-3) Signalling that queue polling started io.a2a.server.events.EventQueue$MainQueue@6b097dfb
+2026-01-23 13:20:25,633 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-3) onNext: Task for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,634 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.tra.res.han.RestHandler] (a2a-agent-executor-3) MAIN RestHandler: onNext called with: Task
+2026-01-23 13:20:25,655 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.tra.res.han.RestHandler] (a2a-agent-executor-3) MAIN RestHandler: Sending to tube: {"task":{"id":"8e618ff9-11d7-4434-99ae-cfb7eb19264b","contextId":"b9b1efc3-51ae-43b3-9634-133c396ed3
+2026-01-23 13:20:25,656 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: map.apply() called with: String
+2026-01-23 13:20:25,657 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: Created SSE buffer (473bytes): data: {"task":{"id":"8e618ff9-11d7-4434-99ae-cfb7eb19264b","contextId":"b9b1efc3-51ae-43b3-9634-133c
+2026-01-23 13:20:25,658 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: onNext() called - buffer size: 473 bytes
+2026-01-23 13:20:25,659 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: initialize() called - bytesWritten: 0
+2026-01-23 13:20:25,659 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: Setting headers and chunked mode
+2026-01-23 13:20:25,659 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: Headers set, chunked: true
+2026-01-23 13:20:25,659 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: onNext() - calling response.write()
+2026-01-23 13:20:25,661 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-3) Subscription.request(1) for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,661 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.tra.res.han.RestHandler] (a2a-agent-executor-3) MAIN RestHandler: Requested next event after tube.send
+2026-01-23 13:20:25,662 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-3) Dequeued event item (waiting) io.a2a.server.events.EventQueue$ChildQueue@5ded6465 TaskStatusUpdateEvent[taskId=8e618ff9-11d7-4434-99ae-cfb7eb19264b, status=TaskStatus[state=WORKING, message=null, timestamp=2026-01-23T13:20:25.514666Z], contextId=b9b1efc3-51ae-43b3-9634-133c396ed31b, isFinal=false, metadata=null]
+2026-01-23 13:20:25,662 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-3) onNext: TaskStatusUpdateEvent for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,662 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.tra.res.han.RestHandler] (a2a-agent-executor-3) MAIN RestHandler: onNext called with: TaskStatusUpdateEvent
+2026-01-23 13:20:25,662 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (vert.x-eventloop-thread-0) MAIN: write handler called - success: true
+2026-01-23 13:20:25,662 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (vert.x-eventloop-thread-0) MAIN: onWriteDone() - write succeeded, requesting next
+2026-01-23 13:20:25,664 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.tra.res.han.RestHandler] (a2a-agent-executor-3) MAIN RestHandler: Sending to tube: {"statusUpdate":{"taskId":"8e618ff9-11d7-4434-99ae-cfb7eb19264b","contextId":"b9b1efc3-51ae-43b3-963
+2026-01-23 13:20:25,664 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: map.apply() called with: String
+2026-01-23 13:20:25,665 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: Created SSE buffer (227bytes): data: {"statusUpdate":{"taskId":"8e618ff9-11d7-4434-99ae-cfb7eb19264b","contextId":"b9b1efc3-51ae-43
+2026-01-23 13:20:25,665 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: onNext() called - buffer size: 227 bytes
+2026-01-23 13:20:25,665 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: initialize() called - bytesWritten: 473
+2026-01-23 13:20:25,665 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] WARN [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: initialize() SKIPPED - bytesWritten: 473
+2026-01-23 13:20:25,666 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (a2a-agent-executor-3) MAIN: onNext() - calling response.write()
+2026-01-23 13:20:25,666 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-3) Subscription.request(1) for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,666 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.tra.res.han.RestHandler] (a2a-agent-executor-3) MAIN RestHandler: Requested next event after tube.send
+2026-01-23 13:20:25,667 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventQueue] (a2a-agent-executor-3) Queue is closed, and empty. Sending termination message. io.a2a.server.events.EventQueue$ChildQueue@5ded6465
+2026-01-23 13:20:25,667 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (vert.x-eventloop-thread-0) MAIN: write handler called - success: true
+2026-01-23 13:20:25,667 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.req.DefaultRequestHandler] (a2a-agent-executor-3) onComplete for task 8e618ff9-11d7-4434-99ae-cfb7eb19264b
+2026-01-23 13:20:25,667 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] INFO [io.a2a.ser.res.qua.A2AServerRoutes$MultiSseSupport] (vert.x-eventloop-thread-0) MAIN: onWriteDone() - write succeeded, requesting next
+2026-01-23 13:20:25,669 kabirs-macbook-pro-2 /Users/kabir/.sdkman/candidates/java/current/bin/java[77369] DEBUG [io.a2a.ser.eve.EventConsumer] (a2a-agent-executor-3) EventConsumer finally block: completed=true, skipping tube.complete() for queue 1575838821
diff --git a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
index 18e18a2f1..cb5bdb25b 100644
--- a/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
+++ b/reference/jsonrpc/src/main/java/io/a2a/server/apps/quarkus/A2AServerRoutes.java
@@ -13,7 +13,6 @@
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.function.Function;
import jakarta.enterprise.inject.Instance;
import jakarta.inject.Inject;
@@ -21,6 +20,7 @@
import com.google.gson.JsonSyntaxException;
import io.a2a.common.A2AHeaders;
+import io.a2a.server.util.sse.SseFormatter;
import io.a2a.grpc.utils.JSONRPCUtils;
import io.a2a.jsonrpc.common.json.IdJsonMappingException;
import io.a2a.jsonrpc.common.json.InvalidParamsJsonMappingException;
@@ -65,7 +65,6 @@
import io.a2a.transport.jsonrpc.handler.JSONRPCHandler;
import io.quarkus.security.Authenticated;
import io.quarkus.vertx.web.Body;
-import io.quarkus.vertx.web.ReactiveRoutes;
import io.quarkus.vertx.web.Route;
import io.smallrye.mutiny.Multi;
import io.vertx.core.AsyncResult;
@@ -74,6 +73,8 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.ext.web.RoutingContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@Singleton
public class A2AServerRoutes {
@@ -135,8 +136,12 @@ public void invokeJSONRPCHandler(@Body String body, RoutingContext rc) {
} else if (streaming) {
final Multi extends A2AResponse>> finalStreamingResponse = streamingResponse;
executor.execute(() -> {
- MultiSseSupport.subscribeObject(
- finalStreamingResponse.map(i -> (Object) i), rc);
+ // Convert Multi
+ * This class only handles HTTP-specific concerns (writing to response, backpressure, disconnect).
+ * SSE formatting and JSON serialization are handled by {@link SseFormatter}.
+ */
private static class MultiSseSupport {
+ private static final Logger logger = LoggerFactory.getLogger(MultiSseSupport.class);
private MultiSseSupport() {
// Avoid direct instantiation.
}
- private static void initialize(HttpServerResponse response) {
- if (response.bytesWritten() == 0) {
- MultiMap headers = response.headers();
- if (headers.get(CONTENT_TYPE) == null) {
- headers.set(CONTENT_TYPE, SERVER_SENT_EVENTS);
- }
- response.setChunked(true);
- }
- }
-
- private static void onWriteDone(Flow.Subscription subscription, AsyncResult
- * Use {@link #builder()} to create configured instances or extend MainQueue/ChildQueue directly.
+ * Use {@link #builder(MainEventBus)} to create configured instances or extend MainQueue/ChildQueue directly.
*
+ * Note: MainQueue does not support dequeue operations - only ChildQueues can be consumed. + *
* * @param waitMilliSeconds the maximum time to wait in milliseconds * @return the EventQueueItem, or null if timeout occurs * @throws EventQueueClosedException if the queue is closed and empty + * @throws UnsupportedOperationException if called on MainQueue */ - public @Nullable EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException { - if (closed && queue.isEmpty()) { - LOGGER.debug("Queue is closed, and empty. Sending termination message. {}", this); - throw new EventQueueClosedException(); - } - try { - if (waitMilliSeconds <= 0) { - EventQueueItem item = queue.poll(); - if (item != null) { - Event event = item.getEvent(); - // Call toString() since for errors we don't really want the full stacktrace - LOGGER.debug("Dequeued event item (no wait) {} {}", this, event instanceof Throwable ? event.toString() : event); - semaphore.release(); - } - return item; - } - try { - LOGGER.trace("Polling queue {} (wait={}ms)", System.identityHashCode(this), waitMilliSeconds); - EventQueueItem item = queue.poll(waitMilliSeconds, TimeUnit.MILLISECONDS); - if (item != null) { - Event event = item.getEvent(); - // Call toString() since for errors we don't really want the full stacktrace - LOGGER.debug("Dequeued event item (waiting) {} {}", this, event instanceof Throwable ? event.toString() : event); - semaphore.release(); - } else { - LOGGER.trace("Dequeue timeout (null) from queue {}", System.identityHashCode(this)); - } - return item; - } catch (InterruptedException e) { - LOGGER.debug("Interrupted dequeue (waiting) {}", this); - Thread.currentThread().interrupt(); - return null; - } - } finally { - signalQueuePollerStarted(); - } - } + @Nullable + public abstract EventQueueItem dequeueEventItem(int waitMilliSeconds) throws EventQueueClosedException; /** * Placeholder method for task completion notification. @@ -295,6 +256,17 @@ public void taskDone() { // TODO Not sure if needed yet. BlockingQueue.poll()/.take() remove the events. } + /** + * Returns the current size of the queue. + *+ * For MainQueue: returns the size of the MainEventBus queue (events pending persistence/distribution). + * For ChildQueue: returns the size of the local consumption queue. + *
+ * + * @return the number of events currently in the queue + */ + public abstract int size(); + /** * Closes this event queue gracefully, allowing pending events to be consumed. */ @@ -348,72 +320,63 @@ protected void doClose(boolean immediate) { LOGGER.debug("Closing {} (immediate={})", this, immediate); closed = true; } - - if (immediate) { - // Immediate close: clear pending events - queue.clear(); - LOGGER.debug("Cleared queue for immediate close: {}", this); - } - // For graceful close, let the queue drain naturally through normal consumption + // Subclasses handle immediate close logic (e.g., ChildQueue clears its local queue) } static class MainQueue extends EventQueue { private final List+ * This processor runs in a dedicated background thread, consuming events from the MainEventBus + * and performing two critical operations in order: + *
+ *+ * This architecture ensures clients never receive events before they're persisted, + * eliminating race conditions and enabling reliable event replay. + *
+ *+ * Note: This bean is eagerly initialized by {@link MainEventBusProcessorInitializer} + * to ensure the background thread starts automatically when the application starts. + *
+ */ +@ApplicationScoped +public class MainEventBusProcessor implements Runnable { + private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessor.class); + + /** + * Callback for testing synchronization with async event processing. + * Default is NOOP to avoid null checks in production code. + * Tests can inject their own callback via setCallback(). + */ + private volatile MainEventBusProcessorCallback callback = MainEventBusProcessorCallback.NOOP; + + /** + * Optional executor for push notifications. + * If null, uses default ForkJoinPool (async). + * Tests can inject a synchronous executor to ensure deterministic ordering. + */ + private volatile @Nullable java.util.concurrent.Executor pushNotificationExecutor = null; + + private final MainEventBus eventBus; + + private final TaskStore taskStore; + + private final PushNotificationSender pushSender; + + private volatile boolean running = true; + private @Nullable Thread processorThread; + + @Inject + public MainEventBusProcessor(MainEventBus eventBus, TaskStore taskStore, PushNotificationSender pushSender) { + this.eventBus = eventBus; + this.taskStore = taskStore; + this.pushSender = pushSender; + } + + /** + * Set a callback for testing synchronization with async event processing. + *+ * This is primarily intended for tests that need to wait for event processing to complete. + * Pass null to reset to the default NOOP callback. + *
+ * + * @param callback the callback to invoke during event processing, or null for NOOP + */ + public void setCallback(MainEventBusProcessorCallback callback) { + this.callback = callback != null ? callback : MainEventBusProcessorCallback.NOOP; + } + + /** + * Set a custom executor for push notifications (primarily for testing). + *+ * By default, push notifications are sent asynchronously using CompletableFuture.runAsync() + * with the default ForkJoinPool. For tests that need deterministic ordering of push + * notifications, inject a synchronous executor that runs tasks immediately on the calling thread. + *
+ *+ * Example synchronous executor for tests: + *
{@code
+ * Executor syncExecutor = Runnable::run;
+ * mainEventBusProcessor.setPushNotificationExecutor(syncExecutor);
+ * }
+ *
+ *
+ * @param executor the executor to use for push notifications, or null to use default ForkJoinPool
+ */
+ public void setPushNotificationExecutor(java.util.concurrent.Executor executor) {
+ this.pushNotificationExecutor = executor;
+ }
+
+ @PostConstruct
+ void start() {
+ processorThread = new Thread(this, "MainEventBusProcessor");
+ processorThread.setDaemon(true); // Allow JVM to exit even if this thread is running
+ processorThread.start();
+ LOGGER.info("MainEventBusProcessor started");
+ }
+
+ /**
+ * No-op method to force CDI proxy resolution and ensure @PostConstruct has been called.
+ * Called by MainEventBusProcessorInitializer during application startup.
+ */
+ public void ensureStarted() {
+ // Method intentionally empty - just forces proxy resolution
+ }
+
+ @PreDestroy
+ void stop() {
+ LOGGER.info("MainEventBusProcessor stopping...");
+ running = false;
+ if (processorThread != null) {
+ processorThread.interrupt();
+ try {
+ long start = System.currentTimeMillis();
+ processorThread.join(5000); // Wait up to 5 seconds
+ long elapsed = System.currentTimeMillis() - start;
+ LOGGER.info("MainEventBusProcessor thread stopped in {}ms", elapsed);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted while waiting for MainEventBusProcessor thread to stop");
+ }
+ }
+ LOGGER.info("MainEventBusProcessor stopped");
+ }
+
+ @Override
+ public void run() {
+ LOGGER.info("MainEventBusProcessor processing loop started");
+ while (running) {
+ try {
+ LOGGER.debug("MainEventBusProcessor: Waiting for event from MainEventBus...");
+ MainEventBusContext context = eventBus.take();
+ LOGGER.debug("MainEventBusProcessor: Retrieved event for task {} from MainEventBus",
+ context.taskId());
+ processEvent(context);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.info("MainEventBusProcessor interrupted, shutting down");
+ break;
+ } catch (Exception e) {
+ LOGGER.error("Error processing event from MainEventBus", e);
+ // Continue processing despite errors
+ }
+ }
+ LOGGER.info("MainEventBusProcessor processing loop ended");
+ }
+
+ private void processEvent(MainEventBusContext context) {
+ String taskId = context.taskId();
+ Event event = context.eventQueueItem().getEvent();
+ EventQueue eventQueue = context.eventQueue();
+
+ LOGGER.debug("MainEventBusProcessor: Processing event for task {}: {} (queue type: {})",
+ taskId, event.getClass().getSimpleName(), eventQueue.getClass().getSimpleName());
+
+ Event eventToDistribute = null;
+ try {
+ // Step 1: Update TaskStore FIRST (persistence before clients see it)
+ // If this throws, we distribute an error to ensure "persist before client visibility"
+
+ try {
+ updateTaskStore(taskId, event);
+ eventToDistribute = event; // Success - distribute original event
+ } catch (InternalError e) {
+ // Persistence failed - create error event to distribute instead
+ LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e);
+ String errorMessage = "Failed to persist event: " + e.getMessage();
+ eventToDistribute = e;
+ } catch (Exception e) {
+ LOGGER.error("Failed to persist event for task {}, distributing error to clients", taskId, e);
+ String errorMessage = "Failed to persist event: " + e.getMessage();
+ eventToDistribute = new InternalError(errorMessage);
+ }
+
+ // Step 2: Send push notification AFTER successful persistence
+ if (eventToDistribute == event) {
+ // Capture task state immediately after persistence, before going async
+ // This ensures we send the task as it existed when THIS event was processed,
+ // not whatever state might exist later when the async callback executes
+ Task taskSnapshot = taskStore.get(taskId);
+ if (taskSnapshot != null) {
+ sendPushNotification(taskId, taskSnapshot);
+ } else {
+ LOGGER.warn("Task {} not found in TaskStore after successful persistence, skipping push notification", taskId);
+ }
+ }
+
+ // Step 3: Then distribute to ChildQueues (clients see either event or error AFTER persistence attempt)
+ if (eventToDistribute == null) {
+ LOGGER.error("MainEventBusProcessor: eventToDistribute is NULL for task {} - this should never happen!", taskId);
+ eventToDistribute = new InternalError("Internal error: event processing failed");
+ }
+
+ if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
+ int childCount = mainQueue.getChildCount();
+ LOGGER.debug("MainEventBusProcessor: Distributing {} to {} children for task {}",
+ eventToDistribute.getClass().getSimpleName(), childCount, taskId);
+ // Create new EventQueueItem with the event to distribute (original or error)
+ EventQueueItem itemToDistribute = new LocalEventQueueItem(eventToDistribute);
+ mainQueue.distributeToChildren(itemToDistribute);
+ LOGGER.debug("MainEventBusProcessor: Distributed {} to {} children for task {}",
+ eventToDistribute.getClass().getSimpleName(), childCount, taskId);
+ } else {
+ LOGGER.warn("MainEventBusProcessor: Expected MainQueue but got {} for task {}",
+ eventQueue.getClass().getSimpleName(), taskId);
+ }
+
+ LOGGER.debug("MainEventBusProcessor: Completed processing event for task {}", taskId);
+
+ } finally {
+ try {
+ // Step 4: Notify callback after all processing is complete
+ // Call callback with the distributed event (original or error)
+ if (eventToDistribute != null) {
+ callback.onEventProcessed(taskId, eventToDistribute);
+
+ // Step 5: If this is a final event, notify task finalization
+ // Only for successful persistence (not for errors)
+ if (eventToDistribute == event && isFinalEvent(event)) {
+ callback.onTaskFinalized(taskId);
+ }
+ }
+ } finally {
+ // ALWAYS release semaphore, even if processing fails
+ // Balances the acquire() in MainQueue.enqueueEvent()
+ if (eventQueue instanceof EventQueue.MainQueue mainQueue) {
+ mainQueue.releaseSemaphore();
+ }
+ }
+ }
+ }
+
+ /**
+ * Updates TaskStore using TaskManager.process().
+ * + * Creates a temporary TaskManager instance for this event and delegates to its process() method, + * which handles all event types (Task, TaskStatusUpdateEvent, TaskArtifactUpdateEvent). + * This leverages existing TaskManager logic for status updates, artifact appending, message history, etc. + *
+ *+ * If persistence fails, the exception is propagated to processEvent() which distributes an + * InternalError to clients instead of the original event, ensuring "persist before visibility". + * See Gemini's comment: https://github.com/a2aproject/a2a-java/pull/515#discussion_r2604621833 + *
+ * + * @throws InternalError if persistence fails + */ + private void updateTaskStore(String taskId, Event event) throws InternalError { + try { + // Extract contextId from event (all relevant events have it) + String contextId = extractContextId(event); + + // Create temporary TaskManager instance for this event + TaskManager taskManager = new TaskManager(taskId, contextId, taskStore, null); + + // Use TaskManager.process() - handles all event types with existing logic + taskManager.process(event); + LOGGER.debug("TaskStore updated via TaskManager.process() for task {}: {}", + taskId, event.getClass().getSimpleName()); + } catch (InternalError e) { + LOGGER.error("Error updating TaskStore via TaskManager for task {}", taskId, e); + // Rethrow to prevent distributing unpersisted event to clients + throw e; + } catch (Exception e) { + LOGGER.error("Unexpected error updating TaskStore for task {}", taskId, e); + // Rethrow to prevent distributing unpersisted event to clients + throw new InternalError("TaskStore persistence failed: " + e.getMessage()); + } + } + + /** + * Sends push notification for the task AFTER persistence. + *+ * This is called after updateTaskStore() to ensure the notification contains + * the latest persisted state, avoiding race conditions. + *
+ *+ * CRITICAL: Push notifications are sent asynchronously in the background + * to avoid blocking event distribution to ChildQueues. The 83ms overhead from + * PushNotificationSender.sendNotification() was causing streaming delays. + *
+ *+ * IMPORTANT: The task parameter is a snapshot captured immediately after + * persistence. This ensures we send the task state as it existed when THIS event + * was processed, not whatever state might exist in TaskStore when the async + * callback executes (subsequent events may have already updated the store). + *
+ *+ * NOTE: Tests can inject a synchronous executor via setPushNotificationExecutor() + * to ensure deterministic ordering of push notifications in the test environment. + *
+ * + * @param taskId the task ID + * @param task the task snapshot to send (captured immediately after persistence) + */ + private void sendPushNotification(String taskId, Task task) { + Runnable pushTask = () -> { + try { + if (task != null) { + LOGGER.debug("Sending push notification for task {}", taskId); + pushSender.sendNotification(task); + } else { + LOGGER.debug("Skipping push notification - task snapshot is null for task {}", taskId); + } + } catch (Exception e) { + LOGGER.error("Error sending push notification for task {}", taskId, e); + // Don't rethrow - push notifications are best-effort + } + }; + + // Use custom executor if set (for tests), otherwise use default ForkJoinPool (async) + if (pushNotificationExecutor != null) { + pushNotificationExecutor.execute(pushTask); + } else { + CompletableFuture.runAsync(pushTask); + } + } + + /** + * Extracts contextId from an event. + * Returns null if the event type doesn't have a contextId (e.g., Message). + */ + @Nullable + private String extractContextId(Event event) { + if (event instanceof Task task) { + return task.contextId(); + } else if (event instanceof TaskStatusUpdateEvent statusUpdate) { + return statusUpdate.contextId(); + } else if (event instanceof TaskArtifactUpdateEvent artifactUpdate) { + return artifactUpdate.contextId(); + } + // Message and other events don't have contextId + return null; + } + + /** + * Checks if an event represents a final task state. + * + * @param event the event to check + * @return true if the event represents a final state (COMPLETED, FAILED, CANCELED, REJECTED, UNKNOWN) + */ + private boolean isFinalEvent(Event event) { + if (event instanceof Task task) { + return task.status() != null && task.status().state() != null + && task.status().state().isFinal(); + } else if (event instanceof TaskStatusUpdateEvent statusUpdate) { + return statusUpdate.isFinal(); + } + return false; + } +} diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java new file mode 100644 index 000000000..b0a9adbce --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorCallback.java @@ -0,0 +1,66 @@ +package io.a2a.server.events; + +import io.a2a.spec.Event; + +/** + * Callback interface for MainEventBusProcessor events. + *+ * This interface is primarily intended for testing, allowing tests to synchronize + * with the asynchronous MainEventBusProcessor. Production code should not rely on this. + *
+ * Usage in tests: + *
+ * {@code
+ * @Inject
+ * MainEventBusProcessor processor;
+ *
+ * @BeforeEach
+ * void setUp() {
+ * CountDownLatch latch = new CountDownLatch(3);
+ * processor.setCallback(new MainEventBusProcessorCallback() {
+ * public void onEventProcessed(String taskId, Event event) {
+ * latch.countDown();
+ * }
+ * });
+ * }
+ *
+ * @AfterEach
+ * void tearDown() {
+ * processor.setCallback(null); // Reset to NOOP
+ * }
+ * }
+ *
+ */
+public interface MainEventBusProcessorCallback {
+
+ /**
+ * Called after an event has been fully processed (persisted, notification sent, distributed to children).
+ *
+ * @param taskId the task ID
+ * @param event the event that was processed
+ */
+ void onEventProcessed(String taskId, Event event);
+
+ /**
+ * Called when a task reaches a final state (COMPLETED, FAILED, CANCELED, REJECTED).
+ *
+ * @param taskId the task ID that was finalized
+ */
+ void onTaskFinalized(String taskId);
+
+ /**
+ * No-op implementation that does nothing.
+ * Used as the default callback to avoid null checks.
+ */
+ MainEventBusProcessorCallback NOOP = new MainEventBusProcessorCallback() {
+ @Override
+ public void onEventProcessed(String taskId, Event event) {
+ // No-op
+ }
+
+ @Override
+ public void onTaskFinalized(String taskId) {
+ // No-op
+ }
+ };
+}
diff --git a/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java
new file mode 100644
index 000000000..ba4b300be
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/events/MainEventBusProcessorInitializer.java
@@ -0,0 +1,43 @@
+package io.a2a.server.events;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.context.Initialized;
+import jakarta.enterprise.event.Observes;
+import jakarta.inject.Inject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Portable CDI initializer for MainEventBusProcessor.
+ * + * This bean observes the ApplicationScoped initialization event and injects + * MainEventBusProcessor, which triggers its eager creation and starts the background thread. + *
+ *+ * This approach is portable across all Jakarta CDI implementations (Weld, OpenWebBeans, Quarkus, etc.) + * and ensures MainEventBusProcessor starts automatically when the application starts. + *
+ */ +@ApplicationScoped +public class MainEventBusProcessorInitializer { + private static final Logger LOGGER = LoggerFactory.getLogger(MainEventBusProcessorInitializer.class); + + @Inject + MainEventBusProcessor processor; + + /** + * Observes ApplicationScoped initialization to force eager creation of MainEventBusProcessor. + * The injection of MainEventBusProcessor in this bean triggers its creation, and calling + * ensureStarted() forces the CDI proxy to be resolved, which ensures @PostConstruct has been + * called and the background thread is running. + */ + void onStart(@Observes @Initialized(ApplicationScoped.class) Object event) { + if (processor != null) { + // Force proxy resolution to ensure @PostConstruct has been called + processor.ensureStarted(); + LOGGER.info("MainEventBusProcessor initialized and started"); + } else { + LOGGER.error("MainEventBusProcessor is null - initialization failed!"); + } + } +} diff --git a/server-common/src/main/java/io/a2a/server/events/QueueManager.java b/server-common/src/main/java/io/a2a/server/events/QueueManager.java index 01e754fcb..4ad30f0cb 100644 --- a/server-common/src/main/java/io/a2a/server/events/QueueManager.java +++ b/server-common/src/main/java/io/a2a/server/events/QueueManager.java @@ -177,7 +177,31 @@ public interface QueueManager { * @return a builder for creating event queues */ default EventQueue.EventQueueBuilder getEventQueueBuilder(String taskId) { - return EventQueue.builder(); + throw new UnsupportedOperationException( + "QueueManager implementations must override getEventQueueBuilder() to provide MainEventBus" + ); + } + + /** + * Creates a base EventQueueBuilder with standard configuration for this QueueManager. + * This method provides the foundation for creating event queues with proper configuration + * (MainEventBus, TaskStateProvider, cleanup callbacks, etc.). + *+ * QueueManager implementations that use custom factories can call this method directly + * to get the base builder without going through the factory (which could cause infinite + * recursion if the factory delegates back to getEventQueueBuilder()). + *
+ *+ * Callers can then add additional configuration (hooks, callbacks) before building the queue. + *
+ * + * @param taskId the task ID for the queue + * @return a builder with base configuration specific to this QueueManager implementation + */ + default EventQueue.EventQueueBuilder createBaseEventQueueBuilder(String taskId) { + throw new UnsupportedOperationException( + "QueueManager implementations must override createBaseEventQueueBuilder() to provide MainEventBus" + ); } /** diff --git a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java index 002acbafd..38bf3bd86 100644 --- a/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java +++ b/server-common/src/main/java/io/a2a/server/requesthandlers/DefaultRequestHandler.java @@ -11,13 +11,13 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; -import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.Flow; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -35,6 +35,8 @@ import io.a2a.server.events.EventConsumer; import io.a2a.server.events.EventQueue; import io.a2a.server.events.EventQueueItem; +import io.a2a.server.events.MainEventBusProcessor; +import io.a2a.server.events.MainEventBusProcessorCallback; import io.a2a.server.events.QueueManager; import io.a2a.server.events.TaskQueueExistsException; import io.a2a.server.tasks.PushNotificationConfigStore; @@ -42,6 +44,7 @@ import io.a2a.server.tasks.ResultAggregator; import io.a2a.server.tasks.TaskManager; import io.a2a.server.tasks.TaskStore; +import io.a2a.server.util.async.EventConsumerExecutorProducer.EventConsumerExecutor; import io.a2a.server.util.async.Internal; import io.a2a.spec.A2AError; import io.a2a.spec.DeleteTaskPushNotificationConfigParams; @@ -64,6 +67,7 @@ import io.a2a.spec.TaskPushNotificationConfig; import io.a2a.spec.TaskQueryParams; import io.a2a.spec.TaskState; +import io.a2a.spec.TaskStatusUpdateEvent; import io.a2a.spec.UnsupportedOperationError; import org.jspecify.annotations.NonNull; import org.jspecify.annotations.Nullable; @@ -122,7 +126,6 @@ *+ * This single callback multiplexes for all concurrent requests via the + * {@link #pendingFinalizations} map. Called by both {@link #initConfig()} + * (@PostConstruct for CDI) and {@link #create(AgentExecutor, TaskStore, QueueManager, PushNotificationConfigStore, MainEventBusProcessor, Executor, Executor)} + * (static factory for tests). + *
+ */ + private void registerFinalizationCallback() { + mainEventBusProcessor.setCallback(new MainEventBusProcessorCallback() { + @Override + public void onEventProcessed(String taskId, Event event) { + // Not used for task finalization wait + } + + @Override + public void onTaskFinalized(String taskId) { + // Signal any blocking call waiting for this task to finalize + CountDownLatch latch = pendingFinalizations.get(taskId); + if (latch != null) { + latch.countDown(); + pendingFinalizations.remove(taskId); + LOGGER.debug("Task {} finalization signaled to waiting thread", taskId); + } + } + }); + } + + /** + * Wait for MainEventBusProcessor to finalize a task (reach final state and persist to TaskStore). + *+ * This method is used by blocking calls to ensure TaskStore is fully updated before returning + * to the client. It registers this task in the {@link #pendingFinalizations} map, which is + * monitored by the permanent callback registered in {@link #initConfig()}. + *
+ *+ * Why this is needed: Events flow through MainEventBus → MainEventBusProcessor → + * TaskStore persistence. The consumption future completing only means ChildQueue is empty, + * NOT that MainEventBusProcessor has finished persisting to TaskStore. This creates a race + * condition where blocking calls might read stale state from TaskStore. + *
+ *+ * Concurrency: Uses a single permanent callback that multiplexes for all concurrent + * requests via {@link #pendingFinalizations} map. This avoids callback overwrite issues + * when multiple requests execute simultaneously. + *
+ *+ * Race Condition Prevention: Checks TaskStore FIRST before waiting. If the task is + * already finalized, returns immediately. This prevents waiting forever when the callback + * fires before the latch is registered in the map. + *
+ * + * @param taskId the task ID to wait for + * @param timeoutSeconds maximum time to wait for finalization + * @throws InterruptedException if interrupted while waiting + * @throws TimeoutException if task doesn't finalize within timeout + */ + private void waitForTaskFinalization(String taskId, int timeoutSeconds) + throws InterruptedException, TimeoutException { + // CRITICAL: Check TaskStore FIRST to avoid race condition where callback fires + // before latch is registered. If task is finalized, return immediately. + Task task = taskStore.get(taskId); + if (task != null && task.status() != null && task.status().state() != null + && task.status().state().isFinal()) { + LOGGER.debug("Task {} is finalized in TaskStore, skipping wait", taskId); + return; + } + + CountDownLatch finalizationLatch = new CountDownLatch(1); + + // Register this task's latch in the map + // The permanent callback (registered in initConfig) will signal it + pendingFinalizations.put(taskId, finalizationLatch); + + try { + // Wait for the callback to fire + if (!finalizationLatch.await(timeoutSeconds, SECONDS)) { + throw new TimeoutException( + String.format("Task %s finalization timeout after %d seconds", taskId, timeoutSeconds)); + } + LOGGER.debug("Task {} finalized and persisted to TaskStore", taskId); + } finally { + // Always remove from map to avoid memory leaks + pendingFinalizations.remove(taskId); + } } /** @@ -269,11 +380,17 @@ void initConfig() { */ public static DefaultRequestHandler create(AgentExecutor agentExecutor, TaskStore taskStore, QueueManager queueManager, PushNotificationConfigStore pushConfigStore, - PushNotificationSender pushSender, Executor executor) { + MainEventBusProcessor mainEventBusProcessor, + Executor executor, Executor eventConsumerExecutor) { DefaultRequestHandler handler = - new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, pushSender, executor); + new DefaultRequestHandler(agentExecutor, taskStore, queueManager, pushConfigStore, + mainEventBusProcessor, executor, eventConsumerExecutor); handler.agentCompletionTimeoutSeconds = 5; handler.consumptionCompletionTimeoutSeconds = 2; + + // Register permanent callback for task finalization (normally done in @PostConstruct) + handler.registerFinalizationCallback(); + return handler; } @@ -359,12 +476,9 @@ public Task onCancelTask(TaskIdParams params, ServerCallContext context) throws taskStore, null); - ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor); + ResultAggregator resultAggregator = new ResultAggregator(taskManager, null, executor, eventConsumerExecutor); - EventQueue queue = queueManager.tap(task.id()); - if (queue == null) { - queue = queueManager.getEventQueueBuilder(task.id()).build(); - } + EventQueue queue = queueManager.createOrTap(task.id()); agentExecutor.cancel( requestContextBuilder.get() .setTaskId(task.id()) @@ -404,19 +518,28 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte throw new io.a2a.spec.InternalError("Task ID is null in onMessageSend"); } EventQueue queue = queueManager.createOrTap(taskId); - ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor); + ResultAggregator resultAggregator = new ResultAggregator(mss.taskManager, null, executor, eventConsumerExecutor); + // Default to blocking=false per A2A spec (return after task creation) boolean blocking = params.configuration() != null && Boolean.TRUE.equals(params.configuration().blocking()); + // Log blocking behavior from client request + if (params.configuration() != null && params.configuration().blocking() != null) { + LOGGER.debug("DefaultRequestHandler: Client requested blocking={} for task {}", + params.configuration().blocking(), taskId); + } else if (params.configuration() != null) { + LOGGER.debug("DefaultRequestHandler: Client sent configuration but blocking=null, using default blocking={} for task {}", blocking, taskId); + } else { + LOGGER.debug("DefaultRequestHandler: Client sent no configuration, using default blocking={} for task {}", blocking, taskId); + } + LOGGER.debug("DefaultRequestHandler: Final blocking decision: {} for task {}", blocking, taskId); + boolean interruptedOrNonBlocking = false; EnhancedRunnable producerRunnable = registerAndExecuteAgentAsync(taskId, mss.requestContext, queue); ResultAggregator.EventTypeAndInterrupt etai = null; EventKind kind = null; // Declare outside try block so it's in scope for return try { - // Create callback for push notifications during background event processing - Runnable pushNotificationCallback = () -> sendPushNotification(taskId, resultAggregator); - EventConsumer consumer = new EventConsumer(queue); // This callback must be added before we start consuming. Otherwise, @@ -432,7 +555,8 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte throw new InternalError("No result"); } interruptedOrNonBlocking = etai.interrupted(); - LOGGER.debug("Was interrupted or non-blocking: {}", interruptedOrNonBlocking); + LOGGER.debug("DefaultRequestHandler: interruptedOrNonBlocking={} (blocking={}, eventType={})", + interruptedOrNonBlocking, blocking, kind != null ? kind.getClass().getSimpleName() : null); // For blocking calls that were interrupted (returned on first event), // wait for agent execution and event processing BEFORE returning to client. @@ -444,27 +568,31 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // Store push notification config for newly created tasks (mirrors streaming logic) // Only for NEW tasks - existing tasks are handled by initMessageSend() if (mss.task() == null && kind instanceof Task createdTask && shouldAddPushInfo(params)) { - LOGGER.debug("Storing push notification config for new task {}", createdTask.id()); + LOGGER.debug("Storing push notification config for new task {} (original taskId from params: {})", + createdTask.id(), params.message().taskId()); pushConfigStore.setInfo(createdTask.id(), params.configuration().pushNotificationConfig()); } if (blocking && interruptedOrNonBlocking) { - // For blocking calls: ensure all events are processed before returning - // Order of operations is critical to avoid circular dependency: + // For blocking calls: ensure all events are persisted to TaskStore before returning + // Order of operations is critical to avoid circular dependency and race conditions: // 1. Wait for agent to finish enqueueing events // 2. Close the queue to signal consumption can complete // 3. Wait for consumption to finish processing events - // 4. Fetch final task state from TaskStore + // 4. Wait for MainEventBusProcessor to persist final state to TaskStore + // 5. Fetch final task state from TaskStore (now guaranteed persisted) + LOGGER.debug("DefaultRequestHandler: Entering blocking fire-and-forget handling for task {}", taskId); try { // Step 1: Wait for agent to finish (with configurable timeout) if (agentFuture != null) { try { agentFuture.get(agentCompletionTimeoutSeconds, SECONDS); - LOGGER.debug("Agent completed for task {}", taskId); + LOGGER.debug("DefaultRequestHandler: Step 1 - Agent completed for task {}", taskId); } catch (java.util.concurrent.TimeoutException e) { // Agent still running after timeout - that's fine, events already being processed - LOGGER.debug("Agent still running for task {} after {}s", taskId, agentCompletionTimeoutSeconds); + LOGGER.debug("DefaultRequestHandler: Step 1 - Agent still running for task {} after {}s timeout", + taskId, agentCompletionTimeoutSeconds); } } @@ -472,13 +600,27 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte // For fire-and-forget tasks, there's no final event, so we need to close the queue // This allows EventConsumer.consumeAll() to exit queue.close(false, false); // graceful close, don't notify parent yet - LOGGER.debug("Closed queue for task {} to allow consumption completion", taskId); + LOGGER.debug("DefaultRequestHandler: Step 2 - Closed queue for task {} to allow consumption completion", taskId); // Step 3: Wait for consumption to complete (now that queue is closed) if (etai.consumptionFuture() != null) { etai.consumptionFuture().get(consumptionCompletionTimeoutSeconds, SECONDS); - LOGGER.debug("Consumption completed for task {}", taskId); + LOGGER.debug("DefaultRequestHandler: Step 3 - Consumption completed for task {}", taskId); } + + // Step 4: Wait for MainEventBusProcessor to finalize task (persist to TaskStore) + // ONLY if the event is a final state. For fire-and-forget tasks (non-final states), + // we don't wait for finalization since it will never happen. + if (isFinalEvent(kind)) { + // This is CRITICAL: consumption completing only means ChildQueue is empty, NOT that + // MainEventBusProcessor has finished persisting to TaskStore. The callback ensures + // we wait for the final state to be persisted before reading from TaskStore. + waitForTaskFinalization(taskId, consumptionCompletionTimeoutSeconds); + LOGGER.debug("DefaultRequestHandler: Step 4 - Task {} finalized and persisted to TaskStore", taskId); + } else { + LOGGER.debug("DefaultRequestHandler: Step 4 - Skipping finalization wait for task {} (non-final event)", taskId); + } + } catch (InterruptedException e) { Thread.currentThread().interrupt(); String msg = String.format("Error waiting for task %s completion", taskId); @@ -488,39 +630,51 @@ public EventKind onMessageSend(MessageSendParams params, ServerCallContext conte String msg = String.format("Error during task %s execution", taskId); LOGGER.warn(msg, e.getCause()); throw new InternalError(msg); - } catch (java.util.concurrent.TimeoutException e) { - String msg = String.format("Timeout waiting for consumption to complete for task %s", taskId); - LOGGER.warn(msg, taskId); + } catch (TimeoutException e) { + String msg = String.format("Timeout waiting for task %s finalization", taskId); + LOGGER.warn(msg, e); throw new InternalError(msg); } - // Step 4: Fetch the final task state from TaskStore (all events have been processed) + // Step 5: Fetch the final task state from TaskStore (now guaranteed persisted) // taskId is guaranteed non-null here (checked earlier) String nonNullTaskId = taskId; Task updatedTask = taskStore.get(nonNullTaskId); if (updatedTask != null) { kind = updatedTask; - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Fetched final task for {} with state {} and {} artifacts", - nonNullTaskId, updatedTask.status().state(), - updatedTask.artifacts().size()); - } + LOGGER.debug("DefaultRequestHandler: Step 5 - Fetched final task for {} with state {} and {} artifacts", + taskId, updatedTask.status().state(), + updatedTask.artifacts().size()); + } else { + LOGGER.warn("DefaultRequestHandler: Step 5 - Task {} not found in TaskStore!", taskId); } } if (kind instanceof Task taskResult && !taskId.equals(taskResult.id())) { throw new InternalError("Task ID mismatch in agent response"); } - - // Send push notification after initial return (for both blocking and non-blocking) - pushNotificationCallback.run(); } finally { + // For non-blocking calls: close ChildQueue IMMEDIATELY to free EventConsumer thread + // CRITICAL: Must use immediate=true to clear the local queue, otherwise EventConsumer + // continues polling until queue drains naturally, holding executor thread. + // Immediate close clears pending events and triggers EventQueueClosedException on next poll. + // Events continue flowing through MainQueue → MainEventBus → TaskStore. + if (!blocking && etai != null && etai.interrupted()) { + LOGGER.debug("DefaultRequestHandler: Non-blocking call in finally - closing ChildQueue IMMEDIATELY for task {} to free EventConsumer", taskId); + queue.close(true); // immediate=true: clear queue and free EventConsumer + } + // Remove agent from map immediately to prevent accumulation CompletableFuture
+ * Property: {@code a2a.executor.queue-capacity}
+ * Default: 100
+ * Note: Must be bounded to allow pool growth to maxPoolSize.
+ * When queue is full, new threads are created up to maxPoolSize.
+ */
+ int queueCapacity;
+
private @Nullable ExecutorService executor;
@PostConstruct
@@ -64,18 +75,34 @@ public void init() {
corePoolSize = Integer.parseInt(configProvider.getValue(A2A_EXECUTOR_CORE_POOL_SIZE));
maxPoolSize = Integer.parseInt(configProvider.getValue(A2A_EXECUTOR_MAX_POOL_SIZE));
keepAliveSeconds = Long.parseLong(configProvider.getValue(A2A_EXECUTOR_KEEP_ALIVE_SECONDS));
-
- LOGGER.info("Initializing async executor: corePoolSize={}, maxPoolSize={}, keepAliveSeconds={}",
- corePoolSize, maxPoolSize, keepAliveSeconds);
-
- executor = new ThreadPoolExecutor(
+ queueCapacity = Integer.parseInt(configProvider.getValue(A2A_EXECUTOR_QUEUE_CAPACITY));
+
+ LOGGER.info("Initializing async executor: corePoolSize={}, maxPoolSize={}, keepAliveSeconds={}, queueCapacity={}",
+ corePoolSize, maxPoolSize, keepAliveSeconds, queueCapacity);
+
+ // CRITICAL: Use ArrayBlockingQueue (bounded) instead of LinkedBlockingQueue (unbounded).
+ // With unbounded queue, ThreadPoolExecutor NEVER grows beyond corePoolSize because the
+ // queue never fills. This causes executor pool exhaustion during concurrent requests when
+ // EventConsumer polling threads hold all core threads and agent tasks queue indefinitely.
+ // Bounded queue enables pool growth: when queue is full, new threads are created up to
+ // maxPoolSize, preventing agent execution starvation.
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveSeconds,
TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
+ new ArrayBlockingQueue<>(queueCapacity),
new A2AThreadFactory()
);
+
+ // CRITICAL: Allow core threads to timeout after keepAliveSeconds when idle.
+ // By default, ThreadPoolExecutor only times out threads above corePoolSize.
+ // Without this, core threads accumulate during testing and never clean up.
+ // This is essential for streaming scenarios where many short-lived tasks create threads
+ // for agent execution and cleanup callbacks, but those threads remain idle afterward.
+ tpe.allowCoreThreadTimeOut(true);
+
+ executor = tpe;
}
@PreDestroy
@@ -106,6 +133,22 @@ public Executor produce() {
return executor;
}
+ /**
+ * Log current executor pool statistics for diagnostics.
+ * Useful for debugging pool exhaustion or sizing issues.
+ */
+ public void logPoolStats() {
+ if (executor instanceof ThreadPoolExecutor tpe) {
+ LOGGER.info("Executor pool stats: active={}/{}, queued={}/{}, completed={}, total={}",
+ tpe.getActiveCount(),
+ tpe.getPoolSize(),
+ tpe.getQueue().size(),
+ queueCapacity,
+ tpe.getCompletedTaskCount(),
+ tpe.getTaskCount());
+ }
+ }
+
private static class A2AThreadFactory implements ThreadFactory {
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix = "a2a-agent-executor-";
diff --git a/server-common/src/main/java/io/a2a/server/util/async/EventConsumerExecutorProducer.java b/server-common/src/main/java/io/a2a/server/util/async/EventConsumerExecutorProducer.java
new file mode 100644
index 000000000..24ff7f5d1
--- /dev/null
+++ b/server-common/src/main/java/io/a2a/server/util/async/EventConsumerExecutorProducer.java
@@ -0,0 +1,93 @@
+package io.a2a.server.util.async;
+
+import jakarta.enterprise.context.ApplicationScoped;
+import jakarta.enterprise.inject.Produces;
+import jakarta.inject.Qualifier;
+
+import org.jspecify.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.annotation.Retention;
+import java.lang.annotation.Target;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static java.lang.annotation.ElementType.*;
+import static java.lang.annotation.RetentionPolicy.RUNTIME;
+
+/**
+ * Produces a dedicated executor for EventConsumer polling threads.
+ *
+ * CRITICAL: EventConsumer polling must use a separate executor from AgentExecutor because: + *
+ * Uses a cached thread pool (unbounded) with automatic thread reclamation: + *
+ * Provides static methods to serialize A2A responses to JSON and format them as SSE events. + * This allows HTTP server frameworks (Vert.x, Jakarta/WildFly, etc.) to use their own + * reactive libraries for publisher mapping while sharing the serialization logic. + *
+ * Example usage (Quarkus/Vert.x with Mutiny): + *
{@code
+ * Flow.Publisher> responses = handler.onMessageSendStream(request, context);
+ * AtomicLong eventId = new AtomicLong(0);
+ *
+ * Multi sseEvents = Multi.createFrom().publisher(responses)
+ * .map(response -> SseFormatter.formatResponseAsSSE(response, eventId.getAndIncrement()));
+ *
+ * sseEvents.subscribe().with(sseEvent -> httpResponse.write(Buffer.buffer(sseEvent)));
+ * }
+ * + * Example usage (Jakarta/WildFly with custom reactive library): + *
{@code
+ * Flow.Publisher jsonStrings = restHandler.getJsonPublisher();
+ * AtomicLong eventId = new AtomicLong(0);
+ *
+ * Flow.Publisher sseEvents = mapPublisher(jsonStrings,
+ * json -> SseFormatter.formatJsonAsSSE(json, eventId.getAndIncrement()));
+ * }
+ */
+public class SseFormatter {
+
+ private SseFormatter() {
+ // Utility class - prevent instantiation
+ }
+
+ /**
+ * Format an A2A response as an SSE event.
+ * + * Serializes the response to JSON and formats as: + *
+ * data: {"jsonrpc":"2.0","result":{...},"id":123}
+ * id: 0
+ *
+ *
+ *
+ * @param response the A2A response to format
+ * @param eventId the SSE event ID
+ * @return SSE-formatted string (ready to write to HTTP response)
+ */
+ public static String formatResponseAsSSE(A2AResponse> response, long eventId) {
+ String jsonData = serializeResponse(response);
+ return "data: " + jsonData + "\nid: " + eventId + "\n\n";
+ }
+
+ /**
+ * Format a pre-serialized JSON string as an SSE event.
+ * + * Wraps the JSON in SSE format as: + *
+ * data: {"jsonrpc":"2.0","result":{...},"id":123}
+ * id: 0
+ *
+ *
+ * + * Use this when you already have JSON strings (e.g., from REST transport) + * and just need to add SSE formatting. + * + * @param jsonString the JSON string to wrap + * @param eventId the SSE event ID + * @return SSE-formatted string (ready to write to HTTP response) + */ + public static String formatJsonAsSSE(String jsonString, long eventId) { + return "data: " + jsonString + "\nid: " + eventId + "\n\n"; + } + + /** + * Serialize an A2AResponse to JSON string. + */ + private static String serializeResponse(A2AResponse> response) { + // For error responses, use standard JSON-RPC error format + if (response instanceof A2AErrorResponse error) { + return JSONRPCUtils.toJsonRPCErrorResponse(error.getId(), error.getError()); + } + if (response.getError() != null) { + return JSONRPCUtils.toJsonRPCErrorResponse(response.getId(), response.getError()); + } + + // Convert domain response to protobuf message and serialize + com.google.protobuf.MessageOrBuilder protoMessage = convertToProto(response); + return JSONRPCUtils.toJsonRPCResultResponse(response.getId(), protoMessage); + } + + /** + * Convert A2AResponse to protobuf message for serialization. + */ + private static com.google.protobuf.MessageOrBuilder convertToProto(A2AResponse> response) { + if (response instanceof GetTaskResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.task(r.getResult()); + } else if (response instanceof CancelTaskResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.task(r.getResult()); + } else if (response instanceof SendMessageResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.taskOrMessage(r.getResult()); + } else if (response instanceof ListTasksResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.listTasksResult(r.getResult()); + } else if (response instanceof SetTaskPushNotificationConfigResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.setTaskPushNotificationConfigResponse(r.getResult()); + } else if (response instanceof GetTaskPushNotificationConfigResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.getTaskPushNotificationConfigResponse(r.getResult()); + } else if (response instanceof ListTaskPushNotificationConfigResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.listTaskPushNotificationConfigResponse(r.getResult()); + } else if (response instanceof DeleteTaskPushNotificationConfigResponse) { + // DeleteTaskPushNotificationConfig has no result body, just return empty message + return com.google.protobuf.Empty.getDefaultInstance(); + } else if (response instanceof GetExtendedAgentCardResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.getExtendedCardResponse(r.getResult()); + } else if (response instanceof SendStreamingMessageResponse r) { + return io.a2a.grpc.utils.ProtoUtils.ToProto.taskOrMessageStream(r.getResult()); + } else { + throw new IllegalArgumentException("Unknown response type: " + response.getClass().getName()); + } + } +} diff --git a/server-common/src/main/java/io/a2a/server/util/sse/package-info.java b/server-common/src/main/java/io/a2a/server/util/sse/package-info.java new file mode 100644 index 000000000..7e668b632 --- /dev/null +++ b/server-common/src/main/java/io/a2a/server/util/sse/package-info.java @@ -0,0 +1,11 @@ +/** + * Server-Sent Events (SSE) formatting utilities for A2A streaming responses. + *
+ * Provides framework-agnostic conversion of {@code Flow.Publisher
+ * Note: Returns MainQueue - tests should call .tap() if they need to consume events.
+ *
+ * Note: Does NOT set callbacks - DefaultRequestHandler has a permanent callback.
+ * Simply polls TaskStore until task reaches final state.
+ *