diff --git a/gradle.properties b/gradle.properties index 8f2fe564f84..5e81a017000 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ version=5.5.3-SNAPSHOT -org.gradle.jvmargs=-Xms512m -Xmx4g -XX:MaxPermSize=1024m -XX:MaxMetaspaceSize=1g -Dkotlin.daemon.jvm.options="-Xmx1g" -Dfile.encoding=UTF-8 +org.gradle.jvmargs=-Xms512m -Xmx4g -XX:MaxPermSize=1024m -XX:MaxMetaspaceSize=1g -Dkotlin.daemon.jvm.options="-Xmx1g --illegal-access=permit" -Dfile.encoding=UTF-8 org.gradle.caching=true org.gradle.parallel=true kotlin.stdlib.default.dependency=false diff --git a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java index d41d748af09..01c3637bd6a 100644 --- a/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java +++ b/spring-integration-kafka/src/main/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2013-2020 the original author or authors. + * Copyright 2013-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -67,7 +67,6 @@ import org.springframework.messaging.MessageChannel; import org.springframework.messaging.MessageHandlingException; import org.springframework.messaging.MessageHeaders; -import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import org.springframework.util.concurrent.ListenableFuture; @@ -491,9 +490,7 @@ protected Object handleRequestMessage(final Message message) { sendFuture = gatewayFuture.getSendFuture(); } else { - if (this.transactional - && TransactionSynchronizationManager.getResource(this.kafkaTemplate.getProducerFactory()) == null - && !this.allowNonTransactional) { + if (this.transactional && !this.kafkaTemplate.inTransaction() && !this.allowNonTransactional) { sendFuture = this.kafkaTemplate.executeInTransaction(template -> template.send(producerRecord)); } else { diff --git a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java index 83dca01a193..347bb320f3d 100644 --- a/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java +++ b/spring-integration-kafka/src/test/java/org/springframework/integration/kafka/outbound/KafkaProducerMessageHandlerTests.java @@ -1,5 +1,5 @@ /* - * Copyright 2016-2020 the original author or authors. + * Copyright 2016-2021 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -100,6 +100,11 @@ import org.springframework.messaging.support.ErrorMessage; import org.springframework.messaging.support.GenericMessage; import org.springframework.transaction.PlatformTransactionManager; +import org.springframework.transaction.TransactionDefinition; +import org.springframework.transaction.TransactionException; +import org.springframework.transaction.support.AbstractPlatformTransactionManager; +import org.springframework.transaction.support.DefaultTransactionStatus; +import org.springframework.transaction.support.TransactionTemplate; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.SettableListenableFuture; @@ -598,6 +603,37 @@ protected Producer createTransactionalProducer(String txIdPrefix) { assertThat(txId.get()).isEqualTo("overridden.tx.id."); } + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + void testTransactionSynch() { + Producer producer = mock(Producer.class); + ProducerFactory pf = mock(ProducerFactory.class); + given(pf.transactionCapable()).willReturn(true); + given(pf.createProducer(isNull())).willReturn(producer); + ListenableFuture future = mock(ListenableFuture.class); + willReturn(future).given(producer).send(any(ProducerRecord.class), any(Callback.class)); + KafkaTemplate template = new KafkaTemplate(pf); + KafkaProducerMessageHandler handler = new KafkaProducerMessageHandler(template); + handler.setTopicExpression(new LiteralExpression("bar")); + handler.setBeanFactory(mock(BeanFactory.class)); + handler.afterPropertiesSet(); + handler.start(); + try { + new TransactionTemplate(new SomeOtherTransactionManager()).executeWithoutResult(status -> { + handler.handleMessage(new GenericMessage<>("foo")); + throw new IllegalStateException("test"); + }); + } + catch (IllegalStateException ex) { + } + handler.stop(); + verify(producer).beginTransaction(); + verify(producer).send(any(ProducerRecord.class), any(Callback.class)); + verify(producer).abortTransaction(); + verify(producer).close(any()); + verifyNoMoreInteractions(producer); + } + @SuppressWarnings({ "unchecked", "rawtypes" }) @Test void testConsumeAndProduceTransactionTxIdOverride() throws Exception { @@ -747,4 +783,26 @@ void testNoFlush() { handler.stop(); } + @SuppressWarnings("serial") + static class SomeOtherTransactionManager extends AbstractPlatformTransactionManager { + + @Override + protected Object doGetTransaction() throws TransactionException { + return new Object(); + } + + @Override + protected void doBegin(Object transaction, TransactionDefinition definition) throws TransactionException { + } + + @Override + protected void doCommit(DefaultTransactionStatus status) throws TransactionException { + } + + @Override + protected void doRollback(DefaultTransactionStatus status) throws TransactionException { + } + + } + }