Pull request (closed): SPARK-3580 Add getNumPartitions method to JavaRDD
5 changes: 5 additions & 0 deletions core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala

@@ -28,6 +28,7 @@ import com.google.common.base.Optional
 import org.apache.hadoop.io.compress.CompressionCodec

 import org.apache.spark._
+import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaPairRDD._
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap

@@ -62,6 +63,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
   /** Set of partitions in this RDD. */
   def partitions: JList[Partition] = rdd.partitions.toSeq.asJava

+  /** Return the number of partitions in this RDD. */
  [Review thread on this line]
  Contributor: Mind adding a @since tag here to say that this is new in Spark 1.6?
  Author: Not at all. Should I use @since or @Since, and should I also add it to RDD.scala? (I could not find any occurrence of this tag in spark-core.)
  Author: Thanks, I added the @Since tags and updated this PR.

+  @Since("1.6.0")
+  def getNumPartitions: Int = rdd.getNumPartitions
+
   /** The partitioner of this RDD. */
   def partitioner: Optional[Partitioner] = JavaUtils.optionToOptional(rdd.partitioner)
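[Usage note, not part of the diff] A minimal Scala sketch of the new Java-side accessor. This is only an illustration: it assumes a live SparkContext named sc (e.g. in spark-shell) and uses the existing toJavaRDD() bridge from the Scala API.

  import org.apache.spark.api.java.JavaRDD

  // Build a Scala RDD with an explicit partition count, then wrap it in the Java API.
  val javaRdd: JavaRDD[Int] = sc.parallelize(1 to 8, numSlices = 3).toJavaRDD()

  // The new accessor simply delegates to the underlying RDD's getNumPartitions.
  assert(javaRdd.getNumPartitions == 3)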
8 changes: 7 additions & 1 deletion core/src/main/scala/org/apache/spark/rdd/RDD.scala

@@ -31,7 +31,7 @@ import org.apache.hadoop.mapred.TextOutputFormat

 import org.apache.spark._
 import org.apache.spark.Partitioner._
-import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.annotation.{Since, DeveloperApi}
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.partial.BoundedDouble
 import org.apache.spark.partial.CountEvaluator

@@ -242,6 +242,12 @@ abstract class RDD[T: ClassTag](
     }
   }

+  /**
+   * Returns the number of partitions of this RDD.
+   */
+  @Since("1.6.0")
+  final def getNumPartitions: Int = partitions.length
+
   /**
    * Get the preferred locations of a partition, taking into account whether the
    * RDD is checkpointed.
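[Usage note, not part of the diff] On the Scala side the new method is a final alias for partitions.length. A minimal sketch of the equivalence, again assuming a live SparkContext named sc:

  val nums = sc.parallelize(Seq(1, 2, 3, 4), numSlices = 2)

  // The new accessor returns the same value as inspecting the partitions array directly.
  assert(nums.getNumPartitions == 2)
  assert(nums.getNumPartitions == nums.partitions.length)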
13 changes: 13 additions & 0 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java

@@ -973,6 +973,19 @@ public Iterator<Integer> call(Integer index, Iterator<Integer> iter) {
     Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
   }

+  @Test
+  public void getNumPartitions() {
+    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
+    JavaDoubleRDD rdd2 = sc.parallelizeDoubles(Arrays.asList(1.0, 2.0, 3.0, 4.0), 2);
+    JavaPairRDD<String, Integer> rdd3 = sc.parallelizePairs(Arrays.asList(
+        new Tuple2<>("a", 1),
+        new Tuple2<>("aa", 2),
+        new Tuple2<>("aaa", 3)
+    ), 2);
+    Assert.assertEquals(3, rdd1.getNumPartitions());
+    Assert.assertEquals(2, rdd2.getNumPartitions());
+    Assert.assertEquals(2, rdd3.getNumPartitions());
+  }

   @Test
   public void repartition() {
1 change: 1 addition & 0 deletions core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

@@ -34,6 +34,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {

   test("basic operations") {
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    assert(nums.getNumPartitions === 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
     assert(nums.toLocalIterator.toList === List(1, 2, 3, 4))
     val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala

@@ -155,6 +155,10 @@ object MimaExcludes {
         "org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor"),
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.storage.BlockManagerMessages$GetRpcHostPortForExecutor$")
+    ) ++ Seq(
+      // SPARK-3580 Add getNumPartitions method to JavaRDD
+      ProblemFilters.exclude[MissingMethodProblem](
+        "org.apache.spark.api.java.JavaRDDLike.getNumPartitions")
     )
   case v if v.startsWith("1.5") =>
     Seq(
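[Note, not part of the diff] The exclusion above is needed because adding a method to the public JavaRDDLike trait changes its binary interface: MiMa, the binary-compatibility checker Spark runs against the previous release, reports the new method as a MissingMethodProblem, and listing it here marks the break as an intentional API addition.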