
Commit 34234d4

Author: Davies Liu (committed)

Merge branch 'master' of github.com:apache/spark into python_udf

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/Dsl.scala

2 parents: 440f769 + b0c0021

File tree

22 files changed: +1491 −35 lines


core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala

Lines changed: 10 additions & 1 deletion

@@ -17,8 +17,10 @@
 
 package org.apache.spark.api.python
 
-import java.io.{File, InputStream, IOException, OutputStream}
+import java.io.{File}
+import java.util.{List => JList}
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.SparkContext
@@ -44,4 +46,11 @@ private[spark] object PythonUtils {
   def generateRDDWithNull(sc: JavaSparkContext): JavaRDD[String] = {
     sc.parallelize(List("a", null, "b"))
   }
+
+  /**
+   * Convert list of T into seq of T (for calling API with varargs)
+   */
+  def toSeq[T](cols: JList[T]): Seq[T] = {
+    cols.toList.toSeq
+  }
 }
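For context, toSeq exists so that a java.util.List handed over from Python (via Py4J) can be splatted into a Scala varargs parameter. Below is a minimal, self-contained sketch of that same pattern; the selectAll method and the column names are made up purely for illustration (PythonUtils itself is private[spark] and not callable from user code).

import java.util.{Arrays => JArrays, List => JList}
import scala.collection.JavaConversions._

object ToSeqSketch {
  // A varargs method standing in for Spark APIs of the form select(cols: _*).
  def selectAll(cols: String*): String = cols.mkString(", ")

  // The same conversion PythonUtils.toSeq performs: JList[T] => Seq[T].
  def toSeq[T](cols: JList[T]): Seq[T] = cols.toList.toSeq

  def main(args: Array[String]): Unit = {
    // Py4J delivers Java collections; convert, then splat into varargs.
    val fromPython: JList[String] = JArrays.asList("name", "age")
    println(selectAll(toSeq(fromPython): _*)) // prints: name, age
  }
}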

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 1 addition & 1 deletion

@@ -279,7 +279,7 @@ private[spark] object Utils extends Logging {
           maxAttempts + " attempts!")
       }
       try {
-        dir = new File(root, "spark-" + UUID.randomUUID.toString)
+        dir = new File(root, namePrefix + "-" + UUID.randomUUID.toString)
         if (dir.exists() || !dir.mkdirs()) {
          dir = null
        } else {
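The effect of this one-line change is that callers can choose the prefix of the generated temp directory instead of always getting "spark-". A tiny standalone sketch of the naming scheme (the candidateDir helper and the "blockmgr" prefix are hypothetical, shown only to illustrate the format):

import java.io.File
import java.util.UUID

object TempDirNameSketch {
  // Mirrors the updated naming: <namePrefix>-<random UUID> under the given root.
  def candidateDir(root: String, namePrefix: String = "spark"): File =
    new File(root, namePrefix + "-" + UUID.randomUUID.toString)

  def main(args: Array[String]): Unit = {
    println(candidateDir("/tmp"))             // e.g. /tmp/spark-1c2e33a0-...
    println(candidateDir("/tmp", "blockmgr")) // e.g. /tmp/blockmgr-9af0d1b2-...
  }
}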

external/kafka/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -44,7 +44,7 @@
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka_${scala.binary.version}</artifactId>
-      <version>0.8.0</version>
+      <version>0.8.1.1</version>
       <exclusions>
         <exclusion>
           <groupId>com.sun.jmx</groupId>
external/kafka/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala (new file)

Lines changed: 162 additions & 0 deletions

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.streaming.kafka


import scala.annotation.tailrec
import scala.collection.mutable
import scala.reflect.{classTag, ClassTag}

import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.Decoder

import org.apache.spark.{Logging, SparkException}
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream._

/**
 * A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
 * each given Kafka topic/partition corresponds to an RDD partition.
 * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the maximum number
 * of messages
 * per second that each '''partition''' will accept.
 * Starting offsets are specified in advance,
 * and this DStream is not responsible for committing offsets,
 * so that you can control exactly-once semantics.
 * For an easy interface to Kafka-managed offsets,
 * see {@link org.apache.spark.streaming.kafka.KafkaCluster}
 * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
 * configuration parameters</a>.
 *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with Kafka broker(s),
 *   NOT zookeeper servers, specified in host1:port1,host2:port2 form.
 * @param fromOffsets per-topic/partition Kafka offsets defining the (inclusive)
 *  starting point of the stream
 * @param messageHandler function for translating each message into the desired type
 * @param maxRetries maximum number of times in a row to retry getting leaders' offsets
 */
private[streaming]
class DirectKafkaInputDStream[
  K: ClassTag,
  V: ClassTag,
  U <: Decoder[_]: ClassTag,
  T <: Decoder[_]: ClassTag,
  R: ClassTag](
    @transient ssc_ : StreamingContext,
    val kafkaParams: Map[String, String],
    val fromOffsets: Map[TopicAndPartition, Long],
    messageHandler: MessageAndMetadata[K, V] => R
) extends InputDStream[R](ssc_) with Logging {
  val maxRetries = context.sparkContext.getConf.getInt(
    "spark.streaming.kafka.maxRetries", 1)

  protected[streaming] override val checkpointData =
    new DirectKafkaInputDStreamCheckpointData

  protected val kc = new KafkaCluster(kafkaParams)

  protected val maxMessagesPerPartition: Option[Long] = {
    val ratePerSec = context.sparkContext.getConf.getInt(
      "spark.streaming.kafka.maxRatePerPartition", 0)
    if (ratePerSec > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some((secsPerBatch * ratePerSec).toLong)
    } else {
      None
    }
  }

  protected var currentOffsets = fromOffsets

  @tailrec
  protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
    val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
    // Either.fold would confuse @tailrec, do it manually
    if (o.isLeft) {
      val err = o.left.get.toString
      if (retries <= 0) {
        throw new SparkException(err)
      } else {
        log.error(err)
        Thread.sleep(kc.config.refreshLeaderBackoffMs)
        latestLeaderOffsets(retries - 1)
      }
    } else {
      o.right.get
    }
  }

  // limits the maximum number of messages per partition
  protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    maxMessagesPerPartition.map { mmp =>
      leaderOffsets.map { case (tp, lo) =>
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }

  override def start(): Unit = {
  }

  def stop(): Unit = {
  }

  private[streaming]
  class DirectKafkaInputDStreamCheckpointData extends DStreamCheckpointData(this) {
    def batchForTime = data.asInstanceOf[mutable.HashMap[
      Time, Array[OffsetRange.OffsetRangeTuple]]]

    override def update(time: Time) {
      batchForTime.clear()
      generatedRDDs.foreach { kv =>
        val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, R]].offsetRanges.map(_.toTuple).toArray
        batchForTime += kv._1 -> a
      }
    }

    override def cleanup(time: Time) { }

    override def restore() {
      // this is assuming that the topics don't change during execution, which is true currently
      val topics = fromOffsets.keySet
      val leaders = kc.findLeaders(topics).fold(
        errs => throw new SparkException(errs.mkString("\n")),
        ok => ok
      )

      batchForTime.toSeq.sortBy(_._1)(Time.ordering).foreach { case (t, b) =>
        logInfo(s"Restoring KafkaRDD for time $t ${b.mkString("[", ", ", "]")}")
        generatedRDDs += t -> new KafkaRDD[K, V, U, T, R](
          context.sparkContext, kafkaParams, b.map(OffsetRange(_)), leaders, messageHandler)
      }
    }
  }

}
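To make the clamp logic above concrete: maxMessagesPerPartition is derived from spark.streaming.kafka.maxRatePerPartition and the batch duration, and each partition's until-offset is capped at the current offset plus that budget. A minimal sketch of the same arithmetic with made-up numbers (plain Longs stand in for TopicAndPartition and LeaderOffset; this is not Spark code):

object RateClampSketch {
  def main(args: Array[String]): Unit = {
    val maxRatePerPartition = 1000L // spark.streaming.kafka.maxRatePerPartition
    val batchDurationMs = 2000L     // assumed 2-second batches
    val secsPerBatch = batchDurationMs.toDouble / 1000
    val maxMessagesPerPartition = (secsPerBatch * maxRatePerPartition).toLong // 2000

    // Per partition: take at most maxMessagesPerPartition, never past the leader's latest offset.
    val currentOffset = 5000L
    val latestLeaderOffset = 12000L
    val untilOffset = math.min(currentOffset + maxMessagesPerPartition, latestLeaderOffset)
    println(untilOffset) // 7000: this batch reads [5000, 7000) instead of all 7000 pending messages
  }
}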
