diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java index d0e24113..161bb5cf 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/error/DlqProductionExceptionHandlerTest.java @@ -34,6 +34,7 @@ import org.apache.kafka.clients.consumer.RetriableCommitFailedException; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.streams.errors.ErrorHandlerContext; import org.apache.kafka.streams.errors.ProductionExceptionHandler; @@ -139,4 +140,43 @@ void shouldReturnRetryOnRetriableException() { assertEquals(ProductionExceptionHandler.Result.RETRY, response.result()); assertTrue(response.deadLetterQueueRecords().isEmpty()); } + + @Test + void shouldSendSerializationExceptionToDlq() { + DlqProductionExceptionHandler handler = new DlqProductionExceptionHandler(); + KafkaStreamsExecutionContext.setDlqTopicName("DLQ_TOPIC"); + handler.configure(Map.of()); + + when(producerRecord.key()).thenReturn("key".getBytes(StandardCharsets.UTF_8)); + when(producerRecord.value()).thenReturn("value".getBytes(StandardCharsets.UTF_8)); + when(producerRecord.topic()).thenReturn("topic"); + when(errorHandlerContext.taskId()).thenReturn(new TaskId(0, 0)); + when(errorHandlerContext.partition()).thenReturn(0); + + ProductionExceptionHandler.Response response = + handler.handleError(errorHandlerContext, producerRecord, new SerializationException("Serialization failed")); + + assertEquals(ProductionExceptionHandler.Result.RESUME, response.result()); + assertEquals(1, response.deadLetterQueueRecords().size()); + + Serde serde = SerdesUtils.getValueSerdes(); + KafkaError kafkaError = serde.deserializer() + .deserialize( + "DLQ_TOPIC", response.deadLetterQueueRecords().get(0).value()); + + assertEquals( + "An exception occurred during the stream internal production. Please find more details about the exception in the cause and stack fields.", + kafkaError.getContextMessage()); + assertEquals(0, kafkaError.getOffset()); + assertEquals(0, kafkaError.getPartition()); + assertEquals("topic", kafkaError.getTopic()); + assertEquals("test-app", kafkaError.getApplicationId()); + assertNull(kafkaError.getProcessorNodeId()); + assertEquals("0_0", kafkaError.getTaskId()); + assertEquals("Unknown cause", kafkaError.getCause()); + assertTrue(kafkaError.getStack().contains("org.apache.kafka.common.errors.SerializationException: Serialization failed")); + assertEquals("value", new String(kafkaError.getByteValue().array(), StandardCharsets.UTF_8)); + assertNull(kafkaError.getValue()); + assertEquals("key", new String(response.deadLetterQueueRecords().get(0).key(), StandardCharsets.UTF_8)); + } }