Skip to content

Commit 71859a7

Browse files
committed
Merge branch 'master' of https://github.com/apache/spark into SPARK-1930
2 parents e3c531d + 8d85359 commit 71859a7

File tree

137 files changed

+3400
-1842
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

137 files changed

+3400
-1842
lines changed

.gitignore

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
sbt/*.jar
88
.settings
99
.cache
10-
.mima-excludes
10+
.generated-mima-excludes
1111
/build/
1212
work/
1313
out/

.rat-excludes

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ target
33
.project
44
.classpath
55
.mima-excludes
6+
.generated-mima-excludes
67
.rat-excludes
78
.*md
89
derby.log

assembly/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>org.apache.spark</groupId>
2323
<artifactId>spark-parent</artifactId>
24-
<version>1.0.0-SNAPSHOT</version>
24+
<version>1.1.0-SNAPSHOT</version>
2525
<relativePath>../pom.xml</relativePath>
2626
</parent>
2727

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>org.apache.spark</groupId>
2323
<artifactId>spark-parent</artifactId>
24-
<version>1.0.0-SNAPSHOT</version>
24+
<version>1.1.0-SNAPSHOT</version>
2525
<relativePath>../pom.xml</relativePath>
2626
</parent>
2727

bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeo
3838
sc.stop()
3939
sc = null
4040
}
41-
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
42-
System.clearProperty("spark.driver.port")
4341
}
4442

4543
test("halting by voting") {

core/pom.xml

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
<parent>
2222
<groupId>org.apache.spark</groupId>
2323
<artifactId>spark-parent</artifactId>
24-
<version>1.0.0-SNAPSHOT</version>
24+
<version>1.1.0-SNAPSHOT</version>
2525
<relativePath>../pom.xml</relativePath>
2626
</parent>
2727

@@ -258,35 +258,6 @@
258258
<outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
259259
<testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
260260
<plugins>
261-
<plugin>
262-
<groupId>org.apache.maven.plugins</groupId>
263-
<artifactId>maven-antrun-plugin</artifactId>
264-
<executions>
265-
<execution>
266-
<phase>test</phase>
267-
<goals>
268-
<goal>run</goal>
269-
</goals>
270-
<configuration>
271-
<exportAntProperties>true</exportAntProperties>
272-
<target>
273-
<property name="spark.classpath" refid="maven.test.classpath" />
274-
<property environment="env" />
275-
<fail message="Please set the SCALA_HOME (or SCALA_LIBRARY_PATH if scala is on the path) environment variables and retry.">
276-
<condition>
277-
<not>
278-
<or>
279-
<isset property="env.SCALA_HOME" />
280-
<isset property="env.SCALA_LIBRARY_PATH" />
281-
</or>
282-
</not>
283-
</condition>
284-
</fail>
285-
</target>
286-
</configuration>
287-
</execution>
288-
</executions>
289-
</plugin>
290261
<plugin>
291262
<groupId>org.scalatest</groupId>
292263
<artifactId>scalatest-maven-plugin</artifactId>

core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala

Lines changed: 30 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -672,38 +672,47 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
672672

673673
/**
674674
* Return approximate number of distinct values for each key in this RDD.
675-
* The accuracy of approximation can be controlled through the relative standard deviation
676-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
677-
* more accurate counts but increase the memory footprint and vise versa. Uses the provided
678-
* Partitioner to partition the output RDD.
675+
*
676+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
677+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
678+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
679+
*
680+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
681+
* It must be greater than 0.000017.
682+
* @param partitioner partitioner of the resulting RDD.
679683
*/
680-
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaRDD[(K, Long)] = {
681-
rdd.countApproxDistinctByKey(relativeSD, partitioner)
684+
def countApproxDistinctByKey(relativeSD: Double, partitioner: Partitioner): JavaPairRDD[K, Long] =
685+
{
686+
fromRDD(rdd.countApproxDistinctByKey(relativeSD, partitioner))
682687
}
683688

684689
/**
685-
* Return approximate number of distinct values for each key this RDD.
686-
* The accuracy of approximation can be controlled through the relative standard deviation
687-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
688-
* more accurate counts but increase the memory footprint and vise versa. The default value of
689-
* relativeSD is 0.05. Hash-partitions the output RDD using the existing partitioner/parallelism
690-
* level.
690+
* Return approximate number of distinct values for each key in this RDD.
691+
*
692+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
693+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
694+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
695+
*
696+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
697+
* It must be greater than 0.000017.
698+
* @param numPartitions number of partitions of the resulting RDD.
691699
*/
692-
def countApproxDistinctByKey(relativeSD: Double = 0.05): JavaRDD[(K, Long)] = {
693-
rdd.countApproxDistinctByKey(relativeSD)
700+
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaPairRDD[K, Long] = {
701+
fromRDD(rdd.countApproxDistinctByKey(relativeSD, numPartitions))
694702
}
695703

696-
697704
/**
698705
* Return approximate number of distinct values for each key in this RDD.
699-
* The accuracy of approximation can be controlled through the relative standard deviation
700-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
701-
* more accurate counts but increase the memory footprint and vise versa. HashPartitions the
702-
* output RDD into numPartitions.
703706
*
707+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
708+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
709+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
710+
*
711+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
712+
* It must be greater than 0.000017.
704713
*/
705-
def countApproxDistinctByKey(relativeSD: Double, numPartitions: Int): JavaRDD[(K, Long)] = {
706-
rdd.countApproxDistinctByKey(relativeSD, numPartitions)
714+
def countApproxDistinctByKey(relativeSD: Double): JavaPairRDD[K, Long] = {
715+
fromRDD(rdd.countApproxDistinctByKey(relativeSD))
707716
}
708717

709718
/** Assign a name to this RDD */

core/src/main/scala/org/apache/spark/api/java/JavaRDD.scala

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,28 @@ class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T])
108108
def sample(withReplacement: Boolean, fraction: Double, seed: Long): JavaRDD[T] =
109109
wrapRDD(rdd.sample(withReplacement, fraction, seed))
110110

111+
112+
/**
113+
* Randomly splits this RDD with the provided weights.
114+
*
115+
* @param weights weights for splits, will be normalized if they don't sum to 1
116+
*
117+
* @return split RDDs in an array
118+
*/
119+
def randomSplit(weights: Array[Double]): Array[JavaRDD[T]] =
120+
randomSplit(weights, Utils.random.nextLong)
121+
122+
/**
123+
* Randomly splits this RDD with the provided weights.
124+
*
125+
* @param weights weights for splits, will be normalized if they don't sum to 1
126+
* @param seed random seed
127+
*
128+
* @return split RDDs in an array
129+
*/
130+
def randomSplit(weights: Array[Double], seed: Long): Array[JavaRDD[T]] =
131+
rdd.randomSplit(weights, seed).map(wrapRDD)
132+
111133
/**
112134
* Return the union of this RDD and another one. Any identical elements will appear multiple
113135
* times (use `.distinct()` to eliminate them).

core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -560,12 +560,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
560560
/**
561561
* Return approximate number of distinct elements in the RDD.
562562
*
563-
* The accuracy of approximation can be controlled through the relative standard deviation
564-
* (relativeSD) parameter, which also controls the amount of memory used. Lower values result in
565-
* more accurate counts but increase the memory footprint and vise versa. The default value of
566-
* relativeSD is 0.05.
563+
* The algorithm used is based on streamlib's implementation of "HyperLogLog in Practice:
564+
* Algorithmic Engineering of a State of The Art Cardinality Estimation Algorithm", available
565+
* <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
566+
*
567+
* @param relativeSD Relative accuracy. Smaller values create counters that require more space.
568+
* It must be greater than 0.000017.
567569
*/
568-
def countApproxDistinct(relativeSD: Double = 0.05): Long = rdd.countApproxDistinct(relativeSD)
570+
def countApproxDistinct(relativeSD: Double): Long = rdd.countApproxDistinct(relativeSD)
569571

570572
def name(): String = rdd.name
571573

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,26 @@ private object SpecialLengths {
269269
private[spark] object PythonRDD {
270270
val UTF8 = Charset.forName("UTF-8")
271271

272+
/**
273+
* Adapter for calling SparkContext#runJob from Python.
274+
*
275+
* This method will return an iterator of an array that contains all elements in the RDD
276+
* (effectively a collect()), but allows you to run on a certain subset of partitions,
277+
* or to enable local execution.
278+
*/
279+
def runJob(
280+
sc: SparkContext,
281+
rdd: JavaRDD[Array[Byte]],
282+
partitions: JArrayList[Int],
283+
allowLocal: Boolean): Iterator[Array[Byte]] = {
284+
type ByteArray = Array[Byte]
285+
type UnrolledPartition = Array[ByteArray]
286+
val allPartitions: Array[UnrolledPartition] =
287+
sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions, allowLocal)
288+
val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*)
289+
flattenedPartition.iterator
290+
}
291+
272292
def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
273293
JavaRDD[Array[Byte]] = {
274294
val file = new DataInputStream(new FileInputStream(filename))

0 commit comments

Comments
 (0)