
Commit affc312

Cleanup the connectors-overview and Kafka-connector sections. Use System/StreamDescriptors
Author: Jagadish <[email protected]> Reviewers: Jagadish <[email protected]> Closes apache#778 from vjagadish1989/website-reorg26
1 parent 45f2558 commit affc312

2 files changed, +112 -94 lines changed

docs/learn/documentation/versioned/connectors/kafka.md

Lines changed: 96 additions & 72 deletions

### Kafka I/O: QuickStart
Samza offers built-in integration with Apache Kafka for stream processing. A common pattern in Samza applications is to read messages from one or more Kafka topics, process them and emit results to other Kafka topics or databases.

The `hello-samza` project includes multiple examples of interacting with Kafka from your Samza jobs. Each example also includes instructions on how to run it and view the results.

- [High-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/cookbook/FilterExample.java) with a corresponding [tutorial](/learn/documentation/{{site.version}}/deployment/yarn.html#starting-your-application-on-yarn)

- [Low-level API Example](https://github.com/apache/samza-hello-samza/blob/latest/src/main/java/samza/examples/wikipedia/task/application/WikipediaParserTaskApplication.java) with a corresponding [tutorial](https://github.com/apache/samza-hello-samza#hello-samza)

### Concepts

#### KafkaSystemDescriptor

Samza refers to any I/O source (e.g., Kafka) it interacts with as a _system_, whose properties are set using a corresponding `SystemDescriptor`. The `KafkaSystemDescriptor` allows you to describe the Kafka cluster you are interacting with and specify its properties.

{% highlight java %}
KafkaSystemDescriptor kafkaSystemDescriptor =
    new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
        .withDefaultStreamConfigs(KAFKA_DEFAULT_STREAM_CONFIGS);
{% endhighlight %}

#### KafkaInputDescriptor

A Kafka cluster usually has multiple topics (a.k.a. _streams_). The `KafkaInputDescriptor` allows you to specify the properties of each Kafka topic your application should read from. For each of your input topics, you should create a corresponding instance of `KafkaInputDescriptor` by providing a topic name and a serializer.

{% highlight java %}
KafkaInputDescriptor<PageView> pageViewStreamDescriptor =
    kafkaSystemDescriptor.getInputDescriptor("page-view-topic", new JsonSerdeV2<>(PageView.class));
{% endhighlight %}

The above example describes an input Kafka stream from the "page-view-topic", which Samza deserializes into a JSON payload. Samza provides default serializers for common data types like string, avro, bytes, and integer.
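
For instance, a topic carrying plain string payloads could be described with one of these built-in serializers, such as `StringSerde` (a minimal sketch; the topic name `click-events` is hypothetical):

{% highlight java %}
KafkaInputDescriptor<String> clickStreamDescriptor =
    kafkaSystemDescriptor.getInputDescriptor("click-events", new StringSerde());
{% endhighlight %}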

#### KafkaOutputDescriptor

Similarly, the `KafkaOutputDescriptor` allows you to specify the output streams for your application. For each output topic you write to, you should create an instance of `KafkaOutputDescriptor`.

{% highlight java %}
KafkaOutputDescriptor<DecoratedPageView> decoratedPageView =
    kafkaSystemDescriptor.getOutputDescriptor("my-output-topic", new JsonSerdeV2<>(DecoratedPageView.class));
{% endhighlight %}

### Configuration

#### Configuring the Kafka producer and consumer

The `KafkaSystemDescriptor` allows you to specify any [Kafka producer](https://kafka.apache.org/documentation/#producerconfigs) or [Kafka consumer](https://kafka.apache.org/documentation/#consumerconfigs) property; these are passed directly to the underlying Kafka client, allowing precise control over the KafkaProducer and KafkaConsumer used by Samza.

{% highlight java %}
KafkaSystemDescriptor kafkaSystemDescriptor =
    new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(..)
        .withProducerBootstrapServers(..)
        .withConsumerConfigs(..)
        .withProducerConfigs(..);
{% endhighlight %}
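
For example, the client properties can be supplied as maps (a sketch, assuming `withConsumerConfigs` and `withProducerConfigs` accept a `Map<String, String>` of Kafka client properties; the property values here are illustrative):

{% highlight java %}
Map<String, String> consumerConfigs = new HashMap<>();
consumerConfigs.put("security.protocol", "SSL");            // encrypt traffic between consumer and brokers
consumerConfigs.put("max.partition.fetch.bytes", "524288"); // fetch up to 512KB per partition

Map<String, String> producerConfigs = new HashMap<>();
producerConfigs.put("acks", "all");                         // wait for all in-sync replicas to acknowledge
producerConfigs.put("compression.type", "lz4");             // compress record batches before sending

KafkaSystemDescriptor kafkaSystemDescriptor =
    new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS)
        .withConsumerConfigs(consumerConfigs)
        .withProducerConfigs(producerConfigs);
{% endhighlight %}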

#### Accessing an offset which is out-of-range

The `withConsumerAutoOffsetReset` setting determines the consumer's behavior when it attempts to read an offset that is outside the current valid range maintained by the broker. This could happen if the topic does not exist, or if a checkpoint is older than the maximum message history retained by the brokers.

{% highlight java %}
KafkaSystemDescriptor kafkaSystemDescriptor =
    new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(..)
        .withProducerBootstrapServers(..)
        .withConsumerAutoOffsetReset("largest");
{% endhighlight %}

#### Ignoring checkpointed offsets

Samza periodically persists the last processed Kafka offsets as a part of its checkpoint. During startup, Samza resumes consumption from the previously checkpointed offsets by default. You can override this behavior and configure Samza to ignore checkpoints with `KafkaInputDescriptor#shouldResetOffset()`. When there are no checkpoints for a stream, `#withOffsetDefault(..)` determines whether to start consumption from the oldest or the newest offset.

{% highlight java %}
KafkaInputDescriptor<PageView> pageViewStreamDescriptor =
    kafkaSystemDescriptor.getInputDescriptor("page-view-topic", new JsonSerdeV2<>(PageView.class))
        .shouldResetOffset()
        .withOffsetDefault(OffsetType.OLDEST);
{% endhighlight %}

The above example configures Samza to ignore checkpointed offsets for `page-view-topic` and consume from the oldest available offset during startup. You can configure this behavior to apply to all topics in the Kafka cluster with `KafkaSystemDescriptor#withDefaultStreamOffsetDefault`.
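
For instance, the following sketch (assuming `withDefaultStreamOffsetDefault` accepts the same `OffsetType` used above) starts every topic without a checkpoint from its oldest available offset:

{% highlight java %}
KafkaSystemDescriptor kafkaSystemDescriptor =
    new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(..)
        .withProducerBootstrapServers(..)
        .withDefaultStreamOffsetDefault(OffsetType.OLDEST);
{% endhighlight %}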

### Code walkthrough

In this section, we walk through a complete example that reads page views from a Kafka topic, filters out records with an invalid user id, and writes the results to another topic.

#### High-level API

{% highlight java %}
// Define the coordinates of the Kafka cluster using the KafkaSystemDescriptor
1  KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
2      .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
3      .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS);

// Create a KafkaInputDescriptor for the input topic and a KafkaOutputDescriptor for the output topic
4  KVSerde<String, PageView> serde = KVSerde.of(new StringSerde(), new JsonSerdeV2<>(PageView.class));
5  KafkaInputDescriptor<KV<String, PageView>> inputDescriptor =
6      kafkaSystemDescriptor.getInputDescriptor("page-views", serde);
7  KafkaOutputDescriptor<KV<String, PageView>> outputDescriptor =
8      kafkaSystemDescriptor.getOutputDescriptor("filtered-page-views", serde);

// Obtain a message stream for the input topic
9  MessageStream<KV<String, PageView>> pageViews = appDescriptor.getInputStream(inputDescriptor);

// Obtain an output stream for the output topic
10 OutputStream<KV<String, PageView>> filteredPageViews = appDescriptor.getOutputStream(outputDescriptor);

// Write the filtered results to the output topic
11 pageViews
12     .filter(kv -> !INVALID_USER_ID.equals(kv.value.userId))
13     .sendTo(filteredPageViews);
{% endhighlight %}
- Lines 1-3 create a KafkaSystemDescriptor defining the coordinates of our Kafka cluster

- Lines 4-6 define a KVSerde and a KafkaInputDescriptor for our input topic, `page-views`

- Lines 7-8 define a KafkaOutputDescriptor for our output topic, `filtered-page-views`

- Line 9 creates a MessageStream for the input topic so that you can chain operations on it later

- Line 10 creates an OutputStream for the output topic

- Lines 11-13 define a simple pipeline that reads from the input stream and writes filtered results to the output stream
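
#### Low-level API

The same topics can be wired up with the Low-level API. Below is a sketch of a `TaskApplication`, adapted from the task-based pattern in the Samza docs; `MyStreamTask` is a placeholder for your own `StreamTask` implementation:

{% highlight java %}
public class PageViewFilterTask implements TaskApplication {
  @Override
  public void describe(TaskApplicationDescriptor appDescriptor) {
    // Describe the Kafka cluster and the input/output topics
    KafkaSystemDescriptor kafkaSystemDescriptor = new KafkaSystemDescriptor("kafka")
        .withConsumerZkConnect(KAFKA_CONSUMER_ZK_CONNECT)
        .withProducerBootstrapServers(KAFKA_PRODUCER_BOOTSTRAP_SERVERS);
    KafkaInputDescriptor<String> inputDescriptor =
        kafkaSystemDescriptor.getInputDescriptor("page-views", new StringSerde());
    KafkaOutputDescriptor<String> outputDescriptor =
        kafkaSystemDescriptor.getOutputDescriptor("filtered-page-views", new StringSerde());

    // Register the streams and the task factory that will process them
    appDescriptor.addInputStream(inputDescriptor);
    appDescriptor.addOutputStream(outputDescriptor);
    appDescriptor.withTaskFactory((StreamTaskFactory) () -> new MyStreamTask());
  }
}
{% endhighlight %}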

docs/learn/documentation/versioned/connectors/overview.md

Lines changed: 16 additions & 22 deletions

Stream processing applications often read data from external sources like Kafka or HDFS. Likewise, they require processed results to be written to external systems or data stores. Samza is pluggable and designed to support a variety of [producers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemProducer.html) and [consumers](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemConsumer.html) for your data. You can integrate Samza with any streaming system by implementing the [SystemFactory](/learn/documentation/{{site.version}}/api/javadocs/org/apache/samza/system/SystemFactory.html) interface.
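
Implementing a custom integration amounts to providing the factory's three methods (a skeleton sketch; `MySystemConsumer`, `MySystemProducer`, and `MySystemAdmin` are hypothetical classes you would implement):

{% highlight java %}
public class MySystemFactory implements SystemFactory {
  @Override
  public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry registry) {
    // Return a SystemConsumer that polls messages from your system
    return new MySystemConsumer(systemName, config, registry);
  }

  @Override
  public SystemProducer getProducer(String systemName, Config config, MetricsRegistry registry) {
    // Return a SystemProducer that writes messages to your system
    return new MySystemProducer(systemName, config, registry);
  }

  @Override
  public SystemAdmin getAdmin(String systemName, Config config) {
    // Return a SystemAdmin that exposes stream and partition metadata
    return new MySystemAdmin(systemName, config);
  }
}
{% endhighlight %}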

The following integrations are supported out-of-the-box:

Consumers:

- [Apache Kafka](kafka)

- [Microsoft Azure Eventhubs](eventhubs)

- [Amazon AWS Kinesis Streams](kinesis)

- [Hadoop Filesystem](hdfs)

Producers:

- [Apache Kafka](kafka)

- [Microsoft Azure Eventhubs](eventhubs)

- [Hadoop Filesystem](hdfs)

- [Elasticsearch](https://github.com/apache/samza/blob/master/samza-elasticsearch/src/main/java/org/apache/samza/system/elasticsearch/ElasticsearchSystemProducer.java)
