@@ -1,7 +1,6 @@
package io.github.embeddedkafka.ops

import java.nio.file.Path

import kafka.server.{KafkaConfig, KafkaServer}
import io.github.embeddedkafka.{
EmbeddedK,
@@ -10,6 +9,15 @@ import io.github.embeddedkafka.{
EmbeddedZ
}
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.server.config.{
ServerConfigs,
ServerLogConfigs,
ZkConfigs
}
import org.apache.kafka.storage.internals.log.CleanerConfig

import scala.jdk.CollectionConverters._

@@ -31,19 +39,19 @@ trait KafkaOps {
val listener = s"${SecurityProtocol.PLAINTEXT}://localhost:$kafkaPort"

val brokerProperties = Map[String, Object](
KafkaConfig.ZkConnectProp -> zkAddress,
KafkaConfig.BrokerIdProp -> brokerId.toString,
KafkaConfig.ListenersProp -> listener,
KafkaConfig.AdvertisedListenersProp -> listener,
KafkaConfig.AutoCreateTopicsEnableProp -> autoCreateTopics.toString,
KafkaConfig.LogDirProp -> kafkaLogDir.toAbsolutePath.toString,
KafkaConfig.LogFlushIntervalMessagesProp -> 1.toString,
KafkaConfig.OffsetsTopicReplicationFactorProp -> 1.toString,
KafkaConfig.OffsetsTopicPartitionsProp -> 1.toString,
KafkaConfig.TransactionsTopicReplicationFactorProp -> 1.toString,
KafkaConfig.TransactionsTopicMinISRProp -> 1.toString,
ZkConfigs.ZK_CONNECT_CONFIG -> zkAddress,
ServerConfigs.BROKER_ID_CONFIG -> brokerId.toString,
SocketServerConfigs.LISTENERS_CONFIG -> listener,
SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG -> listener,
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> autoCreateTopics.toString,
ServerLogConfigs.LOG_DIRS_CONFIG -> kafkaLogDir.toAbsolutePath.toString,
ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG -> 1.toString,
GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString,
GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> 1.toString,
TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString,
TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG -> 1.toString,
// The total memory used for log deduplication across all cleaner threads; keep it small to avoid exhausting suite memory
KafkaConfig.LogCleanerDedupeBufferSizeProp -> logCleanerDedupeBufferSize.toString
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> logCleanerDedupeBufferSize.toString
) ++ customBrokerProperties

val broker = new KafkaServer(new KafkaConfig(brokerProperties.asJava))
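
For orientation, the constants that replace the deprecated KafkaConfig.*Prop accessors are plain property-name strings, so a broker map keyed by the new config classes and one keyed by the raw Kafka property names describe the same configuration. A minimal sketch of that equivalence follows; the literal property names in it are the standard Kafka broker keys and are stated here as an assumption rather than something this diff asserts:

// Sketch only: the 3.8 config classes expose string constants for the usual
// broker property names. The literal values below ("zookeeper.connect",
// "broker.id", "message.max.bytes") are assumptions based on the standard
// Kafka property names, not something taken from this diff.
import org.apache.kafka.server.config.{ServerConfigs, ZkConfigs}

object ConfigKeySketch extends App {
  assert(ZkConfigs.ZK_CONNECT_CONFIG == "zookeeper.connect")
  assert(ServerConfigs.BROKER_ID_CONFIG == "broker.id")
  assert(ServerConfigs.MESSAGE_MAX_BYTES_CONFIG == "message.max.bytes")

  // Because the constants are just key strings, a custom broker map keyed by
  // them is interchangeable with one keyed by the raw property names.
  val viaConstants = Map[String, Object](ServerConfigs.MESSAGE_MAX_BYTES_CONFIG -> "3145728")
  val viaRawNames  = Map[String, Object]("message.max.bytes" -> "3145728")
  assert(viaConstants == viaRawNames)
}
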
@@ -1,9 +1,9 @@
package io.github.embeddedkafka

import kafka.server.KafkaConfig
import io.github.embeddedkafka.EmbeddedKafka._
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.server.config.{ReplicationConfigs, ServerConfigs}

import scala.language.postfixOps
import scala.util.Random
@@ -16,8 +16,8 @@ class EmbeddedKafkaCustomConfigSpec extends EmbeddedKafkaSpecSupport {
"allow pass additional producer parameters" in {
val customBrokerConfig =
Map(
KafkaConfig.ReplicaFetchMaxBytesProp -> s"$ThreeMegabytes",
KafkaConfig.MessageMaxBytesProp -> s"$ThreeMegabytes"
ReplicationConfigs.REPLICA_FETCH_MAX_BYTES_CONFIG -> s"$ThreeMegabytes",
ServerConfigs.MESSAGE_MAX_BYTES_CONFIG -> s"$ThreeMegabytes"
)

val customProducerConfig =
@@ -2,7 +2,6 @@ package io.github.embeddedkafka

import java.util.Collections
import java.util.concurrent.TimeoutException
import kafka.server.KafkaConfig
import kafka.zk.KafkaZkClient
import io.github.embeddedkafka.EmbeddedKafka._
import io.github.embeddedkafka.serializers.{
@@ -18,6 +17,7 @@ import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.storage.internals.log.CleanerConfig
import org.apache.zookeeper.client.ZKClientConfig
import org.scalatest.concurrent.JavaFutures
import org.scalatest.time.{Milliseconds, Seconds, Span}
@@ -159,7 +159,7 @@ class EmbeddedKafkaMethodsSpec
"create a topic with a custom configuration" in {
implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = Map(
KafkaConfig.LogCleanerDedupeBufferSizeProp -> 2000000.toString
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> 2000000.toString
)
)
val topic = "test_custom_topic"
@@ -220,7 +220,7 @@ class EmbeddedKafkaMethodsSpec
"either delete of mark for deletion a list of topics" in {
implicit val config: EmbeddedKafkaConfig = EmbeddedKafkaConfig(
customBrokerProperties = Map(
KafkaConfig.LogCleanerDedupeBufferSizeProp -> 2000000.toString
CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> 2000000.toString
)
)
val topics = List("test_topic_deletion_1", "test_topic_deletion_2")
project/Dependencies.scala (1 addition, 1 deletion)
@@ -6,7 +6,7 @@ object Dependencies {
val Scala3 = "3.3.3"
val Scala213 = "2.13.14"
val Scala212 = "2.12.19"
val Kafka = "3.7.1"
val Kafka = "3.8.0"
val Slf4j = "1.7.36"
val ScalaTest = "3.2.19"
}
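
The remaining change is the Kafka version bump from 3.7.1 to 3.8.0. As a rough sketch of how a version val like this is typically consumed elsewhere in the sbt build (the Versions wrapper, the Libraries object, and the exact artifact list below are assumptions for illustration, not taken from this file):

// Hypothetical wiring sketch; names such as Versions, Libraries and
// kafkaBroker are illustrative and not taken from this repository.
import sbt._

object DependenciesSketch {
  object Versions {
    val Kafka     = "3.8.0"
    val ScalaTest = "3.2.19"
  }

  object Libraries {
    val kafkaBroker = "org.apache.kafka" %% "kafka"     % Versions.Kafka
    val scalaTest   = "org.scalatest"    %% "scalatest" % Versions.ScalaTest % Test
  }
}
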