diff --git a/.github/workflows/coverage-report.yml b/.github/workflows/coverage-report.yml index 20fa9b10..bacd41ea 100644 --- a/.github/workflows/coverage-report.yml +++ b/.github/workflows/coverage-report.yml @@ -13,7 +13,7 @@ jobs: - uses: actions/setup-java@v4 with: distribution: 'temurin' - java-version: 8 + java-version: 17 cache: 'sbt' - uses: sbt/setup-sbt@v1 - name: Compile diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 9ed00a60..29849603 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -14,7 +14,7 @@ jobs: - uses: actions/setup-java@v4 with: distribution: 'temurin' - java-version: 8 + java-version: 17 cache: 'sbt' - uses: sbt/setup-sbt@v1 - name: Publish artifacts diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5e480c65..639f5929 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -13,7 +13,7 @@ jobs: - uses: actions/setup-java@v4 with: distribution: 'temurin' - java-version: 8 + java-version: 17 cache: 'sbt' - uses: sbt/setup-sbt@v1 - name: Check formatting diff --git a/README.md b/README.md index 5528aa1e..2d2abb8e 100644 --- a/README.md +++ b/README.md @@ -11,25 +11,57 @@ A library that provides an in-memory Kafka instance to run your tests against. Inspired by [kafka-unit](https://github.com/chbatey/kafka-unit). +1. [Version compatibility matrix](#version-compatibility-matrix) +2. [Upgrade notes](#upgrade-notes) +3. [Usage](#usage) + 1. [embedded-kafka](#embedded-kafka-1) + 2. [embedded-kafka-streams](#embedded-kafka-streams) + 3. [embedded-kafka-connect](#embedded-kafka-connect) + +--- + ## Version compatibility matrix -embedded-kafka is available on Maven Central, compiled for Scala 2.12, 2.13 and Scala 3 (since v3.4.0.1). +The library is available on Maven Central. Versions match the version of Kafka they're built against. 
-## Important known limitation (prior to v2.8.0) +| embedded-kafka version | Kafka version | Scala versions | Java version | +|------------------------|---------------|-----------------|--------------| +| 4.0.0 | 4.0.0 | 2.13, 3.3 | 17+ | +| 3.4.0.1 - 3.9.0 | 3.4.0 - 3.9.0 | 2.12, 2.13, 3.3 | 8+ | + +_Note that [prior to v2.8.0](https://github.com/apache/kafka/pull/10174) Kafka core was inlining the Scala library, so you couldn't use a different Scala **patch** version than [what Kafka used to compile its jars](https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L30)._ -[Prior to v2.8.0](https://github.com/apache/kafka/pull/10174) Kafka core was inlining the Scala library, so you couldn't use a different Scala **patch** version than [what Kafka used to compile its jars](https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L30)! +--- -## Breaking change: new package name +## Upgrade notes -From v2.8.0 onwards package name has been updated to reflect the library group id (i.e. `io.github.embeddedkafka`). +### 4.0.0 +Major changes: +- **Java 17+:** as [Kafka Server 4.x requires Java 17+](https://kafka.apache.org/40/documentation/compatibility.html), so does embedded-kafka even though Kafka Clients/Streams are still available for Java 11+. +- **Scala 2.13+**: Kafka is not compiled against Scala 2.12 anymore, and neither is embedded-kafka. +- embedded-kafka 4.0.0 starts a Kafka server in combined mode (broker and controller) and no longer uses ZooKeeper. + +As a user, you'll have to change your code to use `controllerPort` instead of `zookeeperPort` in places you were doing so: +```diff +- EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 54321) ++ EmbeddedKafkaConfig(kafkaPort = 12345, controllerPort = 54321) +``` + +### 2.8.0 + +**Package name change:** from v2.8.0 onwards package name has been updated to reflect the library group id (i.e. `io.github.embeddedkafka`). 
Aliases to the old package name have been added, along with a one-time [Scalafix rule](https://github.com/embeddedkafka/embedded-kafka-scalafix) to ensure the smoothest migration. -## embedded-kafka +--- + +## Usage + +### embedded-kafka -### How to use +#### How to use * In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka" % "x.x.x" % Test` * Have your class extend the `EmbeddedKafka` trait. @@ -52,11 +84,11 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka { } ``` -* In-memory Zookeeper and Kafka will be instantiated respectively on port 6000 and 6001 and automatically shutdown at the end of the test. +* In-memory Kafka broker and controller (combined mode) will be instantiated respectively on port 6000 and 6001 and automatically shutdown at the end of the test. -### Use without the `withRunningKafka` method +#### Use without the `withRunningKafka` method -A `EmbeddedKafka` companion object is provided for usage without extending the `EmbeddedKafka` trait. Zookeeper and Kafka can be started and stopped in a programmatic way. This is the recommended usage if you have more than one test in your file and you don't want to start and stop Kafka and Zookeeper on every test. +A `EmbeddedKafka` companion object is provided for usage without extending the `EmbeddedKafka` trait. Kafka can be started and stopped in a programmatic way. This is the recommended usage if you have more than one test in your file and you don't want to start and stop Kafka on every test. ```scala class MySpec extends AnyWordSpecLike with Matchers { @@ -76,9 +108,9 @@ class MySpec extends AnyWordSpecLike with Matchers { Please note that in order to avoid Kafka instances not shutting down properly, it's recommended to call `EmbeddedKafka.stop()` in a `after` block or in a similar teardown logic. 
-### Configuration +#### Configuration -It's possible to change the ports on which Zookeeper and Kafka are started by providing an implicit `EmbeddedKafkaConfig` +It's possible to change the ports on which Kafka broker and controller are started by providing an implicit `EmbeddedKafkaConfig` ```scala class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka { @@ -96,7 +128,7 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka { } ``` -If you want to run ZooKeeper and Kafka on arbitrary available ports, you can +If you want to run Kafka broker and controller on arbitrary available ports, you can use the `withRunningKafkaOnFoundPort` method. This is useful to make tests more reliable, especially when running tests in parallel or on machines where other tests or services may be running with port numbers you can't control. @@ -107,7 +139,7 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka { "runs with embedded kafka on arbitrary available ports" should { "work" in { - val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) + val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, controllerPort = 0) withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig => // now a kafka broker is listening on actualConfig.kafkaPort @@ -153,7 +185,7 @@ Those properties will be added to the broker configuration, be careful some prop in case of conflict the `customBrokerProperties` values will take precedence. Please look at the source code to see what these properties are. 
-### Utility methods +#### Utility methods The `EmbeddedKafka` trait provides also some utility methods to interact with the embedded kafka, in order to set preconditions or verifications in your specs: @@ -165,17 +197,17 @@ def consumeFirstMessageFrom(topic: String): String def createCustomTopic(topic: String, topicConfig: Map[String,String], partitions: Int, replicationFactor: Int): Unit ``` -### Custom producers +#### Custom producers Given implicits `Deserializer`s for each type and an `EmbeddedKafkaConfig` it is possible to use `withProducer[A, B, R] { your code here }` where R is the code return type. For more information about how to use the utility methods, you can either look at the Scaladocs or at the tests of this project. -### Custom consumers +#### Custom consumers Given implicits `Serializer`s for each type and an `EmbeddedKafkaConfig` it is possible to use `withConsumer[A, B, R] { your code here }` where R is the code return type. -### Loan methods example +#### Loan methods example A simple test using loan methods can be as simple as this: @@ -201,26 +233,26 @@ A simple test using loan methods can be as simple as this: }) ``` -## embedded-kafka-streams +### embedded-kafka-streams A library that builds on top of `embedded-kafka` to offer easy testing of [Kafka Streams](https://kafka.apache.org/documentation/streams). It takes care of instantiating and starting your streams as well as closing them after running your test-case code. -### How to use +#### How to use * In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka-streams" % "x.x.x" % Test` * Have a look at the [example test](kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala) * For most of the cases have your class extend the `EmbeddedKafkaStreams` trait. 
This offers both streams management and easy loaning of producers and consumers for asserting resulting messages in output/sink topics. * Use `EmbeddedKafkaStreams.runStreams` and `EmbeddedKafka.withConsumer` and `EmbeddedKafka.withProducer`. This allows you to create your own consumers of custom types as seen in the [example test](kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala). -## embedded-kafka-connect +### embedded-kafka-connect A library that builds on top of `embedded-kafka` to offer easy testing of [Kafka Connect](https://kafka.apache.org/documentation/#connect). It takes care of instantiating and starting a Kafka Connect server as well as closing it after running your test-case code. -### How to use +#### How to use * In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka-connect" % "x.x.x" % Test` * Have a look at the [example test](kafka-connect/src/test/scala/io/github/embeddedkafka/connect/ExampleKafkaConnectSpec.scala) diff --git a/build.sbt b/build.sbt index 669b5cd0..e8dd9452 100644 --- a/build.sbt +++ b/build.sbt @@ -75,7 +75,6 @@ lazy val commonSettings = Seq( organization := "io.github.embeddedkafka", scalaVersion := Versions.Scala213, crossScalaVersions := Seq( - Versions.Scala212, Versions.Scala213, Versions.Scala3 ) @@ -92,6 +91,7 @@ lazy val embeddedKafka = (project in file("embedded-kafka")) .settings(name := "embedded-kafka") .settings(commonSettings: _*) .settings(libraryDependencies ++= EmbeddedKafka.prodDeps) + .settings(libraryDependencies ++= EmbeddedKafka.testDeps) lazy val kafkaStreams = (project in file("kafka-streams")) .settings(name := "embedded-kafka-streams") diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafka.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafka.scala index d6c11d12..d2b0da51 100644 --- 
a/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafka.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafka.scala @@ -21,20 +21,19 @@ trait EmbeddedKafka override private[embeddedkafka] def withRunningServers[T]( config: EmbeddedKafkaConfig, - actualZkPort: Int, kafkaLogsDir: Path )(body: EmbeddedKafkaConfig => T): T = { - val broker = + val (broker, controller) = startKafka( config.kafkaPort, - actualZkPort, + config.controllerPort, config.customBrokerProperties, kafkaLogsDir ) val configWithUsedPorts = EmbeddedKafkaConfig( EmbeddedKafka.kafkaPort(broker), - actualZkPort, + EmbeddedKafka.controllerPort(controller), config.customBrokerProperties, config.customProducerProperties, config.customConsumerProperties @@ -43,8 +42,13 @@ trait EmbeddedKafka try { body(configWithUsedPorts) } finally { + // In combined mode, we want to shut down the broker first, since the controller may be + // needed for controlled shutdown. Additionally, the controller shutdown process currently + // stops the raft client early on, which would disrupt broker shutdown. 
broker.shutdown() + controller.shutdown() broker.awaitShutdown() + controller.awaitShutdown() } } } @@ -53,21 +57,8 @@ object EmbeddedKafka extends EmbeddedKafka with RunningEmbeddedKafkaOps[EmbeddedKafkaConfig, EmbeddedK] { override def start()(implicit config: EmbeddedKafkaConfig): EmbeddedK = { - val zkLogsDir = Files.createTempDirectory("zookeeper-logs") val kafkaLogsDir = Files.createTempDirectory("kafka-logs") - - val factory = - EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir) - - val configWithUsedPorts = EmbeddedKafkaConfig( - config.kafkaPort, - zookeeperPort(factory), - config.customBrokerProperties, - config.customProducerProperties, - config.customConsumerProperties - ) - - startKafka(kafkaLogsDir, Option(factory))(configWithUsedPorts) + startKafka(kafkaLogsDir)(config) } override def isRunning: Boolean = @@ -77,7 +68,7 @@ object EmbeddedKafka } private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] { - this: ZooKeeperOps with KafkaOps => + this: KafkaOps => /** * Starts a Kafka broker (and performs additional logic, if any), then @@ -85,8 +76,6 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] { * * @param config * the user-defined [[EmbeddedKafkaConfig]] - * @param actualZkPort - * the actual ZooKeeper port * @param kafkaLogsDir * the path for the Kafka logs * @param body @@ -94,13 +83,12 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] { */ private[embeddedkafka] def withRunningServers[T]( config: C, - actualZkPort: Int, kafkaLogsDir: Path )(body: C => T): T /** - * Starts a ZooKeeper instance and a Kafka broker (and performs additional - * logic, if any), then executes the body passed as a parameter. + * Starts a Kafka broker and controller (and performs additional logic, if + * any), then executes the body passed as a parameter. 
* * @param body * the function to execute @@ -108,19 +96,17 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] { * an implicit [[EmbeddedKafkaConfig]] */ def withRunningKafka[T](body: => T)(implicit config: C): T = { - withRunningZooKeeper(config.zooKeeperPort) { actualZkPort => - withTempDir("kafka") { kafkaLogsDir => - withRunningServers(config, actualZkPort, kafkaLogsDir)(_ => body) - } + withTempDir("kafka") { kafkaLogsDir => + withRunningServers(config, kafkaLogsDir)(_ => body) } } /** - * Starts a ZooKeeper instance and a Kafka broker (and performs additional - * logic, if any), then executes the body passed as a parameter. The actual - * ports of the servers will be detected and inserted into a copied version - * of the [[EmbeddedKafkaConfig]] that gets passed to body. This is useful if - * you set any port to `0`, which will listen on an arbitrary available port. + * Starts a Kafka broker and controller (and performs additional logic, if + * any), then executes the body passed as a parameter. The actual ports of + * the servers will be detected and inserted into a copied version of the + * [[EmbeddedKafkaConfig]] that gets passed to body. This is useful if you + * set any port to `0`, which will listen on an arbitrary available port. 
* * @param config * the user-defined [[EmbeddedKafkaConfig]] @@ -129,23 +115,8 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] { * actual ports the servers are running on */ def withRunningKafkaOnFoundPort[T](config: C)(body: C => T): T = { - withRunningZooKeeper(config.zooKeeperPort) { actualZkPort => - withTempDir("kafka") { kafkaLogsDir => - withRunningServers(config, actualZkPort, kafkaLogsDir)(body) - } - } - } - - private[embeddedkafka] def withRunningZooKeeper[T]( - port: Int - )(body: Int => T): T = { - withTempDir("zookeeper-logs") { zkLogsDir => - val factory = startZooKeeper(port, zkLogsDir) - try { - body(factory.getLocalPort) - } finally { - factory.shutdown() - } + withTempDir("kafka") { kafkaLogsDir => + withRunningServers(config, kafkaLogsDir)(body) } } diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafkaConfig.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafkaConfig.scala index 6b44724c..d6df50c6 100644 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafkaConfig.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedKafkaConfig.scala @@ -2,7 +2,7 @@ package io.github.embeddedkafka trait EmbeddedKafkaConfig { def kafkaPort: Int - def zooKeeperPort: Int + def controllerPort: Int def customBrokerProperties: Map[String, String] def customProducerProperties: Map[String, String] def customConsumerProperties: Map[String, String] @@ -11,7 +11,7 @@ trait EmbeddedKafkaConfig { case class EmbeddedKafkaConfigImpl( kafkaPort: Int, - zooKeeperPort: Int, + controllerPort: Int, customBrokerProperties: Map[String, String], customProducerProperties: Map[String, String], customConsumerProperties: Map[String, String] @@ -20,21 +20,21 @@ case class EmbeddedKafkaConfigImpl( } object EmbeddedKafkaConfig { - lazy val defaultKafkaPort = 6001 - lazy val defaultZookeeperPort = 6000 + lazy val defaultKafkaPort = 6001 + lazy val defaultControllerPort = 
6002 implicit val defaultConfig: EmbeddedKafkaConfig = apply() def apply( kafkaPort: Int = defaultKafkaPort, - zooKeeperPort: Int = defaultZookeeperPort, + controllerPort: Int = defaultControllerPort, customBrokerProperties: Map[String, String] = Map.empty, customProducerProperties: Map[String, String] = Map.empty, customConsumerProperties: Map[String, String] = Map.empty ): EmbeddedKafkaConfig = EmbeddedKafkaConfigImpl( kafkaPort, - zooKeeperPort, + controllerPort, customBrokerProperties, customProducerProperties, customConsumerProperties diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedServer.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedServer.scala index 0258bf1b..4c303e04 100644 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedServer.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/EmbeddedServer.scala @@ -1,10 +1,8 @@ package io.github.embeddedkafka -import java.nio.file.Path - -import kafka.server.KafkaServer -import org.apache.zookeeper.server.ServerCnxnFactory +import kafka.server.{BrokerServer, ControllerServer} +import java.nio.file.Path import scala.reflect.io.Directory /** @@ -14,82 +12,49 @@ private[embeddedkafka] trait EmbeddedServer { def stop(clearLogs: Boolean): Unit } -/** - * An instance of an embedded Zookeeper server. - * - * @param factory - * the server. - * @param logsDirs - * the directory logs are to be written to. - */ -case class EmbeddedZ( - factory: ServerCnxnFactory, - logsDirs: Path -) extends EmbeddedServer { - - /** - * Shuts down the factory and then optionally deletes the log directory. - * - * @param clearLogs - * pass `true` to recursively delete the log directory. 
- */ - override def stop(clearLogs: Boolean): Unit = { - factory.shutdown() - if (clearLogs) { - val _ = Directory(logsDirs.toFile).deleteRecursively() - } - } -} - private[embeddedkafka] trait EmbeddedServerWithKafka extends EmbeddedServer { - def factory: Option[EmbeddedZ] - def broker: KafkaServer + def broker: BrokerServer + def controller: ControllerServer def logsDirs: Path } /** * An instance of an embedded Kafka server. * - * @param factory - * the optional [[EmbeddedZ]] server which Kafka relies upon. * @param broker - * the Kafka server. + * the Kafka broker server. + * @param controller + * the Kafka controller server. * @param logsDirs * the directory logs are to be written to. * @param config * the [[EmbeddedKafkaConfig]] used to start the broker. */ case class EmbeddedK( - factory: Option[EmbeddedZ], - broker: KafkaServer, + broker: BrokerServer, + controller: ControllerServer, logsDirs: Path, config: EmbeddedKafkaConfig ) extends EmbeddedServerWithKafka { /** - * Shuts down the broker, the factory it relies upon, if defined, and the - * app, if defined. Optionally deletes the log directory. + * Shuts down the broker and controller, and the app, if defined. Optionally + * deletes the log directory. * * @param clearLogs * pass `true` to recursively delete the log directory. */ override def stop(clearLogs: Boolean): Unit = { + // In combined mode, we want to shut down the broker first, since the controller may be + // needed for controlled shutdown. Additionally, the controller shutdown process currently + // stops the raft client early on, which would disrupt broker shutdown. 
broker.shutdown() + controller.shutdown() broker.awaitShutdown() - - factory.foreach(_.stop(clearLogs)) + controller.awaitShutdown() if (clearLogs) { val _ = Directory(logsDirs.toFile).deleteRecursively() } } } - -object EmbeddedK { - def apply( - broker: KafkaServer, - logsDirs: Path, - config: EmbeddedKafkaConfig - ): EmbeddedK = - EmbeddedK(factory = None, broker, logsDirs, config) -} diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/AdminOps.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/AdminOps.scala index 9cfaef45..910745ca 100644 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/AdminOps.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/AdminOps.scala @@ -5,7 +5,10 @@ import org.apache.kafka.clients.admin.{ AdminClient, AdminClientConfig, DeleteTopicsOptions, - NewTopic + DescribeTopicsOptions, + ListTopicsOptions, + NewTopic, + TopicDescription } import scala.jdk.CollectionConverters._ @@ -24,6 +27,8 @@ trait AdminOps[C <: EmbeddedKafkaConfig] { val zkConnectionTimeoutMs = 10000 protected val topicCreationTimeout: FiniteDuration = 2.seconds protected val topicDeletionTimeout: FiniteDuration = 2.seconds + protected val topicListTimeout: FiniteDuration = 2.seconds + protected val topicDescribeTimeout: FiniteDuration = 2.seconds protected val adminClientCloseTimeout: FiniteDuration = 2.seconds /** @@ -77,6 +82,53 @@ trait AdminOps[C <: EmbeddedKafkaConfig] { }.map(_ => ()) } + /** + * Lists the topics available. + * @param config + * an implicit [[EmbeddedKafkaConfig]] + * @return + * the list of topic names + */ + def listTopics()(implicit config: C): Try[Set[String]] = { + val opts = new ListTopicsOptions() + .timeoutMs(topicListTimeout.toMillis.toInt) + + withAdminClient { adminClient => + adminClient + .listTopics(opts) + .names() + .get(topicListTimeout.length, topicListTimeout.unit) + .asScala + .toSet + } + } + + /** + * Describe the topics. 
+ * + * @param topics + * the topic names to describe + * @param config + * an implicit [[EmbeddedKafkaConfig]] + * @return + * the list of topic names + */ + def describeTopics( + topics: Seq[String] + )(implicit config: C): Try[Map[String, TopicDescription]] = { + val opts = new DescribeTopicsOptions() + .timeoutMs(topicDescribeTimeout.toMillis.toInt) + + withAdminClient { adminClient => + adminClient + .describeTopics(topics.asJavaCollection, opts) + .allTopicNames() + .get(topicDescribeTimeout.length, topicDescribeTimeout.unit) + .asScala + .toMap + } + } + /** * Creates an `AdminClient`, then executes the body passed as a parameter. * diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/ConsumerOps.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/ConsumerOps.scala index 1d1d5f7e..32b8ae6b 100644 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/ConsumerOps.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/ConsumerOps.scala @@ -15,8 +15,6 @@ import org.apache.kafka.clients.consumer.{ import org.apache.kafka.common.serialization.{Deserializer, StringDeserializer} import org.apache.kafka.common.{KafkaException, TopicPartition} -// Used by Scala 2.12 -import scala.collection.compat._ import scala.collection.immutable.Map import scala.collection.mutable.ListBuffer import scala.concurrent.TimeoutException diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/embeddedKafkaOps.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/embeddedKafkaOps.scala index 50971a82..52efe006 100644 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/embeddedKafkaOps.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/embeddedKafkaOps.scala @@ -16,7 +16,6 @@ private[embeddedkafka] trait EmbeddedKafkaOps[ ] extends AdminOps[C] with ConsumerOps[C] with ProducerOps[C] - with ZooKeeperOps with KafkaOps /** @@ -34,5 +33,4 @@ private[embeddedkafka] trait 
RunningEmbeddedKafkaOps[ ] extends EmbeddedKafkaOps[C, S] with RunningServersOps with ServerStarter[C, S] - with RunningZooKeeperOps with RunningKafkaOps diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala index 1a7e5e3d..364d1b90 100644 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala +++ b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/kafkaOps.scala @@ -1,62 +1,180 @@ package io.github.embeddedkafka.ops -import java.nio.file.Path -import kafka.server.{KafkaConfig, KafkaServer} -import io.github.embeddedkafka.{ - EmbeddedK, - EmbeddedKafkaConfig, - EmbeddedServer, - EmbeddedZ -} +import io.github.embeddedkafka.{EmbeddedK, EmbeddedKafkaConfig, EmbeddedServer} +import kafka.server._ +import org.apache.kafka.common.Uuid import org.apache.kafka.common.security.auth.SecurityProtocol +import org.apache.kafka.common.utils.Time import org.apache.kafka.coordinator.group.GroupCoordinatorConfig -import org.apache.kafka.coordinator.transaction.TransactionLogConfigs +import org.apache.kafka.metadata.properties.{ + MetaProperties, + MetaPropertiesEnsemble, + MetaPropertiesVersion, + PropertiesUtils +} import org.apache.kafka.network.SocketServerConfigs +import org.apache.kafka.raft.QuorumConfig +import org.apache.kafka.server.ServerSocketFactory import org.apache.kafka.server.config.{ + KRaftConfigs, ServerConfigs, - ServerLogConfigs, - ZkConfigs + ServerLogConfigs } import org.apache.kafka.storage.internals.log.CleanerConfig +import org.slf4j.LoggerFactory +import java.io.{File, IOException} +import java.net.ServerSocket +import java.nio.file.{Path, Paths} +import java.util.concurrent.CompletableFuture import scala.jdk.CollectionConverters._ +import scala.util.{Failure, Try, Using} /** * Trait for Kafka-related actions. 
*/ trait KafkaOps { - protected val brokerId: Short = 0 + + private val logger = LoggerFactory.getLogger(getClass) + + protected val nodeId: Int = 0 protected val autoCreateTopics: Boolean = true protected val logCleanerDedupeBufferSize: Int = 1048577 private[embeddedkafka] def startKafka( kafkaPort: Int, - zooKeeperPort: Int, + controllerPort: Int, customBrokerProperties: Map[String, String], kafkaLogDir: Path - ) = { - val zkAddress = s"localhost:$zooKeeperPort" - val listener = s"${SecurityProtocol.PLAINTEXT}://localhost:$kafkaPort" - - val brokerProperties = Map[String, Object]( - ZkConfigs.ZK_CONNECT_CONFIG -> zkAddress, - ServerConfigs.BROKER_ID_CONFIG -> brokerId.toString, - SocketServerConfigs.LISTENERS_CONFIG -> listener, - SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG -> listener, + ): (BrokerServer, ControllerServer) = { + + // We need to know the controller port beforehand to set the config for the broker + // Without this the controller starts correctly on a random port but it's too late to use this port in the configs for the broker + val actualControllerPort = findPortForControllerOrFail(controllerPort) + + val brokerListener = s"${SecurityProtocol.PLAINTEXT}://localhost:$kafkaPort" + val controllerListener = s"CONTROLLER://localhost:$actualControllerPort" + + val configProperties = Map[String, Object]( + KRaftConfigs.PROCESS_ROLES_CONFIG -> "broker,controller", + KRaftConfigs.NODE_ID_CONFIG -> nodeId.toString, + KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG -> "CONTROLLER", + QuorumConfig.QUORUM_VOTERS_CONFIG -> s"$nodeId@localhost:$actualControllerPort", + ServerConfigs.BROKER_ID_CONFIG -> nodeId.toString, + SocketServerConfigs.LISTENERS_CONFIG -> s"$brokerListener,$controllerListener", + SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG -> brokerListener, + SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG -> "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL", 
ServerLogConfigs.AUTO_CREATE_TOPICS_ENABLE_CONFIG -> autoCreateTopics.toString, ServerLogConfigs.LOG_DIRS_CONFIG -> kafkaLogDir.toAbsolutePath.toString, ServerLogConfigs.LOG_FLUSH_INTERVAL_MESSAGES_CONFIG -> 1.toString, GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString, GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG -> 1.toString, - TransactionLogConfigs.TRANSACTIONS_TOPIC_REPLICATION_FACTOR_CONFIG -> 1.toString, - TransactionLogConfigs.TRANSACTIONS_TOPIC_MIN_ISR_CONFIG -> 1.toString, // The total memory used for log deduplication across all cleaner threads, keep it small to not exhaust suite memory CleanerConfig.LOG_CLEANER_DEDUPE_BUFFER_SIZE_PROP -> logCleanerDedupeBufferSize.toString ) ++ customBrokerProperties - val broker = new KafkaServer(new KafkaConfig(brokerProperties.asJava)) + val config = new KafkaConfig(configProperties.asJava) + + val time = Time.SYSTEM + + val clusterIdBase64 = generateRandomClusterId() + + val metaProperties = new MetaProperties.Builder() + .setVersion(MetaPropertiesVersion.V1) + .setClusterId(clusterIdBase64) + .setNodeId(nodeId) + .build() + + // Note: the following is copied from KafkaRaftServer because it doesn't expose a way to retrieve the BrokerServer from the KafkaRaftServer instance + + val logIdent = s"[KafkaRaftServer nodeId=${config.nodeId}] " + + writeMetaProperties(Paths.get(config.metadataLogDir).toFile, metaProperties) + + val (metaPropsEnsemble, bootstrapMetadata) = + KafkaRaftServer.initializeLogDirs(config, logger, logIdent) + + val metrics = Server.initializeMetrics( + config, + time, + metaPropsEnsemble.clusterId().get() + ) + + val sharedServer = new SharedServer( + config, + metaPropsEnsemble, + time, + metrics, + CompletableFuture.completedFuture( + QuorumConfig.parseVoterConnections(config.quorumConfig.voters) + ), + QuorumConfig.parseBootstrapServers(config.quorumConfig.bootstrapServers), + new StandardFaultHandlerFactory(), + ServerSocketFactory.INSTANCE + ) + + val 
broker: BrokerServer = new BrokerServer(sharedServer) + + val controller: ControllerServer = new ControllerServer( + sharedServer, + KafkaRaftServer.configSchema, + bootstrapMetadata + ) + + // Controller component must be started before the broker component so that + // the controller endpoints are passed to the KRaft manager + controller.startup() broker.startup() - broker + + (broker, controller) + } + + private def generateRandomClusterId(): String = { + Uuid.randomUuid().toString + } + + private def writeMetaProperties( + logDir: File, + metaProperties: MetaProperties + ): Unit = { + val metaPropertiesFile = new File( + logDir.getAbsolutePath, + MetaPropertiesEnsemble.META_PROPERTIES_NAME + ) + PropertiesUtils.writePropertiesFile( + metaProperties.toProperties, + metaPropertiesFile.getAbsolutePath, + false + ) + } + + private def findPortForControllerOrFail(controllerPort: Int): Int = { + if (controllerPort == 0) { + findRandomFreePort() match { + case scala.util.Success(port) => + logger.info(s"Found free port $port for controller") + port + case Failure(exception) => + logger.error( + "Could not find a free port for the controller", + exception + ) + throw new RuntimeException( + s"Could not find a free port for the controller", + exception + ) + } + } else { + controllerPort + } + } + + private def findRandomFreePort(): Try[Int] = { + Using(new ServerSocket(0))(serverSocket => serverSocket.getLocalPort()) + .recoverWith { + case ex: IOException => + Failure(new RuntimeException("Could not find a free port", ex)) + } } } @@ -73,35 +191,38 @@ trait RunningKafkaOps { * * @param kafkaLogsDir * the path for the Kafka logs - * @param factory - * an [[EmbeddedZ]] server * @param config * an implicit [[EmbeddedKafkaConfig]] * @return * an [[EmbeddedK]] server */ - def startKafka(kafkaLogsDir: Path, factory: Option[EmbeddedZ] = None)( + def startKafka(kafkaLogsDir: Path)( implicit config: EmbeddedKafkaConfig ): EmbeddedK = { - val kafkaServer = startKafka( + val 
(brokerServer, controllerServer) = startKafka( config.kafkaPort, - config.zooKeeperPort, + config.controllerPort, config.customBrokerProperties, kafkaLogsDir ) val configWithUsedPorts = EmbeddedKafkaConfig( - kafkaPort(kafkaServer), - config.zooKeeperPort, + kafkaPort(brokerServer), + controllerPort(controllerServer), config.customBrokerProperties, config.customProducerProperties, config.customConsumerProperties ) - val broker = - EmbeddedK(factory, kafkaServer, kafkaLogsDir, configWithUsedPorts) - runningServers.add(broker) - broker + val servers = + EmbeddedK( + brokerServer, + controllerServer, + kafkaLogsDir, + configWithUsedPorts + ) + runningServers.add(servers) + servers } /** @@ -113,6 +234,15 @@ trait RunningKafkaOps { private[embeddedkafka] def isEmbeddedK(server: EmbeddedServer): Boolean = server.isInstanceOf[EmbeddedK] - private[embeddedkafka] def kafkaPort(kafkaServer: KafkaServer): Int = - kafkaServer.boundPort(kafkaServer.config.listeners.head.listenerName) + private[embeddedkafka] def kafkaPort(kafkaBrokerServer: BrokerServer): Int = + kafkaBrokerServer.boundPort( + kafkaBrokerServer.config.listeners.head.listenerName + ) + + private[embeddedkafka] def controllerPort( + controllerServer: ControllerServer + ): Int = + controllerServer.socketServer.boundPort( + controllerServer.config.controllerListeners.head.listenerName + ) } diff --git a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/zooKeeperOps.scala b/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/zooKeeperOps.scala deleted file mode 100644 index a0249b5b..00000000 --- a/embedded-kafka/src/main/scala/io/github/embeddedkafka/ops/zooKeeperOps.scala +++ /dev/null @@ -1,72 +0,0 @@ -package io.github.embeddedkafka.ops - -import java.net.InetSocketAddress -import java.nio.file.Path - -import io.github.embeddedkafka.{EmbeddedKafkaConfig, EmbeddedServer, EmbeddedZ} -import org.apache.zookeeper.server.{ServerCnxnFactory, ZooKeeperServer} - -/** - * Trait for ZooKeeper-related 
actions. - */ -trait ZooKeeperOps { - private[embeddedkafka] def startZooKeeper( - zooKeeperPort: Int, - zkLogsDir: Path - ): ServerCnxnFactory = { - val tickTime = 2000 - - val zkServer = new ZooKeeperServer( - zkLogsDir.toFile, - zkLogsDir.toFile, - tickTime - ) - - val factory = ServerCnxnFactory.createFactory - factory.configure(new InetSocketAddress("localhost", zooKeeperPort), 1024) - factory.startup(zkServer) - factory - } -} - -/** - * [[ZooKeeperOps]] extension relying on `RunningServersOps` for keeping track - * of running [[EmbeddedZ]] instances. - */ -trait RunningZooKeeperOps { - this: ZooKeeperOps with RunningServersOps => - - /** - * Starts a Zookeeper instance in memory, storing logs in a specific - * location. - * - * @param zkLogsDir - * the path for the Zookeeper logs - * @param config - * an implicit [[EmbeddedKafkaConfig]] - * @return - * an [[EmbeddedZ]] server - */ - def startZooKeeper( - zkLogsDir: Path - )(implicit config: EmbeddedKafkaConfig): EmbeddedZ = { - val factory = - EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir) - runningServers.add(factory) - factory - } - - /** - * Stops all in memory Zookeeper instances, preserving the logs directories. 
- */ - def stopZooKeeper(): Unit = - runningServers.stopAndRemove(isEmbeddedZ, clearLogs = false) - - private def isEmbeddedZ(server: EmbeddedServer): Boolean = - server.isInstanceOf[EmbeddedZ] - - private[embeddedkafka] def zookeeperPort(zk: EmbeddedZ): Int = - zookeeperPort(zk.factory) - private def zookeeperPort(fac: ServerCnxnFactory): Int = - fac.getLocalPort -} diff --git a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala index 0b86f5e3..ed1fd2e3 100644 --- a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala +++ b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaMethodsSpec.scala @@ -1,8 +1,5 @@ package io.github.embeddedkafka -import java.util.Collections -import java.util.concurrent.TimeoutException -import kafka.zk.KafkaZkClient import io.github.embeddedkafka.EmbeddedKafka._ import io.github.embeddedkafka.serializers.{ TestJsonDeserializer, @@ -16,15 +13,13 @@ import org.apache.kafka.clients.producer.{ import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.header.internals.RecordHeaders import org.apache.kafka.common.serialization._ -import org.apache.kafka.common.utils.Time import org.apache.kafka.storage.internals.log.CleanerConfig -import org.apache.zookeeper.client.ZKClientConfig import org.scalatest.concurrent.JavaFutures import org.scalatest.time.{Milliseconds, Seconds, Span} import org.scalatest.{Assertion, BeforeAndAfterAll, OptionValues} -// Used by Scala 2.12 -import scala.collection.compat._ +import java.util.Collections +import java.util.concurrent.TimeoutException import scala.concurrent.duration._ import scala.jdk.CollectionConverters._ @@ -171,20 +166,7 @@ class EmbeddedKafkaMethodsSpec ) ) - val zkClient = KafkaZkClient.apply( - s"localhost:${config.zooKeeperPort}", - isSecure = false, - zkSessionTimeoutMs, - zkConnectionTimeoutMs, - 
maxInFlightRequests = 1, - Time.SYSTEM, - "embedded-kafka-zookeeper-client", - new ZKClientConfig() - ) - - try { - zkClient.topicExists(topic) shouldBe true - } finally zkClient.close() + listTopics().getOrElse(Set.empty) should contain(topic) } "create a topic with custom number of partitions" in { @@ -199,20 +181,11 @@ class EmbeddedKafkaMethodsSpec partitions = 2 ) - val zkClient = KafkaZkClient.apply( - s"localhost:${config.zooKeeperPort}", - isSecure = false, - zkSessionTimeoutMs, - zkConnectionTimeoutMs, - maxInFlightRequests = 1, - Time.SYSTEM, - "embedded-kafka-zookeeper-client", - new ZKClientConfig() - ) - - try { - zkClient.getTopicPartitionCount(topic).value shouldBe 2 - } finally zkClient.close() + describeTopics(Seq(topic)) + .getOrElse(Map.empty) + .apply(topic) + .partitions() + .size() shouldBe 2 } } @@ -229,26 +202,14 @@ class EmbeddedKafkaMethodsSpec deleteTopics(topics) - val zkClient = KafkaZkClient.apply( - s"localhost:${config.zooKeeperPort}", - isSecure = false, - zkSessionTimeoutMs, - zkConnectionTimeoutMs, - maxInFlightRequests = 1, - Time.SYSTEM, - "embedded-kafka-zookeeper-client", - new ZKClientConfig() - ) - eventually { - val noTopicExistsAnymore = topics.forall(t => !zkClient.topicExists(t)) - val allTopicsAreMarkedForDeletion = - topics.forall(t => zkClient.getTopicDeletions.contains(t)) + val allTopicsOrFailure = listTopics() + assert(allTopicsOrFailure.isSuccess) - assert(allTopicsAreMarkedForDeletion || noTopicExistsAnymore) + val allTopics = allTopicsOrFailure.getOrElse(Set.empty) + val noTopicExistsAnymore = topics.forall(t => !allTopics.contains(t)) + assert(noTopicExistsAnymore) } - - zkClient.close() } } diff --git a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaObjectSpec.scala b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaObjectSpec.scala index abbae569..c1b003bc 100644 --- a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaObjectSpec.scala +++ 
b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaObjectSpec.scala @@ -1,21 +1,19 @@ package io.github.embeddedkafka -import java.nio.file.Files - -import org.apache.kafka.common.serialization.{ - StringDeserializer, - StringSerializer -} import io.github.embeddedkafka.EmbeddedKafka._ import io.github.embeddedkafka.EmbeddedKafkaConfig.{ - defaultKafkaPort, - defaultZookeeperPort + defaultControllerPort, + defaultKafkaPort } import io.github.embeddedkafka.EmbeddedKafkaSpecSupport._ +import org.apache.kafka.common.serialization.{ + StringDeserializer, + StringSerializer +} import org.scalatest.OptionValues -import scala.jdk.CollectionConverters._ import scala.concurrent.duration._ +import scala.jdk.CollectionConverters._ class EmbeddedKafkaObjectSpec extends EmbeddedKafkaSpecSupport @@ -24,21 +22,21 @@ class EmbeddedKafkaObjectSpec "the EmbeddedKafka object" when { "invoking the start and stop methods" should { - "start and stop Kafka and Zookeeper on the default ports" in { + "start and stop Kafka broker and controller on the default ports" in { EmbeddedKafka.start() + expectedServerStatus(defaultControllerPort, Available) expectedServerStatus(defaultKafkaPort, Available) - expectedServerStatus(defaultZookeeperPort, Available) EmbeddedKafka.stop() expectedServerStatus(defaultKafkaPort, NotAvailable) - expectedServerStatus(defaultZookeeperPort, NotAvailable) + expectedServerStatus(defaultControllerPort, NotAvailable) } - "start and stop Kafka and Zookeeper on different specified ports using an implicit configuration" in { + "start and stop Kafka broker and controller on different specified ports using an implicit configuration" in { implicit val config: EmbeddedKafkaConfig = - EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 54321) + EmbeddedKafkaConfig(kafkaPort = 12345, controllerPort = 54321) EmbeddedKafka.start() expectedServerStatus(12345, Available) @@ -48,20 +46,20 @@ class EmbeddedKafkaObjectSpec } "start and stop a specific Kafka" in 
{ - val firstBroker = EmbeddedKafka.start()( - EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001) + val firstServer = EmbeddedKafka.start()( + EmbeddedKafkaConfig(kafkaPort = 7000, controllerPort = 7001) ) EmbeddedKafka.start()( - EmbeddedKafkaConfig(kafkaPort = 8000, zooKeeperPort = 8001) + EmbeddedKafkaConfig(kafkaPort = 8000, controllerPort = 8001) ) - expectedServerStatus(7000, Available) expectedServerStatus(7001, Available) + expectedServerStatus(7000, Available) - expectedServerStatus(8000, Available) expectedServerStatus(8001, Available) + expectedServerStatus(8000, Available) - EmbeddedKafka.stop(firstBroker) + EmbeddedKafka.stop(firstServer) expectedServerStatus(7000, NotAvailable) expectedServerStatus(7001, NotAvailable) @@ -72,35 +70,33 @@ class EmbeddedKafkaObjectSpec EmbeddedKafka.stop() } - "start and stop Kafka and Zookeeper successfully on arbitrary available ports" in { + "start and stop Kafka broker and controller successfully on arbitrary available ports" in { val someConfig = - EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) + EmbeddedKafkaConfig(kafkaPort = 0, controllerPort = 0) val kafka = EmbeddedKafka.start()(someConfig) - kafka.factory shouldBe defined - - val usedZookeeperPort = EmbeddedKafka.zookeeperPort(kafka.factory.get) - val usedKafkaPort = EmbeddedKafka.kafkaPort(kafka.broker) + val usedControllerPort = EmbeddedKafka.controllerPort(kafka.controller) + val usedKafkaPort = EmbeddedKafka.kafkaPort(kafka.broker) + expectedServerStatus(usedControllerPort, Available) expectedServerStatus(usedKafkaPort, Available) - expectedServerStatus(usedZookeeperPort, Available) + kafka.config.controllerPort should be(usedControllerPort) kafka.config.kafkaPort should be(usedKafkaPort) - kafka.config.zooKeeperPort should be(usedZookeeperPort) EmbeddedKafka.stop() expectedServerStatus(usedKafkaPort, NotAvailable) - expectedServerStatus(usedZookeeperPort, NotAvailable) + expectedServerStatus(usedControllerPort, NotAvailable) } "start and 
stop multiple Kafka instances on specified ports" in { val someConfig = - EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 32111) + EmbeddedKafkaConfig(kafkaPort = 12345, controllerPort = 54321) val someBroker = EmbeddedKafka.start()(someConfig) val someOtherConfig = - EmbeddedKafkaConfig(kafkaPort = 23456, zooKeeperPort = 43211) + EmbeddedKafkaConfig(kafkaPort = 23456, controllerPort = 65432) val someOtherBroker = EmbeddedKafka.start()(someOtherConfig) val topic = "publish_test_topic_1" @@ -133,32 +129,12 @@ class EmbeddedKafkaObjectSpec } "invoking the isRunning method" should { - "return true when both Kafka and Zookeeper are running" in { + "return true when Kafka is running" in { EmbeddedKafka.start() EmbeddedKafka.isRunning shouldBe true EmbeddedKafka.stop() EmbeddedKafka.isRunning shouldBe false } - - "return true when both Kafka and Zookeeper are running, if started separately" in { - EmbeddedKafka.startZooKeeper( - Files.createTempDirectory("zookeeper-test-logs") - ) - EmbeddedKafka.startKafka(Files.createTempDirectory("kafka-test-logs")) - - EmbeddedKafka.isRunning shouldBe true - EmbeddedKafka.stop() - EmbeddedKafka.isRunning shouldBe false - } - - "return false when only Zookeeper is running" in { - EmbeddedKafka.startZooKeeper( - Files.createTempDirectory("zookeeper-test-logs") - ) - EmbeddedKafka.isRunning shouldBe false - EmbeddedKafka.stop() - EmbeddedKafka.isRunning shouldBe false - } } } } diff --git a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaWithRunningKafkaOnFoundPortSpec.scala b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaWithRunningKafkaOnFoundPortSpec.scala index 8c1b7eff..4964d3ce 100644 --- a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaWithRunningKafkaOnFoundPortSpec.scala +++ b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaWithRunningKafkaOnFoundPortSpec.scala @@ -10,9 +10,9 @@ import org.scalatest.Assertion class 
EmbeddedKafkaWithRunningKafkaOnFoundPortSpec extends EmbeddedKafkaSpecSupport { "the withRunningKafkaOnFoundPort method" should { - "start and stop Kafka and Zookeeper successfully on non-zero ports" in { + "start and stop Kafka broker and controller successfully on non-zero ports" in { val userDefinedConfig = - EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 12346) + EmbeddedKafkaConfig(kafkaPort = 12345, controllerPort = 54321) val actualConfig = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig => actualConfig shouldBe userDefinedConfig @@ -22,9 +22,9 @@ class EmbeddedKafkaWithRunningKafkaOnFoundPortSpec noServerIsAvailable(actualConfig) } - "start and stop multiple Kafka and Zookeeper successfully on arbitrary available ports" in { + "start and stop multiple Kafka broker and controller successfully on arbitrary available ports" in { val userDefinedConfig = - EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) + EmbeddedKafkaConfig(kafkaPort = 0, controllerPort = 0) val actualConfig1 = withRunningKafkaOnFoundPort(userDefinedConfig) { actualConfig1 => everyServerIsAvailable(actualConfig1) @@ -43,12 +43,12 @@ class EmbeddedKafkaWithRunningKafkaOnFoundPortSpec Seq(userDefinedConfig, actualConfig1, actualConfig2) // Confirm both actual configs are running on separate non-zero ports, but otherwise equal allConfigs.map(_.kafkaPort).distinct should have size 3 - allConfigs.map(_.zooKeeperPort).distinct should have size 3 + allConfigs.map(_.controllerPort).distinct should have size 3 allConfigs .map(config => EmbeddedKafkaConfigImpl( kafkaPort = 0, - zooKeeperPort = 0, + controllerPort = 0, config.customBrokerProperties, config.customProducerProperties, config.customConsumerProperties @@ -65,7 +65,7 @@ class EmbeddedKafkaWithRunningKafkaOnFoundPortSpec "work with a simple example using implicits" in { val userDefinedConfig = - EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) + EmbeddedKafkaConfig(kafkaPort = 0, controllerPort = 0) 
withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig => publishStringMessageToKafka("topic", "message") consumeFirstStringMessageFrom("topic") shouldBe "message" @@ -74,12 +74,12 @@ class EmbeddedKafkaWithRunningKafkaOnFoundPortSpec } private def everyServerIsAvailable(config: EmbeddedKafkaConfig): Assertion = { + expectedServerStatus(config.controllerPort, Available) expectedServerStatus(config.kafkaPort, Available) - expectedServerStatus(config.zooKeeperPort, Available) } private def noServerIsAvailable(config: EmbeddedKafkaConfig): Assertion = { + expectedServerStatus(config.controllerPort, NotAvailable) expectedServerStatus(config.kafkaPort, NotAvailable) - expectedServerStatus(config.zooKeeperPort, NotAvailable) } } diff --git a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaWithRunningKafkaSpec.scala b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaWithRunningKafkaSpec.scala index 251a1db7..d8798402 100644 --- a/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaWithRunningKafkaSpec.scala +++ b/embedded-kafka/src/test/scala/io/github/embeddedkafka/EmbeddedKafkaWithRunningKafkaSpec.scala @@ -4,8 +4,8 @@ import io.github.embeddedkafka.EmbeddedKafka._ import io.github.embeddedkafka.EmbeddedKafkaSpecSupport._ import org.scalatest.exceptions.TestFailedException import io.github.embeddedkafka.EmbeddedKafkaConfig.{ - defaultKafkaPort, - defaultZookeeperPort + defaultControllerPort, + defaultKafkaPort } class EmbeddedKafkaWithRunningKafkaSpec extends EmbeddedKafkaSpecSupport { @@ -16,20 +16,14 @@ class EmbeddedKafkaWithRunningKafkaSpec extends EmbeddedKafkaSpecSupport { } } - "start a ZooKeeper instance on port 6000 by default" in { - withRunningKafka { - expectedServerStatus(defaultZookeeperPort, Available) - } - } - - "stop Kafka and Zookeeper successfully" when { + "stop Kafka successfully" when { "the enclosed test passes" in { withRunningKafka { true shouldBe true } + 
expectedServerStatus(defaultControllerPort, NotAvailable) expectedServerStatus(defaultKafkaPort, NotAvailable) - expectedServerStatus(defaultZookeeperPort, NotAvailable) } "the enclosed test fails" in { @@ -39,8 +33,8 @@ class EmbeddedKafkaWithRunningKafkaSpec extends EmbeddedKafkaSpecSupport { } } + expectedServerStatus(defaultControllerPort, NotAvailable) expectedServerStatus(defaultKafkaPort, NotAvailable) - expectedServerStatus(defaultZookeeperPort, NotAvailable) } } @@ -53,13 +47,14 @@ class EmbeddedKafkaWithRunningKafkaSpec extends EmbeddedKafkaSpecSupport { } } - "start a Zookeeper server on a specified port" in { + "start a Kafka controller on a specified port" in { implicit val config: EmbeddedKafkaConfig = - EmbeddedKafkaConfig(zooKeeperPort = 12345) + EmbeddedKafkaConfig(controllerPort = 12345) withRunningKafka { expectedServerStatus(12345, Available) } } + } } diff --git a/kafka-connect/src/main/scala/io/github/embeddedkafka/connect/EmbeddedKafkaConnect.scala b/kafka-connect/src/main/scala/io/github/embeddedkafka/connect/EmbeddedKafkaConnect.scala index 7e683cd9..4c8bc913 100644 --- a/kafka-connect/src/main/scala/io/github/embeddedkafka/connect/EmbeddedKafkaConnect.scala +++ b/kafka-connect/src/main/scala/io/github/embeddedkafka/connect/EmbeddedKafkaConnect.scala @@ -12,7 +12,7 @@ import org.apache.kafka.connect.runtime.standalone.{ } import org.apache.kafka.connect.runtime.{Connect, Worker, WorkerConfig} import org.apache.kafka.connect.storage.FileOffsetBackingStore -import io.github.embeddedkafka.ops.{KafkaOps, ZooKeeperOps} +import io.github.embeddedkafka.ops.{KafkaOps} import io.github.embeddedkafka.{ EmbeddedKafka, EmbeddedKafkaConfig, @@ -35,7 +35,7 @@ trait EmbeddedKafkaConnect private[embeddedkafka] trait EmbeddedKafkaConnectSupport[ C <: EmbeddedKafkaConfig ] { - this: EmbeddedKafkaSupport[C] with ZooKeeperOps with KafkaOps => + this: EmbeddedKafkaSupport[C] with KafkaOps => protected[embeddedkafka] def connectConfig: EmbeddedConnectConfig[C] 
diff --git a/kafka-connect/src/test/scala/io/github/embeddedkafka/connect/ExampleKafkaConnectSpec.scala b/kafka-connect/src/test/scala/io/github/embeddedkafka/connect/ExampleKafkaConnectSpec.scala index e08676b8..bb7ecf00 100644 --- a/kafka-connect/src/test/scala/io/github/embeddedkafka/connect/ExampleKafkaConnectSpec.scala +++ b/kafka-connect/src/test/scala/io/github/embeddedkafka/connect/ExampleKafkaConnectSpec.scala @@ -11,7 +11,7 @@ import io.github.embeddedkafka.connect.EmbeddedKafkaConnect._ class ExampleKafkaConnectSpec extends EmbeddedKafkaSpecSupport { implicit val kafkaConfig: EmbeddedKafkaConfig = - EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001) + EmbeddedKafkaConfig(kafkaPort = 7000) "A Kafka Connect test" should { "start a Connect server on a specified port" in { diff --git a/kafka-streams/src/main/scala/io/github/embeddedkafka/streams/EmbeddedKafkaStreams.scala b/kafka-streams/src/main/scala/io/github/embeddedkafka/streams/EmbeddedKafkaStreams.scala index 08d3ab32..af84daec 100644 --- a/kafka-streams/src/main/scala/io/github/embeddedkafka/streams/EmbeddedKafkaStreams.scala +++ b/kafka-streams/src/main/scala/io/github/embeddedkafka/streams/EmbeddedKafkaStreams.scala @@ -2,7 +2,7 @@ package io.github.embeddedkafka.streams import java.util.Properties -import io.github.embeddedkafka.ops.{AdminOps, KafkaOps, ZooKeeperOps} +import io.github.embeddedkafka.ops.{AdminOps, KafkaOps} import io.github.embeddedkafka.{ EmbeddedKafka, EmbeddedKafkaConfig, @@ -25,10 +25,7 @@ trait EmbeddedKafkaStreams private[embeddedkafka] trait EmbeddedKafkaStreamsSupport[ C <: EmbeddedKafkaConfig ] { - this: EmbeddedKafkaSupport[C] - with AdminOps[C] - with ZooKeeperOps - with KafkaOps => + this: EmbeddedKafkaSupport[C] with AdminOps[C] with KafkaOps => protected[embeddedkafka] def streamsConfig: EmbeddedStreamsConfig[C] diff --git a/kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala 
b/kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala index ce1fbe99..5d83386c 100644 --- a/kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala +++ b/kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala @@ -25,7 +25,7 @@ class ExampleKafkaStreamsSpec "A Kafka streams test" should { "be easy to run with streams and consumer lifecycle management" in { implicit val config: EmbeddedKafkaConfig = - EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001) + EmbeddedKafkaConfig(kafkaPort = 7000, controllerPort = 7001) val streamBuilder = new StreamsBuilder val stream: KStream[String, String] = @@ -54,7 +54,7 @@ class ExampleKafkaStreamsSpec "be easy to run with streams on arbitrary available ports" in { val userDefinedConfig: EmbeddedKafkaConfig = - EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0) + EmbeddedKafkaConfig(kafkaPort = 0, controllerPort = 0) val streamBuilder = new StreamsBuilder val stream: KStream[String, String] = @@ -86,7 +86,7 @@ class ExampleKafkaStreamsSpec "allow support creating custom consumers" in { implicit val config: EmbeddedKafkaConfig = - EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001) + EmbeddedKafkaConfig(kafkaPort = 7000, controllerPort = 7001) implicit val patienceConfig: PatienceConfig = PatienceConfig(5.seconds, 100.millis) diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 3e1552ea..88b1c0e0 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -5,10 +5,10 @@ object Dependencies { object Versions { val Scala3 = "3.3.5" val Scala213 = "2.13.16" - val Scala212 = "2.12.20" - val Kafka = "3.9.0" + val Kafka = "4.0.0" val Slf4j = "1.7.36" val ScalaTest = "3.2.19" + val Jackson = "2.16.2" // Keep consistent with the one provided by Kafka } object Common { @@ -23,6 +23,9 @@ object Dependencies { lazy val prodDeps: Seq[ModuleID] = Seq( "org.apache.kafka" %% "kafka" % 
Versions.Kafka cross CrossVersion.for3Use2_13 ) + lazy val testDeps: Seq[ModuleID] = Seq( + "com.fasterxml.jackson.module" %% "jackson-module-scala" % Versions.Jackson + ).map(_ % Test) } object KafkaStreams {