@@ -3,9 +3,9 @@ package io.github.embeddedkafka.ops
33import io .github .embeddedkafka .{EmbeddedK , EmbeddedKafkaConfig , EmbeddedServer }
44import kafka .server ._
55import org .apache .kafka .common .Uuid
6- import org .apache .kafka .common .security .auth .SecurityProtocol
76import org .apache .kafka .common .utils .Time
87import org .apache .kafka .coordinator .group .GroupCoordinatorConfig
8+ import org .apache .kafka .coordinator .transaction .TransactionLogConfig
99import org .apache .kafka .metadata .properties .{
1010 MetaProperties ,
1111 MetaPropertiesEnsemble ,
@@ -17,6 +17,7 @@ import org.apache.kafka.raft.QuorumConfig
1717import org .apache .kafka .server .ServerSocketFactory
1818import org .apache .kafka .server .config .{
1919 KRaftConfigs ,
20+ ReplicationConfigs ,
2021 ServerConfigs ,
2122 ServerLogConfigs
2223}
@@ -52,23 +53,26 @@ trait KafkaOps {
5253 // Without this the controller starts correctly on a random port but it's too late to use this port in the configs for the broker
5354 val actualControllerPort = findPortForControllerOrFail(controllerPort)
5455
55- val brokerListener = s " ${ SecurityProtocol . PLAINTEXT } ://localhost: $kafkaPort"
56+ val brokerListener = s " BROKER ://localhost: $kafkaPort"
5657 val controllerListener = s " CONTROLLER://localhost: $actualControllerPort"
5758
5859 val configProperties = Map [String , Object ](
59- KRaftConfigs .PROCESS_ROLES_CONFIG -> " broker,controller" ,
60- KRaftConfigs .NODE_ID_CONFIG -> nodeId.toString,
61- KRaftConfigs .CONTROLLER_LISTENER_NAMES_CONFIG -> " CONTROLLER" ,
60+ KRaftConfigs .PROCESS_ROLES_CONFIG -> " broker,controller" ,
61+ KRaftConfigs .NODE_ID_CONFIG -> nodeId.toString,
62+ ReplicationConfigs .INTER_BROKER_LISTENER_NAME_CONFIG -> " BROKER" ,
63+ KRaftConfigs .CONTROLLER_LISTENER_NAMES_CONFIG -> " CONTROLLER" ,
6264 QuorumConfig .QUORUM_VOTERS_CONFIG -> s " $nodeId@localhost: $actualControllerPort" ,
6365 ServerConfigs .BROKER_ID_CONFIG -> nodeId.toString,
6466 SocketServerConfigs .LISTENERS_CONFIG -> s " $brokerListener, $controllerListener" ,
6567 SocketServerConfigs .ADVERTISED_LISTENERS_CONFIG -> brokerListener,
66- SocketServerConfigs .LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> " CONTROLLER :PLAINTEXT,PLAINTEXT :PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL " ,
68+ SocketServerConfigs .LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> " BROKER :PLAINTEXT,CONTROLLER :PLAINTEXT" ,
6769 ServerLogConfigs .AUTO_CREATE_TOPICS_ENABLE_CONFIG -> autoCreateTopics.toString,
6870 ServerLogConfigs .LOG_DIRS_CONFIG -> kafkaLogDir.toAbsolutePath.toString,
6971 ServerLogConfigs .LOG_FLUSH_INTERVAL_MESSAGES_CONFIG -> 1 .toString,
7072 GroupCoordinatorConfig .OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1 .toString,
7173 GroupCoordinatorConfig .OFFSETS_TOPIC_PARTITIONS_CONFIG -> 1 .toString,
74+ TransactionLogConfig .TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1 .toString,
75+ TransactionLogConfig .TRANSACTIONS_TOPIC_MIN_ISR_CONFIG -> 1 .toString,
7276 // The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory
7377 CleanerConfig .LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> logCleanerDedupeBufferSize.toString
7478 ) ++ customBrokerProperties
0 commit comments