Merged
2 changes: 1 addition & 1 deletion .github/workflows/coverage-report.yml
@@ -13,7 +13,7 @@ jobs:
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 8
java-version: 17
Contributor Author:
Kafka Connect & Server are not available for Java < 17 anymore: https://kafka.apache.org/40/documentation/compatibility.html

cache: 'sbt'
- uses: sbt/setup-sbt@v1
- name: Compile
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -14,7 +14,7 @@ jobs:
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 8
java-version: 17
cache: 'sbt'
- uses: sbt/setup-sbt@v1
- name: Publish artifacts
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -13,7 +13,7 @@ jobs:
- uses: actions/setup-java@v4
with:
distribution: 'temurin'
java-version: 8
java-version: 17
cache: 'sbt'
- uses: sbt/setup-sbt@v1
- name: Check formatting
76 changes: 54 additions & 22 deletions README.md
@@ -11,25 +11,57 @@ A library that provides an in-memory Kafka instance to run your tests against.

Inspired by [kafka-unit](https://github.com/chbatey/kafka-unit).

1. [Version compatibility matrix](#version-compatibility-matrix)
2. [Upgrade notes](#upgrade-notes)
3. [Usage](#usage)
1. [embedded-kafka](#embedded-kafka-1)
2. [embedded-kafka-streams](#embedded-kafka-streams)
3. [embedded-kafka-connect](#embedded-kafka-connect)

---

## Version compatibility matrix

embedded-kafka is available on Maven Central, compiled for Scala 2.12, 2.13 and Scala 3 (since v3.4.0.1).
The library is available on Maven Central.

Versions match the version of Kafka they're built against.

## Important known limitation (prior to v2.8.0)
| embedded-kafka version | Kafka version | Scala versions | Java version |
|------------------------|---------------|-----------------|--------------|
| 4.0.0 | 4.0.0 | 2.13, 3.3 | 17+ |
| 3.4.0.1 - 3.9.0 | 3.4.0 - 3.9.0 | 2.12, 2.13, 3.3 | 8+ |

_Note that [prior to v2.8.0](https://github.com/apache/kafka/pull/10174) Kafka core was inlining the Scala library, so you couldn't use a different Scala **patch** version than [what Kafka used to compile its jars](https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L30)._

[Prior to v2.8.0](https://github.com/apache/kafka/pull/10174) Kafka core was inlining the Scala library, so you couldn't use a different Scala **patch** version than [what Kafka used to compile its jars](https://github.com/apache/kafka/blob/trunk/gradle/dependencies.gradle#L30)!
---

## Breaking change: new package name
## Upgrade notes

From v2.8.0 onwards package name has been updated to reflect the library group id (i.e. `io.github.embeddedkafka`).
### 4.0.0

Major changes:
- **Java 17+:** since [Kafka Server 4.x requires Java 17+](https://kafka.apache.org/40/documentation/compatibility.html), embedded-kafka does too, even though Kafka Clients/Streams still support Java 11+.
- **Scala 2.13+:** Kafka is no longer compiled against Scala 2.12, and neither is embedded-kafka.
- embedded-kafka 4.0.0 starts a Kafka server in combined mode (broker and controller in the same process); ZooKeeper is no longer used.

As a user, you'll have to replace `zooKeeperPort` with `controllerPort` wherever you were setting it:
```diff
- EmbeddedKafkaConfig(kafkaPort = 12345, zooKeeperPort = 54321)
+ EmbeddedKafkaConfig(kafkaPort = 12345, controllerPort = 54321)
```

### 2.8.0

**Package name change:** from v2.8.0 onwards the package name has been updated to reflect the library group id (i.e. `io.github.embeddedkafka`).
Aliases to the old package name have been added, along with a one-time [Scalafix rule](https://github.com/embeddedkafka/embedded-kafka-scalafix) to ensure the smoothest migration.
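In practice the migration is usually a mechanical import rewrite along these lines (the old root package shown here, `net.manub.embeddedkafka`, is recalled from pre-2.8.0 releases; verify it against your own imports):

```diff
- import net.manub.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
+ import io.github.embeddedkafka.{EmbeddedKafka, EmbeddedKafkaConfig}
```

The linked Scalafix rule automates exactly this kind of rewrite across a codebase.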

## embedded-kafka
---

## Usage

### embedded-kafka

### How to use
#### How to use

* In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka" % "x.x.x" % Test`
* Have your class extend the `EmbeddedKafka` trait.
@@ -52,11 +84,11 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
}
```

* In-memory Zookeeper and Kafka will be instantiated respectively on port 6000 and 6001 and automatically shutdown at the end of the test.
* In-memory Kafka broker and controller (combined mode) will be instantiated on ports 6000 and 6001 respectively, and automatically shut down at the end of the test.

### Use without the `withRunningKafka` method
#### Use without the `withRunningKafka` method

A `EmbeddedKafka` companion object is provided for usage without extending the `EmbeddedKafka` trait. Zookeeper and Kafka can be started and stopped in a programmatic way. This is the recommended usage if you have more than one test in your file and you don't want to start and stop Kafka and Zookeeper on every test.
An `EmbeddedKafka` companion object is provided for usage without extending the `EmbeddedKafka` trait. Kafka can be started and stopped programmatically. This is the recommended usage if you have more than one test in your file and you don't want to start and stop Kafka on every test.

```scala
class MySpec extends AnyWordSpecLike with Matchers {
@@ -76,9 +108,9 @@ class MySpec extends AnyWordSpecLike with Matchers {

Please note that, in order to avoid Kafka instances not shutting down properly, it's recommended to call `EmbeddedKafka.stop()` in an `after` block or similar teardown logic.
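A minimal sketch of that teardown pattern (assuming ScalaTest's `BeforeAndAfterAll`; adapt to your test framework):

```scala
import io.github.embeddedkafka.EmbeddedKafka
import org.scalatest.BeforeAndAfterAll
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpecLike

class MySharedKafkaSpec
    extends AnyWordSpecLike
    with Matchers
    with BeforeAndAfterAll {

  // Start a single embedded Kafka for the whole suite...
  override def beforeAll(): Unit = EmbeddedKafka.start()

  // ...and make sure it is always stopped, even if a test fails.
  override def afterAll(): Unit = EmbeddedKafka.stop()

  "my component" should {
    "talk to Kafka" in {
      EmbeddedKafka.isRunning shouldBe true
    }
  }
}
```

Starting once per suite keeps test runtime down compared to `withRunningKafka` around every single test.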

### Configuration
#### Configuration

It's possible to change the ports on which Zookeeper and Kafka are started by providing an implicit `EmbeddedKafkaConfig`
It's possible to change the ports on which the Kafka broker and controller are started by providing an implicit `EmbeddedKafkaConfig`:

```scala
class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
@@ -96,7 +128,7 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
}
```

If you want to run ZooKeeper and Kafka on arbitrary available ports, you can
If you want to run the Kafka broker and controller on arbitrary available ports, you can
use the `withRunningKafkaOnFoundPort` method. This is useful to make tests more
reliable, especially when running tests in parallel or on machines where other
tests or services may be running with port numbers you can't control.
@@ -107,7 +139,7 @@ class MySpec extends AnyWordSpecLike with Matchers with EmbeddedKafka {
"runs with embedded kafka on arbitrary available ports" should {

"work" in {
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, zooKeeperPort = 0)
val userDefinedConfig = EmbeddedKafkaConfig(kafkaPort = 0, controllerPort = 0)

withRunningKafkaOnFoundPort(userDefinedConfig) { implicit actualConfig =>
// now a kafka broker is listening on actualConfig.kafkaPort
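The `kafkaPort = 0` trick above leans on a standard OS facility: binding a socket to port `0` makes the kernel pick a free ephemeral port. A self-contained sketch of that mechanism, independent of embedded-kafka:

```scala
import java.net.ServerSocket

// Binding to port 0 lets the OS choose any free port; reading the port back
// before closing mirrors what "found port" style helpers do internally.
def findFreePort(): Int = {
  val socket = new ServerSocket(0)
  try socket.getLocalPort
  finally socket.close()
}
```

This is why `withRunningKafkaOnFoundPort` must hand you an `actualConfig`: the ports are only known after the servers have bound their sockets.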
@@ -153,7 +185,7 @@ Those properties will be added to the broker configuration; be careful, as some properties may conflict with those set by the library, and in case of conflict the `customBrokerProperties` values will take precedence. Please look at the source code to see what these properties
are.

### Utility methods
#### Utility methods

The `EmbeddedKafka` trait also provides some utility methods to interact with the embedded Kafka instance, in order to set preconditions or verifications in your specs:

@@ -165,17 +197,17 @@
def consumeFirstMessageFrom(topic: String): String
def createCustomTopic(topic: String, topicConfig: Map[String,String], partitions: Int, replicationFactor: Int): Unit
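Inside a spec that mixes in `EmbeddedKafka`, these can be combined roughly as follows (a sketch; `publishStringMessageTo` is assumed to be the string-producing counterpart offered by the same trait):

```scala
withRunningKafka {
  // Precondition: create the topic with an explicit layout.
  createCustomTopic("greetings", Map.empty, partitions = 2, replicationFactor = 1)

  publishStringMessageTo("greetings", "hello") // assumed helper from the same trait
  consumeFirstMessageFrom("greetings") // should yield "hello"
}
```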
```

### Custom producers
#### Custom producers

Given implicit `Serializer`s for each type and an `EmbeddedKafkaConfig`, it is possible to use `withProducer[A, B, R] { your code here }`, where `R` is the return type of the code block.

For more information about how to use the utility methods, you can either look at the Scaladocs or at the tests of this project.

### Custom consumers
#### Custom consumers

Given implicit `Deserializer`s for each type and an `EmbeddedKafkaConfig`, it is possible to use `withConsumer[A, B, R] { your code here }`, where `R` is the return type of the code block.

### Loan methods example
#### Loan methods example

A test using loan methods can be as simple as this:

@@ -201,26 +233,26 @@
})
```

## embedded-kafka-streams
### embedded-kafka-streams

A library that builds on top of `embedded-kafka` to offer easy testing of [Kafka Streams](https://kafka.apache.org/documentation/streams).

It takes care of instantiating and starting your streams as well as closing them after running your test-case code.

### How to use
#### How to use

* In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka-streams" % "x.x.x" % Test`
* Have a look at the [example test](kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala)
* In most cases, have your class extend the `EmbeddedKafkaStreams` trait. This offers both stream management and easy loaning of producers and consumers for asserting resulting messages in output/sink topics.
* Use `EmbeddedKafkaStreams.runStreams` and `EmbeddedKafka.withConsumer` and `EmbeddedKafka.withProducer`. This allows you to create your own consumers of custom types as seen in the [example test](kafka-streams/src/test/scala/io/github/embeddedkafka/streams/ExampleKafkaStreamsSpec.scala).
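To make the bullets above concrete, a rough sketch of a streams test could look like the following (the topology uses the `kafka-streams-scala` DSL; treat the exact imports and helper signatures as assumptions and defer to the linked example test):

```scala
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
import org.apache.kafka.streams.scala.serialization.Serdes._

val builder = new StreamsBuilder
// Trivial pass-through topology: forward every record from "in" to "out".
builder.stream[String, String]("in").to("out")

// runStreams creates the topics, starts the topology, runs the block,
// then closes the streams instance.
runStreams(Seq("in", "out"), builder.build()) {
  publishToKafka("in", key = "hello", message = "world")
  // use withConsumer / consumeFirstMessageFrom here to assert on
  // what reached the "out" topic
}
```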

## embedded-kafka-connect
### embedded-kafka-connect

A library that builds on top of `embedded-kafka` to offer easy testing of [Kafka Connect](https://kafka.apache.org/documentation/#connect).

It takes care of instantiating and starting a Kafka Connect server as well as closing it after running your test-case code.

### How to use
#### How to use

* In your `build.sbt` file add the following dependency (replace `x.x.x` with the appropriate version): `"io.github.embeddedkafka" %% "embedded-kafka-connect" % "x.x.x" % Test`
* Have a look at the [example test](kafka-connect/src/test/scala/io/github/embeddedkafka/connect/ExampleKafkaConnectSpec.scala)
2 changes: 1 addition & 1 deletion build.sbt
@@ -75,7 +75,6 @@ lazy val commonSettings = Seq(
organization := "io.github.embeddedkafka",
scalaVersion := Versions.Scala213,
crossScalaVersions := Seq(
Versions.Scala212,
Versions.Scala213,
Versions.Scala3
)
@@ -92,6 +91,7 @@ lazy val embeddedKafka = (project in file("embedded-kafka"))
.settings(name := "embedded-kafka")
.settings(commonSettings: _*)
.settings(libraryDependencies ++= EmbeddedKafka.prodDeps)
.settings(libraryDependencies ++= EmbeddedKafka.testDeps)

lazy val kafkaStreams = (project in file("kafka-streams"))
.settings(name := "embedded-kafka-streams")
@@ -21,20 +21,19 @@ trait EmbeddedKafka

override private[embeddedkafka] def withRunningServers[T](
config: EmbeddedKafkaConfig,
actualZkPort: Int,
kafkaLogsDir: Path
)(body: EmbeddedKafkaConfig => T): T = {
val broker =
val (broker, controller) =
startKafka(
config.kafkaPort,
actualZkPort,
config.controllerPort,
config.customBrokerProperties,
kafkaLogsDir
)

val configWithUsedPorts = EmbeddedKafkaConfig(
EmbeddedKafka.kafkaPort(broker),
actualZkPort,
EmbeddedKafka.controllerPort(controller),
config.customBrokerProperties,
config.customProducerProperties,
config.customConsumerProperties
@@ -43,8 +42,13 @@ trait EmbeddedKafka
try {
body(configWithUsedPorts)
} finally {
// In combined mode, we want to shut down the broker first, since the controller may be
// needed for controlled shutdown. Additionally, the controller shutdown process currently
// stops the raft client early on, which would disrupt broker shutdown.
broker.shutdown()
controller.shutdown()
broker.awaitShutdown()
controller.awaitShutdown()
}
}
}
@@ -53,21 +57,8 @@ object EmbeddedKafka
extends EmbeddedKafka
with RunningEmbeddedKafkaOps[EmbeddedKafkaConfig, EmbeddedK] {
override def start()(implicit config: EmbeddedKafkaConfig): EmbeddedK = {
val zkLogsDir = Files.createTempDirectory("zookeeper-logs")
val kafkaLogsDir = Files.createTempDirectory("kafka-logs")

val factory =
EmbeddedZ(startZooKeeper(config.zooKeeperPort, zkLogsDir), zkLogsDir)

val configWithUsedPorts = EmbeddedKafkaConfig(
config.kafkaPort,
zookeeperPort(factory),
config.customBrokerProperties,
config.customProducerProperties,
config.customConsumerProperties
)

startKafka(kafkaLogsDir, Option(factory))(configWithUsedPorts)
startKafka(kafkaLogsDir)(config)
}

override def isRunning: Boolean =
@@ -77,50 +68,45 @@ }
}

private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] {
this: ZooKeeperOps with KafkaOps =>
this: KafkaOps =>

/**
* Starts a Kafka broker (and performs additional logic, if any), then
* executes the body passed as a parameter.
*
* @param config
* the user-defined [[EmbeddedKafkaConfig]]
* @param actualZkPort
* the actual ZooKeeper port
* @param kafkaLogsDir
* the path for the Kafka logs
* @param body
* the function to execute
*/
private[embeddedkafka] def withRunningServers[T](
config: C,
actualZkPort: Int,
kafkaLogsDir: Path
)(body: C => T): T

/**
* Starts a ZooKeeper instance and a Kafka broker (and performs additional
* logic, if any), then executes the body passed as a parameter.
* Starts a Kafka broker and controller (and performs additional logic, if
* any), then executes the body passed as a parameter.
*
* @param body
* the function to execute
* @param config
* an implicit [[EmbeddedKafkaConfig]]
*/
def withRunningKafka[T](body: => T)(implicit config: C): T = {
withRunningZooKeeper(config.zooKeeperPort) { actualZkPort =>
withTempDir("kafka") { kafkaLogsDir =>
withRunningServers(config, actualZkPort, kafkaLogsDir)(_ => body)
}
withTempDir("kafka") { kafkaLogsDir =>
withRunningServers(config, kafkaLogsDir)(_ => body)
}
}

/**
* Starts a ZooKeeper instance and a Kafka broker (and performs additional
* logic, if any), then executes the body passed as a parameter. The actual
* ports of the servers will be detected and inserted into a copied version
* of the [[EmbeddedKafkaConfig]] that gets passed to body. This is useful if
* you set any port to `0`, which will listen on an arbitrary available port.
* Starts a Kafka broker and controller (and performs additional logic, if
* any), then executes the body passed as a parameter. The actual ports of
* the servers will be detected and inserted into a copied version of the
* [[EmbeddedKafkaConfig]] that gets passed to body. This is useful if you
* set any port to `0`, which will listen on an arbitrary available port.
*
* @param config
* the user-defined [[EmbeddedKafkaConfig]]
@@ -129,23 +115,8 @@ private[embeddedkafka] trait EmbeddedKafkaSupport[C <: EmbeddedKafkaConfig] {
* actual ports the servers are running on
*/
def withRunningKafkaOnFoundPort[T](config: C)(body: C => T): T = {
withRunningZooKeeper(config.zooKeeperPort) { actualZkPort =>
withTempDir("kafka") { kafkaLogsDir =>
withRunningServers(config, actualZkPort, kafkaLogsDir)(body)
}
}
}

private[embeddedkafka] def withRunningZooKeeper[T](
port: Int
)(body: Int => T): T = {
withTempDir("zookeeper-logs") { zkLogsDir =>
val factory = startZooKeeper(port, zkLogsDir)
try {
body(factory.getLocalPort)
} finally {
factory.shutdown()
}
withTempDir("kafka") { kafkaLogsDir =>
withRunningServers(config, kafkaLogsDir)(body)
}
}
