1717
1818package org .apache .spark .streaming .kafka010
1919
20- import java .{ util => ju }
20+ import java .{ lang => jl , util => ju }
2121
2222import scala .collection .JavaConverters ._
2323
@@ -30,15 +30,16 @@ import org.apache.spark.annotation.Experimental
3030/**
3131 * :: Experimental ::
3232 * Choice of how to create and configure underlying Kafka Consumers on driver and executors.
33+ * See [[ConsumerStrategies]] to obtain instances.
3334 * Kafka 0.10 consumers can require additional, sometimes complex, setup after object
3435 * instantiation. This interface encapsulates that process, and allows it to be checkpointed.
3536 * @tparam K type of Kafka message key
3637 * @tparam V type of Kafka message value
3738 */
3839@ Experimental
39- trait ConsumerStrategy [K , V ] {
40+ abstract class ConsumerStrategy [K , V ] {
4041 /**
41- * Kafka <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
42+ * Kafka <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
4243 * configuration parameters</a> to be used on executors. Requires "bootstrap.servers" to be set
4344 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
4445 */
@@ -51,15 +52,14 @@ trait ConsumerStrategy[K, V] {
5152 * has successfully read. Will be empty on initial start, possibly non-empty on restart from
5253 * checkpoint.
5354 */
54- def onStart (currentOffsets : Map [TopicPartition , Long ]): Consumer [K , V ]
55+ def onStart (currentOffsets : ju. Map [TopicPartition , jl. Long ]): Consumer [K , V ]
5556}
5657
5758/**
58- * :: Experimental ::
5959 * Subscribe to a collection of topics.
6060 * @param topics collection of topics to subscribe
6161 * @param kafkaParams Kafka
62- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
62+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
6363 * configuration parameters</a> to be used on driver. The same params will be used on executors,
6464 * with minor automatic modifications applied.
6565 * Requires "bootstrap.servers" to be set
@@ -68,16 +68,15 @@ trait ConsumerStrategy[K, V] {
6868 * TopicPartition, the committed offset (if applicable) or kafka param
6969 * auto.offset.reset will be used.
7070 */
71- @ Experimental
72- case class Subscribe [K , V ] private (
73- topics : ju.Collection [java.lang.String ],
71+ private case class Subscribe [K , V ](
72+ topics : ju.Collection [jl.String ],
7473 kafkaParams : ju.Map [String , Object ],
75- offsets : ju.Map [TopicPartition , Long ]
74+ offsets : ju.Map [TopicPartition , jl. Long ]
7675 ) extends ConsumerStrategy [K , V ] {
7776
7877 def executorKafkaParams : ju.Map [String , Object ] = kafkaParams
7978
80- def onStart (currentOffsets : Map [TopicPartition , Long ]): Consumer [K , V ] = {
79+ def onStart (currentOffsets : ju. Map [TopicPartition , jl. Long ]): Consumer [K , V ] = {
8180 val consumer = new KafkaConsumer [K , V ](kafkaParams)
8281 consumer.subscribe(topics)
8382 if (currentOffsets.isEmpty) {
@@ -90,18 +89,52 @@ case class Subscribe[K, V] private(
9089 }
9190}
9291
92+ /**
93+ * Assign a fixed collection of TopicPartitions
94+ * @param topicPartitions collection of TopicPartitions to assign
95+ * @param kafkaParams Kafka
96+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
97+ * configuration parameters</a> to be used on driver. The same params will be used on executors,
98+ * with minor automatic modifications applied.
99+ * Requires "bootstrap.servers" to be set
100+ * with Kafka broker(s) specified in host1:port1,host2:port2 form.
101+ * @param offsets: offsets to begin at on initial startup. If no offset is given for a
102+ * TopicPartition, the committed offset (if applicable) or kafka param
103+ * auto.offset.reset will be used.
104+ */
105+ private case class Assign [K , V ](
106+ topicPartitions : ju.Collection [TopicPartition ],
107+ kafkaParams : ju.Map [String , Object ],
108+ offsets : ju.Map [TopicPartition , jl.Long ]
109+ ) extends ConsumerStrategy [K , V ] {
110+
111+ def executorKafkaParams : ju.Map [String , Object ] = kafkaParams
112+
113+ def onStart (currentOffsets : ju.Map [TopicPartition , jl.Long ]): Consumer [K , V ] = {
114+ val consumer = new KafkaConsumer [K , V ](kafkaParams)
115+ consumer.assign(topicPartitions)
116+ if (currentOffsets.isEmpty) {
117+ offsets.asScala.foreach { case (topicPartition, offset) =>
118+ consumer.seek(topicPartition, offset)
119+ }
120+ }
121+
122+ consumer
123+ }
124+ }
125+
93126/**
94127 * :: Experimental ::
95- * Companion object for creating [[ Subscribe ]] strategy
128+ * object for obtaining instances of [[ConsumerStrategy]]
96129 */
97130@ Experimental
98- object Subscribe {
131+ object ConsumerStrategies {
99132 /**
100133 * :: Experimental ::
101134 * Subscribe to a collection of topics.
102135 * @param topics collection of topics to subscribe
103136 * @param kafkaParams Kafka
104- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
137+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
105138 * configuration parameters</a> to be used on driver. The same params will be used on executors,
106139 * with minor automatic modifications applied.
107140 * Requires "bootstrap.servers" to be set
@@ -111,43 +144,43 @@ object Subscribe {
111144 * auto.offset.reset will be used.
112145 */
113146 @ Experimental
114- def apply [K , V ](
115- topics : Iterable [java.lang .String ],
147+ def Subscribe [K , V ](
148+ topics : Iterable [jl .String ],
116149 kafkaParams : collection.Map [String , Object ],
117- offsets : collection.Map [TopicPartition , Long ]): Subscribe [K , V ] = {
118- Subscribe [K , V ](
150+ offsets : collection.Map [TopicPartition , Long ]): ConsumerStrategy [K , V ] = {
151+ new Subscribe [K , V ](
119152 new ju.ArrayList (topics.asJavaCollection),
120153 new ju.HashMap [String , Object ](kafkaParams.asJava),
121- new ju.HashMap [TopicPartition , Long ](offsets.asJava))
154+ new ju.HashMap [TopicPartition , jl. Long ](offsets.mapValues(l => new jl. Long (l)) .asJava))
122155 }
123156
124157 /**
125158 * :: Experimental ::
126159 * Subscribe to a collection of topics.
127160 * @param topics collection of topics to subscribe
128161 * @param kafkaParams Kafka
129- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
162+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
130163 * configuration parameters</a> to be used on driver. The same params will be used on executors,
131164 * with minor automatic modifications applied.
132165 * Requires "bootstrap.servers" to be set
133166 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
134167 */
135168 @ Experimental
136- def apply [K , V ](
137- topics : Iterable [java.lang .String ],
138- kafkaParams : collection.Map [String , Object ]): Subscribe [K , V ] = {
139- Subscribe [K , V ](
169+ def Subscribe [K , V ](
170+ topics : Iterable [jl .String ],
171+ kafkaParams : collection.Map [String , Object ]): ConsumerStrategy [K , V ] = {
172+ new Subscribe [K , V ](
140173 new ju.ArrayList (topics.asJavaCollection),
141174 new ju.HashMap [String , Object ](kafkaParams.asJava),
142- ju.Collections .emptyMap[TopicPartition , Long ]())
175+ ju.Collections .emptyMap[TopicPartition , jl. Long ]())
143176 }
144177
145178 /**
146179 * :: Experimental ::
147180 * Subscribe to a collection of topics.
148181 * @param topics collection of topics to subscribe
149182 * @param kafkaParams Kafka
150- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
183+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
151184 * configuration parameters</a> to be used on driver. The same params will be used on executors,
152185 * with minor automatic modifications applied.
153186 * Requires "bootstrap.servers" to be set
@@ -157,81 +190,37 @@ object Subscribe {
157190 * auto.offset.reset will be used.
158191 */
159192 @ Experimental
160- def create [K , V ](
161- topics : ju.Collection [java.lang .String ],
193+ def Subscribe [K , V ](
194+ topics : ju.Collection [jl .String ],
162195 kafkaParams : ju.Map [String , Object ],
163- offsets : ju.Map [TopicPartition , Long ]): Subscribe [K , V ] = {
164- Subscribe [K , V ](topics, kafkaParams, offsets)
196+ offsets : ju.Map [TopicPartition , jl. Long ]): ConsumerStrategy [K , V ] = {
197+ new Subscribe [K , V ](topics, kafkaParams, offsets)
165198 }
166199
167200 /**
168201 * :: Experimental ::
169202 * Subscribe to a collection of topics.
170203 * @param topics collection of topics to subscribe
171204 * @param kafkaParams Kafka
172- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
205+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
173206 * configuration parameters</a> to be used on driver. The same params will be used on executors,
174207 * with minor automatic modifications applied.
175208 * Requires "bootstrap.servers" to be set
176209 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
177210 */
178211 @ Experimental
179- def create [K , V ](
180- topics : ju.Collection [java.lang.String ],
181- kafkaParams : ju.Map [String , Object ]): Subscribe [K , V ] = {
182- Subscribe [K , V ](topics, kafkaParams, ju.Collections .emptyMap[TopicPartition , Long ]())
183- }
184-
185- }
186-
187- /**
188- * :: Experimental ::
189- * Assign a fixed collection of TopicPartitions
190- * @param topicPartitions collection of TopicPartitions to assign
191- * @param kafkaParams Kafka
192- * <a href="http://kafka.apache.org/documentation.htmll#newconsumerconfigs">
193- * configuration parameters</a> to be used on driver. The same params will be used on executors,
194- * with minor automatic modifications applied.
195- * Requires "bootstrap.servers" to be set
196- * with Kafka broker(s) specified in host1:port1,host2:port2 form.
197- * @param offsets: offsets to begin at on initial startup. If no offset is given for a
198- * TopicPartition, the committed offset (if applicable) or kafka param
199- * auto.offset.reset will be used.
200- */
201- @ Experimental
202- case class Assign [K , V ] private (
203- topicPartitions : ju.Collection [TopicPartition ],
204- kafkaParams : ju.Map [String , Object ],
205- offsets : ju.Map [TopicPartition , Long ]
206- ) extends ConsumerStrategy [K , V ] {
207-
208- def executorKafkaParams : ju.Map [String , Object ] = kafkaParams
209-
210- def onStart (currentOffsets : Map [TopicPartition , Long ]): Consumer [K , V ] = {
211- val consumer = new KafkaConsumer [K , V ](kafkaParams)
212- consumer.assign(topicPartitions)
213- if (currentOffsets.isEmpty) {
214- offsets.asScala.foreach { case (topicPartition, offset) =>
215- consumer.seek(topicPartition, offset)
216- }
217- }
218-
219- consumer
212+ def Subscribe [K , V ](
213+ topics : ju.Collection [jl.String ],
214+ kafkaParams : ju.Map [String , Object ]): ConsumerStrategy [K , V ] = {
215+ new Subscribe [K , V ](topics, kafkaParams, ju.Collections .emptyMap[TopicPartition , jl.Long ]())
220216 }
221- }
222217
223- /**
224- * :: Experimental ::
225- * Companion object for creating [[Assign ]] strategy
226- */
227- @ Experimental
228- object Assign {
229218 /**
230219 * :: Experimental ::
231220 * Assign a fixed collection of TopicPartitions
232221 * @param topicPartitions collection of TopicPartitions to assign
233222 * @param kafkaParams Kafka
234- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
223+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
235224 * configuration parameters</a> to be used on driver. The same params will be used on executors,
236225 * with minor automatic modifications applied.
237226 * Requires "bootstrap.servers" to be set
@@ -241,43 +230,43 @@ object Assign {
241230 * auto.offset.reset will be used.
242231 */
243232 @ Experimental
244- def apply [K , V ](
233+ def Assign [K , V ](
245234 topicPartitions : Iterable [TopicPartition ],
246235 kafkaParams : collection.Map [String , Object ],
247- offsets : collection.Map [TopicPartition , Long ]): Assign [K , V ] = {
248- Assign [K , V ](
236+ offsets : collection.Map [TopicPartition , Long ]): ConsumerStrategy [K , V ] = {
237+ new Assign [K , V ](
249238 new ju.ArrayList (topicPartitions.asJavaCollection),
250239 new ju.HashMap [String , Object ](kafkaParams.asJava),
251- new ju.HashMap [TopicPartition , Long ](offsets.asJava))
240+ new ju.HashMap [TopicPartition , jl. Long ](offsets.mapValues(l => new jl. Long (l)) .asJava))
252241 }
253242
254243 /**
255244 * :: Experimental ::
256245 * Assign a fixed collection of TopicPartitions
257246 * @param topicPartitions collection of TopicPartitions to assign
258247 * @param kafkaParams Kafka
259- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
248+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
260249 * configuration parameters</a> to be used on driver. The same params will be used on executors,
261250 * with minor automatic modifications applied.
262251 * Requires "bootstrap.servers" to be set
263252 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
264253 */
265254 @ Experimental
266- def apply [K , V ](
255+ def Assign [K , V ](
267256 topicPartitions : Iterable [TopicPartition ],
268- kafkaParams : collection.Map [String , Object ]): Assign [K , V ] = {
269- Assign [K , V ](
257+ kafkaParams : collection.Map [String , Object ]): ConsumerStrategy [K , V ] = {
258+ new Assign [K , V ](
270259 new ju.ArrayList (topicPartitions.asJavaCollection),
271260 new ju.HashMap [String , Object ](kafkaParams.asJava),
272- ju.Collections .emptyMap[TopicPartition , Long ]())
261+ ju.Collections .emptyMap[TopicPartition , jl. Long ]())
273262 }
274263
275264 /**
276265 * :: Experimental ::
277266 * Assign a fixed collection of TopicPartitions
278267 * @param topicPartitions collection of TopicPartitions to assign
279268 * @param kafkaParams Kafka
280- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
269+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
281270 * configuration parameters</a> to be used on driver. The same params will be used on executors,
282271 * with minor automatic modifications applied.
283272 * Requires "bootstrap.servers" to be set
@@ -287,28 +276,32 @@ object Assign {
287276 * auto.offset.reset will be used.
288277 */
289278 @ Experimental
290- def create [K , V ](
279+ def Assign [K , V ](
291280 topicPartitions : ju.Collection [TopicPartition ],
292281 kafkaParams : ju.Map [String , Object ],
293- offsets : ju.Map [TopicPartition , Long ]): Assign [K , V ] = {
294- Assign [K , V ](topicPartitions, kafkaParams, offsets)
282+ offsets : ju.Map [TopicPartition , jl. Long ]): ConsumerStrategy [K , V ] = {
283+ new Assign [K , V ](topicPartitions, kafkaParams, offsets)
295284 }
296285
297286 /**
298287 * :: Experimental ::
299288 * Assign a fixed collection of TopicPartitions
300289 * @param topicPartitions collection of TopicPartitions to assign
301290 * @param kafkaParams Kafka
302- * <a href="http://kafka.apache.org/documentation.htmll #newconsumerconfigs">
291+ * <a href="http://kafka.apache.org/documentation.html#newconsumerconfigs">
303292 * configuration parameters</a> to be used on driver. The same params will be used on executors,
304293 * with minor automatic modifications applied.
305294 * Requires "bootstrap.servers" to be set
306295 * with Kafka broker(s) specified in host1:port1,host2:port2 form.
307296 */
308297 @ Experimental
309- def create [K , V ](
298+ def Assign [K , V ](
310299 topicPartitions : ju.Collection [TopicPartition ],
311- kafkaParams : ju.Map [String , Object ]): Assign [K , V ] = {
312- Assign [K , V ](topicPartitions, kafkaParams, ju.Collections .emptyMap[TopicPartition , Long ]())
300+ kafkaParams : ju.Map [String , Object ]): ConsumerStrategy [K , V ] = {
301+ new Assign [K , V ](
302+ topicPartitions,
303+ kafkaParams,
304+ ju.Collections .emptyMap[TopicPartition , jl.Long ]())
313305 }
306+
314307}
0 commit comments