@@ -66,26 +66,26 @@ Dataset<Row> df = spark
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 // Subscribe to multiple topics
 Dataset<Row> df = spark
   .readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribe", "topic1,topic2")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 // Subscribe to a pattern
 Dataset<Row> df = spark
   .readStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("subscribePattern", "topic.*")
-  .load()
-df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
 
 {% endhighlight %}
 </div>
@@ -479,15 +479,15 @@ StreamingQuery ds = df
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
-  .start()
+  .start();
 
 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
 StreamingQuery ds = df
   .selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
   .writeStream()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .start()
+  .start();
 
 {% endhighlight %}
 </div>
@@ -547,14 +547,14 @@ df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
   .option("topic", "topic1")
-  .save()
+  .save();
 
 // Write key-value data from a DataFrame to Kafka using a topic specified in the data
 df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
   .write()
   .format("kafka")
   .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
-  .save()
+  .save();
 
 {% endhighlight %}
 </div>
@@ -624,3 +624,199 @@ For experimenting on `spark-shell`, you can also use `--packages` to add `spark-
 
 See [Application Submission Guide](submitting-applications.html) for more details about submitting
 applications with external dependencies.
+
+## Security
+
+Kafka 0.9.0.0 introduced several features that increase security in a cluster. For a detailed
+description of these possibilities, see [Kafka security docs](http://kafka.apache.org/documentation.html#security).
+
+It's worth noting that security is optional and turned off by default.
+
+Spark supports the following ways to authenticate against a Kafka cluster:
+- **Delegation token (introduced in Kafka broker 1.1.0)**
+- **JAAS login configuration**
+
+### Delegation token
+
+This way the application can be configured via Spark parameters and may not need a JAAS login
+configuration (Spark can use Kafka's dynamic JAAS configuration feature). For further information
+about delegation tokens, see [Kafka delegation token docs](http://kafka.apache.org/documentation/#security_delegation_token).
+
+The process is initiated by Spark's Kafka delegation token provider. When `spark.kafka.bootstrap.servers` is set,
+Spark considers the following login options, in order of preference:
+- **JAAS login configuration**
+- **Keytab file**, such as,
+
+      ./bin/spark-submit \
+          --keytab <KEYTAB_FILE> \
+          --principal <PRINCIPAL> \
+          --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+          ...
+
+- **Kerberos credential cache**, such as,
+
+      ./bin/spark-submit \
+          --conf spark.kafka.bootstrap.servers=<KAFKA_SERVERS> \
+          ...
+
+The Kafka delegation token provider can be turned off by setting `spark.security.credentials.kafka.enabled` to `false` (default: `true`).
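+
+For example, the provider can be disabled at submission time (a minimal sketch; the trailing
+dots stand in for the rest of an otherwise unchanged command):
+
+    ./bin/spark-submit \
+        --conf spark.security.credentials.kafka.enabled=false \
+        ...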
+
+Spark can be configured to use the following authentication protocols to obtain the token (it must match the
+Kafka broker configuration):
+- **SASL SSL (default)**
+- **SSL**
+- **SASL PLAINTEXT (for testing)**
+
+After obtaining the delegation token successfully, Spark distributes it across nodes and renews it accordingly.
+The delegation token uses the `SCRAM` login module for authentication, and because of that the appropriate
+`sasl.mechanism` has to be configured on the source/sink (it must match the Kafka broker configuration):
+
+<div class="codetabs">
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+// Setting on Kafka Source for Streaming Queries
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+// Setting on Kafka Source for Batch Queries
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .as[(String, String)]
+
+// Setting on Kafka Sink for Streaming Queries
+val ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .start()
+
+// Setting on Kafka Sink for Batch Queries
+df.selectExpr("topic", "CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .save()
+
+{% endhighlight %}
+</div>
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+// Setting on Kafka Source for Streaming Queries
+Dataset<Row> df = spark
+  .readStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Setting on Kafka Source for Batch Queries
+Dataset<Row> df = spark
+  .read()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("subscribe", "topic1")
+  .load();
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)");
+
+// Setting on Kafka Sink for Streaming Queries
+StreamingQuery ds = df
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .writeStream()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .start();
+
+// Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+  .write()
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512")
+  .option("topic", "topic1")
+  .save();
+
+{% endhighlight %}
+</div>
+<div data-lang="python" markdown="1">
+{% highlight python %}
+
+# Setting on Kafka Source for Streaming Queries
+df = spark \
+  .readStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("subscribe", "topic1") \
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Setting on Kafka Source for Batch Queries
+df = spark \
+  .read \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("subscribe", "topic1") \
+  .load()
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+
+# Setting on Kafka Sink for Streaming Queries
+ds = df \
+  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .writeStream \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("topic", "topic1") \
+  .start()
+
+# Setting on Kafka Sink for Batch Queries
+df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
+  .write \
+  .format("kafka") \
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
+  .option("kafka.sasl.mechanism", "SCRAM-SHA-512") \
+  .option("topic", "topic1") \
+  .save()
+
+{% endhighlight %}
+</div>
+</div>
+
+When a delegation token is available on an executor, it can be overridden with a JAAS login configuration.
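+
+For illustration, one way such an override can be expressed is through Kafka's `sasl.jaas.config`
+client property, passed with the `kafka.` prefix like the other options above. This is a minimal
+sketch, not prescribed by these docs; the login module, keytab path, and principal are placeholder
+assumptions:
+
+{% highlight scala %}
+// Sketch: supply an inline JAAS configuration for this source via Kafka's
+// sasl.jaas.config client property. The login module, keytab path, and
+// principal below are illustrative placeholders only.
+val df = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", "host1:port1,host2:port2")
+  .option("kafka.sasl.jaas.config",
+    "com.sun.security.auth.module.Krb5LoginModule required " +
+    "useKeyTab=true keyTab=\"/path/to/kafka_client.keytab\" " +
+    "principal=\"client@EXAMPLE.COM\";")
+  .option("subscribe", "topic1")
+  .load()
+{% endhighlight %}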
+
+### JAAS login configuration
+
+A JAAS login configuration must be placed on all nodes where Spark tries to access the Kafka cluster.
+This makes it possible to apply any custom authentication logic, at a higher maintenance cost.
+It can be done in several ways. One possibility is to provide additional JVM parameters, such as,
+
+    ./bin/spark-submit \
+        --driver-java-options "-Djava.security.auth.login.config=/path/to/custom_jaas.conf" \
+        --conf spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/custom_jaas.conf \
+        ...
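+
+For reference, such a `custom_jaas.conf` could look roughly like the following (a minimal sketch
+assuming Kerberos authentication with a keytab; `<KEYTAB_FILE>` and `<PRINCIPAL>` are placeholders,
+as in the spark-submit examples above):
+
+    KafkaClient {
+      com.sun.security.auth.module.Krb5LoginModule required
+      useKeyTab=true
+      storeKey=true
+      keyTab="<KEYTAB_FILE>"
+      principal="<PRINCIPAL>";
+    };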