diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala index 364d1b90..b4c2da35 100644 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala @@ -3,9 +3,9 @@ package io.github.embeddedkafka.ops import io.github.embeddedkafka.{EmbeddedK, EmbeddedKafkaConfig, EmbeddedServer} import kafka.server._ import org.apache.kafka.common.Uuid -import org.apache.kafka.common.security.auth.SecurityProtocol import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinatorConfig +import org.apache.kafka.coordinator.transaction.TransactionLogConfig import org.apache.kafka.metadata.properties.{ MetaProperties, MetaPropertiesEnsemble, @@ -17,6 +17,7 @@ import org.apache.kafka.raft.QuorumConfig import org.apache.kafka.server.ServerSocketFactory import org.apache.kafka.server.config.{ KRaftConfigs, + ReplicationConfigs, ServerConfigs, ServerLogConfigs } @@ -52,23 +53,26 @@ trait KafkaOps { // Without this the controller starts correctly on a random port but it's too late to use this port in the configs for the broker val actualControllerPort = findPortForControllerOrFail(controllerPort) - val brokerListener = s"${SecurityProtocol.PLAINTEXT}://localhost:$kafkaPort" + val brokerListener = s"BROKER://localhost:$kafkaPort" val controllerListener = s"CONTROLLER://localhost:$actualControllerPort" val configProperties = Map[String, Object]( - KRaftConfigs.PROCESS_ROLES_CONFIG -> "broker,controller", - KRaftConfigs.NODE_ID_CONFIG -> nodeId.toString, - KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG -> "CONTROLLER", + KRaftConfigs.PROCESS_ROLES_CONFIG -> "broker,controller", + KRaftConfigs.NODE_ID_CONFIG -> nodeId.toString, + ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG -> "BROKER", + KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG -> "CONTROLLER", QuorumConfig.QUORUM_VOTERS_CONFIG -> s"$nodeId@localhost:$actualControllerPort", ServerConfigs.BROKER_ID_CONFIG -> nodeId.toString, SocketServerConfigs.LISTENERS_CONFIG -> s"$brokerListener,$controllerListener", SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG -> brokerListener, - SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL", + SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> "BROKER:PLAINTEXT,CONTROLLER:PLAINTEXT", ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> autoCreateTopics.toString, ServerLogConfigs.LOG_DIRS_CONFIG -> kafkaLogDir.toAbsolutePath.toString, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG -> 1.toString, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> 1.toString, + TransactionLogConfig.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString, + TransactionLogConfig.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG -> 1.toString, // The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> logCleanerDedupeBufferSize.toString ) ++ customBrokerProperties