This repository was archived by the owner on Nov 11, 2022. It is now read-only.
Merged
39 changes: 39 additions & 0 deletions contrib/kafka/README.md
@@ -0,0 +1,39 @@
# KafkaIO : Dataflow Unbounded Source and Sink for Kafka Topics

KafkaIO provides an unbounded source and sink for [Kafka](http://kafka.apache.org/)
topics. Kafka versions 0.9 and above are supported.

## Basic Usage

* Read from a topic with 8-byte long keys and string values:
```java
  PCollection<KV<Long, String>> kafkaRecords =
    pipeline
      .apply(KafkaIO.read()
        .withBootstrapServers("broker_1:9092,broker_2:9092")
        .withTopics(ImmutableList.of("topic_a"))
        .withKeyCoder(BigEndianLongCoder.of())
        .withValueCoder(StringUtf8Coder.of())
        .withoutMetadata()
      );
```

* Write the same PCollection to a Kafka topic:
```java
  kafkaRecords.apply(KafkaIO.write()
    .withBootstrapServers("broker_1:9092,broker_2:9092")
    .withTopic("results")
    .withKeyCoder(BigEndianLongCoder.of())
    .withValueCoder(StringUtf8Coder.of())
  );
```

Review comment (Contributor): missing last line?

Reply (Author): Done.
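
The read example above expects each record key to be an 8-byte long. As a rough illustration of that layout, the sketch below encodes and decodes a long as 8 big-endian bytes using only the JDK. It assumes, going by the coder's name, that `BigEndianLongCoder` expects this fixed-width big-endian form; `LongKeyBytes` is a hypothetical helper and is not part of KafkaIO.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Illustration only: LongKeyBytes is a hypothetical helper, not part of KafkaIO.
final class LongKeyBytes {

  // Encodes a long as 8 bytes, most significant byte first (big-endian).
  static byte[] encode(long key) {
    // ByteBuffer uses big-endian byte order by default.
    return ByteBuffer.allocate(Long.BYTES).putLong(key).array();
  }

  // Decodes an 8-byte big-endian array back into a long.
  static long decode(byte[] bytes) {
    if (bytes.length != Long.BYTES) {
      throw new IllegalArgumentException("expected 8 bytes, got " + bytes.length);
    }
    return ByteBuffer.wrap(bytes).getLong();
  }

  public static void main(String[] args) {
    byte[] encoded = encode(258L); // 258 = 0x0102
    System.out.println(Arrays.toString(encoded)); // prints [0, 0, 0, 0, 0, 0, 1, 2]
    System.out.println(decode(encoded)); // prints 258
  }
}
```

A key produced in any other form (for example, a decimal string) would not have this 8-byte shape, so the producer and the key coder need to agree on the encoding.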

Please see the JavaDoc for KafkaIO in
[KafkaIO.java](https://github.com/GoogleCloudPlatform/DataflowJavaSDK/blob/master/contrib/kafka/src/main/java/com/google/cloud/dataflow/contrib/kafka/KafkaIO.java#L100)
for complete documentation and a more descriptive usage example.

## Release Notes
* **0.2.0** : Assign one split to each Kafka topic partition. This makes a Dataflow
[Update](https://cloud.google.com/dataflow/pipelines/updating-a-pipeline)
from the previous version incompatible.
* **0.1.0** : KafkaIO with support for Unbounded Source and Sink.
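
The 0.2.0 note describes a one-to-one mapping between partitions and splits. A minimal sketch of that idea, using plain integers for partition ids, might look like the following. The class and method names are hypothetical and do not mirror KafkaIO internals, and the sketch says nothing about how earlier versions grouped partitions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustration only: these names are hypothetical and do not mirror KafkaIO internals.
final class SplitSketch {

  // Returns one split per partition id; each split holds exactly one partition.
  static List<List<Integer>> onePerPartition(List<Integer> partitionIds) {
    List<List<Integer>> splits = new ArrayList<>();
    for (Integer id : partitionIds) {
      List<Integer> split = new ArrayList<>();
      split.add(id);
      splits.add(split);
    }
    return splits;
  }

  public static void main(String[] args) {
    List<Integer> partitions = Arrays.asList(0, 1, 2, 3);
    System.out.println(onePerPartition(partitions)); // prints [[0], [1], [2], [3]]
  }
}
```

Under this mapping the number of splits always equals the number of partitions.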
2 changes: 1 addition & 1 deletion contrib/kafka/pom.xml
@@ -25,7 +25,7 @@
<artifactId>google-cloud-dataflow-java-contrib-kafka</artifactId>
<name>Google Cloud Dataflow Kafka IO Library</name>
<description>Library to read Kafka topics.</description>
-<version>0.1.0-SNAPSHOT</version>
+<version>0.2.0-SNAPSHOT</version>

<properties>
<dataflow.version>[1.6.0, 2.0.0)</dataflow.version>