@@ -165,6 +165,7 @@ public class DefaultKafkaProducerFactory<K, V> extends KafkaResourceFactory
165
165
166
166
/**
167
167
* Construct a factory with the provided configuration.
168
+ *
168
169
* @param configs the configuration.
169
170
*/
170
171
public DefaultKafkaProducerFactory (Map <String , Object > configs ) {
@@ -178,11 +179,13 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs) {
178
179
* This config is going to be overridden with a suffix for target {@link Producer} instance.
179
180
* The serializers' {@code configure()} methods will be called with the
180
181
* configuration map.
182
+ *
181
183
* @param configs the configuration.
182
184
* @param keySerializer the key {@link Serializer}.
183
185
* @param valueSerializer the value {@link Serializer}.
184
186
*/
185
- public DefaultKafkaProducerFactory (Map <String , Object > configs ,
187
+ public DefaultKafkaProducerFactory (
188
+ Map <String , Object > configs ,
186
189
@ Nullable Serializer <K > keySerializer ,
187
190
@ Nullable Serializer <V > valueSerializer ) {
188
191
@@ -196,14 +199,17 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
196
199
* be overridden with a suffix for target {@link Producer} instance. The serializers'
197
200
* {@code configure()} methods will be called with the configuration map unless
198
201
* {@code configureSerializers} is false..
202
+ *
199
203
* @param configs the configuration.
200
204
* @param keySerializer the key {@link Serializer}.
201
205
* @param valueSerializer the value {@link Serializer}.
202
206
* @param configureSerializers set to false if serializers are already fully
203
207
* configured.
208
+ *
204
209
* @since 2.8.7
205
210
*/
206
- public DefaultKafkaProducerFactory (Map <String , Object > configs ,
211
+ public DefaultKafkaProducerFactory (
212
+ Map <String , Object > configs ,
207
213
@ Nullable Serializer <K > keySerializer ,
208
214
@ Nullable Serializer <V > valueSerializer , boolean configureSerializers ) {
209
215
@@ -217,12 +223,15 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
217
223
* be overridden with a suffix for target {@link Producer} instance. When the
218
224
* suppliers are invoked to get an instance, the serializers' {@code configure()}
219
225
* methods will be called with the configuration map.
226
+ *
220
227
* @param configs the configuration.
221
228
* @param keySerializerSupplier the key {@link Serializer} supplier function.
222
229
* @param valueSerializerSupplier the value {@link Serializer} supplier function.
230
+ *
223
231
* @since 2.3
224
232
*/
225
- public DefaultKafkaProducerFactory (Map <String , Object > configs ,
233
+ public DefaultKafkaProducerFactory (
234
+ Map <String , Object > configs ,
226
235
@ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
227
236
@ Nullable Supplier <Serializer <V >> valueSerializerSupplier ) {
228
237
@@ -244,7 +253,8 @@ public DefaultKafkaProducerFactory(Map<String, Object> configs,
244
253
* configured.
245
254
* @since 2.8.7
246
255
*/
247
- public DefaultKafkaProducerFactory (Map <String , Object > configs ,
256
+ public DefaultKafkaProducerFactory (
257
+ Map <String , Object > configs ,
248
258
@ Nullable Supplier <Serializer <K >> keySerializerSupplier ,
249
259
@ Nullable Supplier <Serializer <V >> valueSerializerSupplier , boolean configureSerializers ) {
250
260
@@ -308,7 +318,9 @@ public void setBeanName(String name) {
308
318
* Set a key serializer. The serializer will be configured using the producer
309
319
* configuration, unless {@link #setConfigureSerializers(boolean)
310
320
* configureSerializers} is false.
321
+ *
311
322
* @param keySerializer the key serializer.
323
+ *
312
324
* @see #setConfigureSerializers(boolean)
313
325
*/
314
326
public void setKeySerializer (@ Nullable Serializer <K > keySerializer ) {
@@ -319,7 +331,9 @@ public void setKeySerializer(@Nullable Serializer<K> keySerializer) {
319
331
* Set a value serializer. The serializer will be configured using the producer
320
332
* configuration, unless {@link #setConfigureSerializers(boolean)
321
333
* configureSerializers} is false.
334
+ *
322
335
* @param valueSerializer the value serializer.
336
+ *
323
337
* @see #setConfigureSerializers(boolean)
324
338
*/
325
339
public void setValueSerializer (@ Nullable Serializer <V > valueSerializer ) {
@@ -330,6 +344,7 @@ public void setValueSerializer(@Nullable Serializer<V> valueSerializer) {
330
344
* Set a supplier to supply instances of the key serializer. The serializer will be
331
345
* configured using the producer configuration, unless
332
346
* {@link #setConfigureSerializers(boolean) configureSerializers} is false.
347
+ *
333
348
* @param keySerializerSupplier the supplier.
334
349
* @since 2.8
335
350
* @see #setConfigureSerializers(boolean)
@@ -340,6 +355,7 @@ public void setKeySerializerSupplier(Supplier<Serializer<K>> keySerializerSuppli
340
355
341
356
/**
342
357
* Set a supplier to supply instances of the value serializer.
358
+ *
343
359
* @param valueSerializerSupplier the supplier. The serializer will be configured
344
360
* using the producer configuration, unless {@link #setConfigureSerializers(boolean)
345
361
* configureSerializers} is false.
@@ -354,12 +370,14 @@ public void setValueSerializerSupplier(Supplier<Serializer<V>> valueSerializerSu
354
370
* If true (default), programmatically provided serializers (via constructor or
355
371
* setters) will be configured using the producer configuration. Set to false if the
356
372
* serializers are already fully configured.
373
+ *
357
374
* @return true to configure.
358
375
* @since 2.8.7
359
376
* @see #setKeySerializer(Serializer)
360
377
* @see #setKeySerializerSupplier(Supplier)
361
378
* @see #setValueSerializer(Serializer)
362
379
* @see #setValueSerializerSupplier(Supplier)
380
+
363
381
*/
364
382
public boolean isConfigureSerializers () {
365
383
return this .configureSerializers ;
@@ -369,6 +387,7 @@ public boolean isConfigureSerializers() {
369
387
* Set to false (default true) to prevent programmatically provided serializers (via
370
388
* constructor or setters) from being configured using the producer configuration,
371
389
* e.g. if the serializers are already fully configured.
390
+ *
372
391
* @param configureSerializers false to not configure.
373
392
* @since 2.8.7
374
393
* @see #setKeySerializer(Serializer)
@@ -385,6 +404,7 @@ public void setConfigureSerializers(boolean configureSerializers) {
385
404
* closing the producer itself (when {@link #reset()}, {@link #destroy()
386
405
* #closeProducerFor(String)}, or {@link #closeThreadBoundProducer()} are invoked).
387
406
* Specified in seconds; default {@link #DEFAULT_PHYSICAL_CLOSE_TIMEOUT}.
407
+
388
408
* @param physicalCloseTimeout the timeout in seconds.
389
409
* @since 1.0.7
390
410
*/
@@ -425,6 +445,7 @@ public final void setTransactionIdPrefix(String transactionIdPrefix) {
425
445
* all clients. Clients <b>must</b> call {@link #closeThreadBoundProducer()} to
426
446
* physically close the producer when it is no longer needed. These producers will not
427
447
* be closed by {@link #destroy()} or {@link #reset()}.
448
+ *
428
449
* @param producerPerThread true for a producer per thread.
429
450
* @since 2.3
430
451
* @see #closeThreadBoundProducer()
@@ -526,19 +547,23 @@ public int getPhase() {
526
547
* you want to change the ID config, add a new
527
548
* {@link org.apache.kafka.clients.producer.ProducerConfig#TRANSACTIONAL_ID_CONFIG}
528
549
* key to the override config.</p>
550
+ *
529
551
* @param overrideProperties the properties to be applied to the new factory
552
+ *
530
553
* @return {@link org.springframework.kafka.core.DefaultKafkaProducerFactory} with
531
- * properties applied
554
+ * properties applied
532
555
*/
533
556
@ Override
534
557
public ProducerFactory <K , V > copyWithConfigurationOverride (Map <String , Object > overrideProperties ) {
535
558
Map <String , Object > producerProperties = new HashMap <>(getConfigurationProperties ());
536
559
producerProperties .putAll (overrideProperties );
537
560
producerProperties = ensureExistingTransactionIdPrefixInProperties (producerProperties );
538
- DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(producerProperties ,
539
- getKeySerializerSupplier (),
540
- getValueSerializerSupplier (),
541
- isConfigureSerializers ());
561
+ DefaultKafkaProducerFactory <K , V > newFactory = new DefaultKafkaProducerFactory <>(
562
+ producerProperties ,
563
+ getKeySerializerSupplier (),
564
+ getValueSerializerSupplier (),
565
+ isConfigureSerializers ()
566
+ );
542
567
newFactory .setPhysicalCloseTimeout ((int ) getPhysicalCloseTimeout ().getSeconds ());
543
568
newFactory .setProducerPerThread (isProducerPerThread ());
544
569
for (ProducerPostProcessor <K , V > templatePostProcessor : getPostProcessors ()) {
@@ -559,7 +584,9 @@ public ProducerFactory<K, V> copyWithConfigurationOverride(Map<String, Object> o
559
584
* new factory, the transactionId has to be reinserted prior use.
560
585
* The incoming properties are checked for a transactionId key. If none is
561
586
* there, the one existing in the factory is added.
587
+ *
562
588
* @param producerProperties the properties to be used for the new factory
589
+ *
563
590
* @return the producerProperties or a copy with the transaction ID set
564
591
*/
565
592
private Map <String , Object > ensureExistingTransactionIdPrefixInProperties (Map <String , Object > producerProperties ) {
@@ -576,7 +603,9 @@ private Map<String, Object> ensureExistingTransactionIdPrefixInProperties(Map<St
576
603
577
604
/**
578
605
* Add a listener.
606
+ *
579
607
* @param listener the listener.
608
+ *
580
609
* @since 2.5
581
610
*/
582
611
@ Override
@@ -587,8 +616,10 @@ public void addListener(Listener<K, V> listener) {
587
616
588
617
/**
589
618
* Add a listener at a specific index.
619
+ *
590
620
* @param index the index (list position).
591
621
* @param listener the listener.
622
+ *
592
623
* @since 2.5
593
624
*/
594
625
@ Override
@@ -604,8 +635,11 @@ public void addListener(int index, Listener<K, V> listener) {
604
635
605
636
/**
606
637
* Remove a listener.
638
+ *
607
639
* @param listener the listener.
640
+ *
608
641
* @return true if removed.
642
+ *
609
643
* @since 2.5
610
644
*/
611
645
@ Override
@@ -633,7 +667,7 @@ public void updateConfigs(Map<String, Object> updates) {
633
667
Assert .isTrue (this .transactionIdPrefix != null
634
668
? entry .getValue () != null
635
669
: entry .getValue () == null ,
636
- "Cannot change transactional capability" );
670
+ "Cannot change transactional capability" );
637
671
this .transactionIdPrefix = (String ) entry .getValue ();
638
672
}
639
673
else if (entry .getKey ().equals (ProducerConfig .CLIENT_ID_CONFIG )) {
@@ -723,6 +757,7 @@ public void onApplicationEvent(ContextStoppedEvent event) {
723
757
/**
724
758
* Close the {@link Producer}(s) and clear the cache of transactional
725
759
* {@link Producer}(s).
760
+ *
726
761
* @since 2.2
727
762
*/
728
763
@ Override
@@ -769,7 +804,8 @@ private Producer<K, V> doCreateProducer(@Nullable String txIdPrefix) {
769
804
}
770
805
if (this .producer == null ) {
771
806
this .producer = new CloseSafeProducer <>(createKafkaProducer (), this ::removeProducer ,
772
- this .physicalCloseTimeout , this .beanName , this .epoch .get ());
807
+ this .physicalCloseTimeout , this .beanName , this .epoch .get ()
808
+ );
773
809
this .listeners .forEach (listener -> listener .producerAdded (this .producer .clientId , this .producer ));
774
810
}
775
811
return this .producer ;
@@ -787,7 +823,8 @@ private Producer<K, V> getOrCreateThreadBoundProducer() {
787
823
}
788
824
if (tlProducer == null ) {
789
825
tlProducer = new CloseSafeProducer <>(createKafkaProducer (), this ::removeProducer ,
790
- this .physicalCloseTimeout , this .beanName , this .epoch .get ());
826
+ this .physicalCloseTimeout , this .beanName , this .epoch .get ()
827
+ );
791
828
for (Listener <K , V > listener : this .listeners ) {
792
829
listener .producerAdded (tlProducer .clientId , tlProducer );
793
830
}
@@ -798,6 +835,7 @@ private Producer<K, V> getOrCreateThreadBoundProducer() {
798
835
799
836
/**
800
837
* Subclasses must return a raw producer which will be wrapped in a {@link CloseSafeProducer}.
838
+ *
801
839
* @return the producer.
802
840
*/
803
841
protected Producer <K , V > createKafkaProducer () {
@@ -806,9 +844,12 @@ protected Producer<K, V> createKafkaProducer() {
806
844
807
845
/**
808
846
* Remove the single shared producer and a thread-bound instance if present.
847
+ *
809
848
* @param producerToRemove the producer.
810
849
* @param timeout the close timeout.
850
+ *
811
851
* @return true if the producer was closed.
852
+ *
812
853
* @since 2.2.13
813
854
*/
814
855
protected final boolean removeProducer (CloseSafeProducer <K , V > producerToRemove , Duration timeout ) {
@@ -818,7 +859,9 @@ protected final boolean removeProducer(CloseSafeProducer<K, V> producerToRemove,
818
859
/**
819
860
* Subclasses must return a producer from the {@link #getCache()} or a
820
861
* new raw producer wrapped in a {@link CloseSafeProducer}.
862
+ *
821
863
* @return the producer - cannot be null.
864
+ *
822
865
* @since 1.3
823
866
*/
824
867
protected Producer <K , V > createTransactionalProducer () {
@@ -838,7 +881,11 @@ protected Producer<K, V> createTransactionalProducer(String txIdPrefix) {
838
881
}
839
882
}
840
883
if (cachedProducer == null ) {
841
- return doCreateTxProducer (txIdPrefix , "" + this .transactionIdSuffix .getAndIncrement (), this ::cacheReturner );
884
+ return doCreateTxProducer (
885
+ txIdPrefix ,
886
+ "" + this .transactionIdSuffix .getAndIncrement (),
887
+ this ::cacheReturner
888
+ );
842
889
}
843
890
else {
844
891
return cachedProducer ;
@@ -876,7 +923,8 @@ boolean cacheReturner(CloseSafeProducer<K, V> producerToRemove, Duration timeout
876
923
}
877
924
}
878
925
879
- private CloseSafeProducer <K , V > doCreateTxProducer (String prefix , String suffix ,
926
+ private CloseSafeProducer <K , V > doCreateTxProducer (
927
+ String prefix , String suffix ,
880
928
BiPredicate <CloseSafeProducer <K , V >, Duration > remover ) {
881
929
Producer <K , V > newProducer = createRawProducer (getTxProducerConfigs (prefix + suffix ));
882
930
try {
@@ -895,7 +943,8 @@ private CloseSafeProducer<K, V> doCreateTxProducer(String prefix, String suffix,
895
943
}
896
944
CloseSafeProducer <K , V > closeSafeProducer =
897
945
new CloseSafeProducer <>(newProducer , remover , prefix , this .physicalCloseTimeout , this .beanName ,
898
- this .epoch .get ());
946
+ this .epoch .get ()
947
+ );
899
948
this .listeners .forEach (listener -> listener .producerAdded (closeSafeProducer .clientId , closeSafeProducer ));
900
949
return closeSafeProducer ;
901
950
}
@@ -939,6 +988,7 @@ public void closeThreadBoundProducer() {
939
988
940
989
/**
941
990
* Return the configuration of a producer.
991
+ *
942
992
* @return the configuration of a producer.
943
993
* @since 2.8.3
944
994
* @see #createKafkaProducer()
@@ -947,15 +997,19 @@ protected Map<String, Object> getProducerConfigs() {
947
997
final Map <String , Object > newProducerConfigs = new HashMap <>(this .configs );
948
998
checkBootstrap (newProducerConfigs );
949
999
if (this .clientIdPrefix != null ) {
950
- newProducerConfigs .put (ProducerConfig .CLIENT_ID_CONFIG ,
951
- this .clientIdPrefix + "-" + this .clientIdCounter .incrementAndGet ());
1000
+ newProducerConfigs .put (
1001
+ ProducerConfig .CLIENT_ID_CONFIG ,
1002
+ this .clientIdPrefix + "-" + this .clientIdCounter .incrementAndGet ()
1003
+ );
952
1004
}
953
1005
return newProducerConfigs ;
954
1006
}
955
1007
956
1008
/**
957
1009
* Return the configuration of a transactional producer.
1010
+ *
958
1011
* @param transactionId the transactionId.
1012
+ *
959
1013
* @return the configuration of a transactional producer.
960
1014
* @since 2.8.3
961
1015
* @see #doCreateTxProducer(String, String, BiPredicate)
@@ -971,7 +1025,6 @@ protected Map<String, Object> getTxProducerConfigs(String transactionId) {
971
1025
*
972
1026
* @param <K> the key type.
973
1027
* @param <V> the value type.
974
- *
975
1028
*/
976
1029
protected static class CloseSafeProducer <K , V > implements Producer <K , V > {
977
1030
@@ -995,14 +1048,16 @@ protected static class CloseSafeProducer<K, V> implements Producer<K, V> {
995
1048
996
1049
volatile boolean closed ; // NOSONAR
997
1050
998
- CloseSafeProducer (Producer <K , V > delegate ,
1051
+ CloseSafeProducer (
1052
+ Producer <K , V > delegate ,
999
1053
BiPredicate <CloseSafeProducer <K , V >, Duration > removeConsumerProducer , Duration closeTimeout ,
1000
1054
String factoryName , int epoch ) {
1001
1055
1002
1056
this (delegate , removeConsumerProducer , null , closeTimeout , factoryName , epoch );
1003
1057
}
1004
1058
1005
- CloseSafeProducer (Producer <K , V > delegate ,
1059
+ CloseSafeProducer (
1060
+ Producer <K , V > delegate ,
1006
1061
BiPredicate <CloseSafeProducer <K , V >, Duration > removeProducer , @ Nullable String txIdPrefix ,
1007
1062
Duration closeTimeout , String factoryName , int epoch ) {
1008
1063
@@ -1097,7 +1152,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
1097
1152
}
1098
1153
1099
1154
@ Override
1100
- public void sendOffsetsToTransaction (Map <TopicPartition , OffsetAndMetadata > offsets ,
1155
+ public void sendOffsetsToTransaction (
1156
+ Map <TopicPartition , OffsetAndMetadata > offsets ,
1101
1157
ConsumerGroupMetadata groupMetadata ) throws ProducerFencedException {
1102
1158
1103
1159
LOGGER .trace (() -> toString () + " sendOffsetsToTransaction(" + offsets + ", " + groupMetadata + ")" );
0 commit comments