From 73b292d8ac844361f684ab998643389698649c30 Mon Sep 17 00:00:00 2001 From: Gary Russell Date: Mon, 17 Jul 2017 14:37:59 -0400 Subject: [PATCH] Kafka: Support Arbitrary Producer/Consumer Props PR #7672 Added support for arbitrary common properties. However, Kafka emits a warning if a producer configuration contains properties intended only for consumers, and vice versa. The documentation showed a sample of how to write code to configure arbitrary properties but this is inconvenient. Add arbitrary properties to the consumer and procucer configs. --- .../autoconfigure/kafka/KafkaProperties.java | 31 ++++- .../kafka/KafkaAutoConfigurationTests.java | 17 ++- .../appendix-application-properties.adoc | 4 +- .../main/asciidoc/spring-boot-features.adoc | 15 +-- ...aSpecialProducerConsumerConfigExample.java | 124 ------------------ 5 files changed, 47 insertions(+), 144 deletions(-) delete mode 100644 spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java diff --git a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java index 51a8fffe6e26..e2e10714a23f 100644 --- a/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java +++ b/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaProperties.java @@ -63,7 +63,8 @@ public class KafkaProperties { private String clientId; /** - * Additional properties used to configure the client. + * Additional properties, common to producers and consumers, used to configure the + * client. */ private Map properties = new HashMap<>(); @@ -268,6 +269,11 @@ public static class Consumer { */ private Integer maxPollRecords; + /** + * Additional properties used to configure the client. + */ + private Map properties = new HashMap<>(); + public Ssl getSsl() { return this.ssl; } @@ -368,6 +374,14 @@ public void setMaxPollRecords(Integer maxPollRecords) { this.maxPollRecords = maxPollRecords; } + public Map getProperties() { + return this.properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + public Map buildProperties() { Map properties = new HashMap<>(); if (this.autoCommitInterval != null) { @@ -435,6 +449,7 @@ public Map buildProperties() { properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords); } + properties.putAll(this.properties); return properties; } @@ -492,6 +507,11 @@ public static class Producer { */ private Integer retries; + /** + * Additional properties used to configure the client. + */ + private Map properties = new HashMap<>(); + public Ssl getSsl() { return this.ssl; } @@ -568,6 +588,14 @@ public void setRetries(Integer retries) { this.retries = retries; } + public Map getProperties() { + return this.properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + public Map buildProperties() { Map properties = new HashMap<>(); if (this.acks != null) { @@ -621,6 +649,7 @@ public Map buildProperties() { properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, this.valueSerializer); } + properties.putAll(this.properties); return properties; } diff --git a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java index c20e7dde585b..d601c1c59028 100644 --- a/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java +++ b/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java @@ -78,6 +78,7 @@ public void consumerProperties() { "spring.kafka.consumer.client-id=ccid", // test override common "spring.kafka.consumer.enable-auto-commit=false", "spring.kafka.consumer.fetch-max-wait=456", + "spring.kafka.consumer.properties.fiz.buz=fix.fox", "spring.kafka.consumer.fetch-min-size=789", "spring.kafka.consumer.group-id=bar", "spring.kafka.consumer.heartbeat-interval=234", @@ -85,9 +86,7 @@ public void consumerProperties() { "spring.kafka.consumer.value-deserializer = org.apache.kafka.common.serialization.IntegerDeserializer"); DefaultKafkaConsumerFactory consumerFactory = this.context .getBean(DefaultKafkaConsumerFactory.class); - @SuppressWarnings("unchecked") - Map configs = (Map) new DirectFieldAccessor( - consumerFactory).getPropertyValue("configs"); + Map configs = consumerFactory.getConfigurationProperties(); // common assertThat(configs.get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)) .isEqualTo(Collections.singletonList("foo:1234")); @@ -120,17 +119,21 @@ public void consumerProperties() { assertThat(configs.get("foo")).isEqualTo("bar"); assertThat(configs.get("baz")).isEqualTo("qux"); assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); + assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); } @Test public void producerProperties() { - load("spring.kafka.clientId=cid", "spring.kafka.producer.acks=all", + load("spring.kafka.clientId=cid", + "spring.kafka.properties.foo.bar.baz=qux.fiz.buz", + "spring.kafka.producer.acks=all", "spring.kafka.producer.batch-size=20", "spring.kafka.producer.bootstrap-servers=bar:1234", // test override "spring.kafka.producer.buffer-memory=12345", "spring.kafka.producer.compression-type=gzip", "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.LongSerializer", "spring.kafka.producer.retries=2", + "spring.kafka.producer.properties.fiz.buz=fix.fox", "spring.kafka.producer.ssl.key-password=p4", "spring.kafka.producer.ssl.keystore-location=classpath:ksLocP", "spring.kafka.producer.ssl.keystore-password=p5", @@ -139,9 +142,7 @@ public void producerProperties() { "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.IntegerSerializer"); DefaultKafkaProducerFactory producerFactory = this.context .getBean(DefaultKafkaProducerFactory.class); - @SuppressWarnings("unchecked") - Map configs = (Map) new DirectFieldAccessor( - producerFactory).getPropertyValue("configs"); + Map configs = producerFactory.getConfigurationProperties(); // common assertThat(configs.get(ProducerConfig.CLIENT_ID_CONFIG)).isEqualTo("cid"); // producer @@ -166,6 +167,8 @@ public void producerProperties() { .isEqualTo(IntegerSerializer.class); assertThat(this.context.getBeansOfType(KafkaJaasLoginModuleInitializer.class)) .isEmpty(); + assertThat(configs.get("foo.bar.baz")).isEqualTo("qux.fiz.buz"); + assertThat(configs.get("fiz.buz")).isEqualTo("fix.fox"); } @SuppressWarnings("unchecked") diff --git a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc index 3496be961c74..20d2f676b13e 100644 --- a/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc +++ b/spring-boot-docs/src/main/asciidoc/appendix-application-properties.adoc @@ -968,6 +968,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.consumer.heartbeat-interval= # Expected time in milliseconds between heartbeats to the consumer coordinator. spring.kafka.consumer.key-deserializer= # Deserializer class for keys. spring.kafka.consumer.max-poll-records= # Maximum number of records returned in a single call to poll(). + spring.kafka.consumer.properties.*= # Additional properties used to configure the client. spring.kafka.consumer.ssl.key-password= # Password of the private key in the key store file. spring.kafka.consumer.ssl.keystore-location= # Location of the key store file. spring.kafka.consumer.ssl.keystore-password= # Store password for the key store file. @@ -991,6 +992,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.producer.client-id= # Id to pass to the server when making requests; used for server-side logging. spring.kafka.producer.compression-type= # Compression type for all data generated by the producer. spring.kafka.producer.key-serializer= # Serializer class for keys. + spring.kafka.producer.properties.*= # Additional properties used to configure the client. spring.kafka.producer.retries= # When greater than zero, enables retrying of failed sends. spring.kafka.producer.ssl.key-password= # Password of the private key in the key store file. spring.kafka.producer.ssl.keystore-location= # Location of the key store file. @@ -998,7 +1000,7 @@ content into your application; rather pick only the properties that you need. spring.kafka.producer.ssl.truststore-location= # Location of the trust store file. spring.kafka.producer.ssl.truststore-password= # Store password for the trust store file. spring.kafka.producer.value-serializer= # Serializer class for values. - spring.kafka.properties.*= # Additional properties used to configure the client. + spring.kafka.properties.*= # Additional properties, common to producers and consumers, used to configure the client. spring.kafka.ssl.key-password= # Password of the private key in the key store file. spring.kafka.ssl.keystore-location= # Location of the key store file. spring.kafka.ssl.keystore-password= # Store password for the key store file. diff --git a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc index 5798b0ccd21d..6f4c31346a5b 100644 --- a/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc +++ b/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc @@ -5099,20 +5099,13 @@ are not directly supported, use the following: [source,properties,indent=0] ---- spring.kafka.properties.foo.bar=baz + spring.kafka.consumer.properties.fiz.buz=qux + spring,kafka.producer.properties.baz.qux=fiz ---- -This sets the common `foo.bar` Kafka property to `baz`. - -These properties will be shared by both the consumer and producer factory beans. -If you wish to customize these components with different properties, such as to use a -different metrics reader for each, you can override the bean definitions, as follows: - -[source,java,indent=0] ----- -include::{code-examples}/kafka/KafkaSpecialProducerConsumerConfigExample.java[tag=configuration] ----- - +This sets the common `foo.bar` Kafka property to `baz` (applies to both producers and consumers), the consumer `fiz.buz` property to `qux` and the `baz.qux` producer property to `fiz`. +IMPORTANT: Properties set in this way will override properties that are in the subset that boot explicitly supports. [[boot-features-resttemplate]] == Calling REST services with '`RestTemplate`' diff --git a/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java b/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java deleted file mode 100644 index ae859a747834..000000000000 --- a/spring-boot-docs/src/main/java/org/springframework/boot/kafka/KafkaSpecialProducerConsumerConfigExample.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Copyright 2012-2017 the original author or authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.springframework.boot.kafka; - -import java.util.List; -import java.util.Map; - -import org.apache.kafka.clients.CommonClientConfigs; -import org.apache.kafka.common.metrics.KafkaMetric; -import org.apache.kafka.common.metrics.MetricsReporter; - -import org.springframework.boot.autoconfigure.kafka.KafkaProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.core.ConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaConsumerFactory; -import org.springframework.kafka.core.DefaultKafkaProducerFactory; -import org.springframework.kafka.core.ProducerFactory; - -/** - * Example custom kafka configuration beans used when the user wants to apply different - * common properties to the producer and consumer. - * - * @author Gary Russell - * @since 1.5 - */ -public class KafkaSpecialProducerConsumerConfigExample { - - // tag::configuration[] - @Configuration - public static class CustomKafkaBeans { - - /** - * Customized ProducerFactory bean. - * @param properties the kafka properties. - * @return the bean. - */ - @Bean - public ProducerFactory kafkaProducerFactory(KafkaProperties properties) { - Map producerProperties = properties.buildProducerProperties(); - producerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, - MyProducerMetricsReporter.class); - return new DefaultKafkaProducerFactory<>(producerProperties); - } - - /** - * Customized ConsumerFactory bean. - * @param properties the kafka properties. - * @return the bean. - */ - @Bean - public ConsumerFactory kafkaConsumerFactory(KafkaProperties properties) { - Map consumerProperties = properties.buildConsumerProperties(); - consumerProperties.put(CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG, - MyConsumerMetricsReporter.class); - return new DefaultKafkaConsumerFactory<>(consumerProperties); - } - - } - // end::configuration[] - - public static class MyConsumerMetricsReporter implements MetricsReporter { - - @Override - public void configure(Map configs) { - } - - @Override - public void init(List metrics) { - } - - @Override - public void metricChange(KafkaMetric metric) { - } - - @Override - public void metricRemoval(KafkaMetric metric) { - } - - @Override - public void close() { - } - - } - - public static class MyProducerMetricsReporter implements MetricsReporter { - - @Override - public void configure(Map configs) { - } - - @Override - public void init(List metrics) { - } - - @Override - public void metricChange(KafkaMetric metric) { - } - - @Override - public void metricRemoval(KafkaMetric metric) { - } - - @Override - public void close() { - } - - } - -}