diff --git a/docs/structured-streaming-kafka-integration.md b/docs/structured-streaming-kafka-integration.md
index a549ce2a6a05f..7040f8da2c614 100644
--- a/docs/structured-streaming-kafka-integration.md
+++ b/docs/structured-streaming-kafka-integration.md
@@ -66,8 +66,8 @@ Dataset<Row> df = spark
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 // Subscribe to multiple topics
 Dataset<Row> df = spark
@@ -75,8 +75,8 @@ Dataset<Row> df = spark
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 // Subscribe to a pattern
 Dataset<Row> df = spark
@@ -84,8 +84,8 @@ Dataset<Row> df = spark
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribePattern", "topic.*")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 {% endhighlight %}
 
@@ -479,7 +479,7 @@ StreamingQuery ds = df
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
-  .start()
+  .start();
 
 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
 StreamingQuery ds = df
@@ -487,7 +487,7 @@ StreamingQuery ds = df
   .writeStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .start()
+  .start();
 
 {% endhighlight %}
 
@@ -547,14 +547,14 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
-  .save()
+  .save();
 
 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
 df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
   .write()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .save()
+  .save();
 
 {% endhighlight %}
 
@@ -624,3 +624,199 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increase security in a cluster. For a detailed
+description of these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against a Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**
+- **JAAS login configuration**
+
+### Delegation token
+
+This way the application can be configured via Spark parameters and may not need a JAAS login
+configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
+about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
+Spark considers the following login options, in order of preference:
+- **JAAS login configuration**
+- **Keytab file**, such as,
+
+      ./bin/spark-submit \
+          --keytab <KEYTAB_FILE> \
+          --principal <PRINCIPAL> \
+          --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+          ...
+
+- **Kerberos credential cache**, such as,
+
+      ./bin/spark-submit \
+          --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+          ...
+
+The Kafka delegation token provider can be turned off by setting
+`spark.security.credentials.kafka.enabled` to `false` (default: `true`), for example with
+`--conf spark.security.credentials.kafka.enabled=false` on `spark-submit`.
+
+Spark can be configured to use the following authentication protocols to obtain the token (the
+protocol must match the Kafka broker configuration):
+- **SASL SSL (default)**
+- **SSL**
+- **SASL PLAINTEXT (for testing)**
+
+After obtaining the delegation token successfully, Spark distributes it across nodes and renews it
+accordingly. Delegation token authentication uses the `SCRAM` login module, so the appropriate
+`sasl.mechanism` has to be configured on the source/sink (it must match the Kafka broker
+configuration):
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// Setting on Kafka Source for Streaming Queries
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+// Setting on Kafka Source for Batch Queries
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+// Setting on Kafka Sink for Streaming Queries
+val ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .start()
+
+// Setting on Kafka Sink for Batch Queries
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .save()
+
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// Setting on Kafka Source for Streaming Queries
+Dataset<Row> df = spark
+  .readStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Setting on Kafka Source for Batch Queries
+Dataset<Row> df = spark
+  .read()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Setting on Kafka Sink for Streaming Queries
+StreamingQuery ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .start();
+
+// Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .save();
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Setting on Kafka Source for Streaming Queries
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("subscribe", "topic1") \
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Setting on Kafka Source for Batch Queries
+df = spark \
+  .read \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("subscribe", "topic1") \
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Setting on Kafka Sink for Streaming Queries
+ds = df \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("topic", "topic1") \
+  .start()
+
+# Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("topic", "topic1") \
+  .save()
+
+{% endhighlight %}
+</div>
+</div>
+
+When a delegation token is available on an executor, it can be overridden with a JAAS login
+configuration.
+
+### JAAS login configuration
+
+A JAAS login configuration must be placed on all nodes where Spark tries to access the Kafka
+cluster. This makes it possible to apply custom authentication logic, at a higher maintenance
+cost. This can be done in several ways. One possibility is to provide additional JVM parameters,
+such as,
+
+    ./bin/spark-submit \
+        --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
+        --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
+        ...
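+
+As a minimal sketch, assuming Kerberos authentication: such a `custom_jaas.conf` might look like
+the following, where `KafkaClient` is the Kafka client's default login context name, and the
+keytab path and principal are placeholders to adapt,
+
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="/path/to/kafka_client.keytab"
+      principal="client@EXAMPLE.COM"
+      serviceName="kafka";
+    };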
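+
+Another possibility is Kafka's dynamic JAAS configuration: since options given with the `kafka.`
+prefix are passed through to the Kafka client, the JAAS entry can be supplied per query via the
+`kafka.sasl.jaas.config` option instead of a file. A minimal Scala sketch, assuming a
+SCRAM-enabled broker; the username and password are placeholders:
+
+{% highlight scala %}
+// Supplying the JAAS configuration inline through Kafka's sasl.jaas.config
+// client property; no JAAS file is needed on the nodes in this case.
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.security.protocol", "SASL_SSL")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("kafka.sasl.jaas.config",
+    """org.apache.kafka.common.security.scram.ScramLoginModule required
+      |username="user" password="password";""".stripMargin)
+  .option("subscribe", "topic1")
+  .load()
+{% endhighlight %}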