
Conversation

@witgo (Contributor) commented Feb 28, 2014

No description provided.

@AmplabJenkins

Can one of the admins verify this patch?


Contributor:

Would require(partitions.forall(rddPartitions.contains, "partition index out of range")) be more intuitive?

Contributor (Author):

Would require(partitions.forall(rddPartitions.contains(_)), "partition index out of range") be more readable?

Contributor:

Oops, I see that the parentheses are mismatched in my comment above.

@mateiz (Contributor) commented Mar 1, 2014

Jenkins, this is ok to test

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12944/

Contributor:

Do you mind explaining a bit more the case where these two will not match? I'm just wondering if it makes more sense to check this invariant inside the getPartitions function of ShuffleRDD.scala, but maybe there are other code paths where this could get messed up that don't go through that.

Contributor (Author):

val partitioner = new Partitioner {
  override def numPartitions: Int = 2
  // Bug: hashCode() % 2 is -1 for negative keys, which is out of range.
  override def getPartition(key: Any): Int = key.hashCode() % 2
}

val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (-1, 7)))
val shuffled = pairs.partitionBy(partitioner)
shuffled.count

This fails with an error while computing the result. Now consider:

val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (6, 7)))
val shuffled = pairs.partitionBy(partitioner)
shuffled.lookup(-1)

Although the log records the error, Spark hangs.

Contributor:

Just scanned the code. This issue (partitions not matching rdd.partitions.map(_.index)) can only happen when the computation relies on the correctness of the partitioner.

In the current implementation, there are only two cases:

The first is lookup, where the computation relies on the correctness of getPartition():

def lookup(key: K): Seq[V] = {
  self.partitioner match {
    case Some(p) =>
      // The index comes straight from the user-supplied partitioner
      // and is passed to runJob without validation.
      val index = p.getPartition(key)
      def process(it: Iterator[(K, V)]): Seq[V] = {
        val buf = new ArrayBuffer[V]
        for ((k, v) <- it if k == key) {
          buf += v
        }
        buf
      }
      val res = self.context.runJob(self, process _, Array(index), false)
      res(0)
    case None =>
      self.filter(_._1 == key).map(_._2).collect()
  }
}

The other case is ShuffleMapTask:

// Write the map output to its associated buckets.
for (elem <- rdd.iterator(split, context)) {
  val pair = elem.asInstanceOf[Product2[Any, Any]]
  // A misbehaving partitioner can return a bucketId outside
  // [0, numPartitions), making writers(bucketId) throw.
  val bucketId = dep.partitioner.getPartition(pair._1)
  shuffle.writers(bucketId).write(pair)
}

I'm not sure which fix is better: adding a check in SparkContext, or adding specific checks in these two places separately.

I just felt that, without looking at the code, I couldn't see why partitions would fail to match rdd.partitions (looking at how SparkContext runs the job only adds to the confusion, because partitions is derived exactly from "0 until rdd.partitions.size"):

/**
 * Run a job on all partitions in an RDD and return the results in an array.
 */
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
  runJob(rdd, func, 0 until rdd.partitions.size, false)
}

Contributor (Author):

I don't understand what you mean. The out-of-range partition index is caused by an improperly designed Partitioner:

val partitioner = new Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key.hashCode() % 2
}

partitioner.getPartition(-1) returns -1, because the % operator in Scala can return a negative result for negative operands.
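(For reference, the usual fix is to normalize the modulo so the result always lands in [0, numPartitions); this is what Spark's built-in HashPartitioner does internally. A minimal sketch of a corrected custom partitioner:)

val safePartitioner = new Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = {
    // Map negative hash codes back into [0, numPartitions).
    val mod = key.hashCode() % numPartitions
    if (mod < 0) mod + numPartitions else mod
  }
}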

Contributor:

@witgo so in the case you mentioned, why not put this check in the constructor of ShuffleRDD? It seems more natural to check it there rather than inside of runJob.

Contributor (Author):

The correctness of the partitioner depends on the input keys. In the current example there is no problem as long as every key >= 0, so this cannot be detected in the constructor.

@rxin (Contributor) commented Mar 2, 2014

Hi guys,

I think it is better to make sure Spark doesn't hang when an incorrect partition index is given, because there will be other code paths to run a job. Given the two places @CodingCat found, I think it shouldn't be too hard to fix those. @witgo do you mind doing that instead?

One thing for sure is we shouldn't add a check per key -- that can be too expensive.
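(For illustration only, a minimal sketch of such a fail-fast check in lookup; the exact form and placement are an assumption, not the final patch:)

case Some(p) =>
  val index = p.getPartition(key)
  // Fail fast with a clear message instead of hanging the job.
  require(index >= 0 && index < self.partitions.length,
    s"partitioner returned out-of-range partition index $index for key $key")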

@witgo (Contributor, Author) commented Mar 2, 2014

Yes, adding too many require statements is unwise. We should guarantee that an error is thrown when appropriate, and leave the rest for the developer to resolve.

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12951/

Contributor:

This check as written is going to have quadratic complexity. If you have 100 partitions for example, you're going to create a list of length 100 at the top and then check for all 100 partitions whether they're in that list, getting 10,000 operations. Can't you just check that all the indices in partitions are between 0 and rdd.partitions.size? I don't think RDDs can have non-contiguous partition numbers, though there might have been some stuff in the past with partition pruning that I may be misremembering.

Contributor (Author):

Sure, that should be better:

require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")

Or this?

val partitionRange = (0 until rdd.partitions.size)
require(partitions.forall(partitionRange.contains(_)), "partition index out of range")
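(A direct bounds comparison, as suggested above, avoids building any intermediate collection at all; a minimal sketch of that variant:)

require(partitions.forall(p => p >= 0 && p < rdd.partitions.size),
  "partition index out of range")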

@AmplabJenkins

Merged build triggered.

@AmplabJenkins

Merged build started.

@AmplabJenkins

Merged build finished.

@AmplabJenkins

All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/12958/

Contributor:

nit: You could just do partitions.forall(partitionRange.contains)

Contributor (Author):

I think my code is more readable.

Contributor:

Two questions:

  1. Does Spark guarantee that an RDD has contiguous partition indices? (I think so, as https://spark-project.atlassian.net/browse/SPARK-911 is still open.)
  2. Shall we put the check here, or check inside the specific APIs? (The current issue looks more like a bug in lookup(): it forgets to check the return value of getPartition before using it....)

Contributor (Author):

  1. It can be changed to:

require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")

  2. A custom Partitioner can cause a lot of problems:

val partitioner = new Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = key.hashCode() % 2
}

val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (-1, 7)))
val shuffled = pairs.partitionBy(partitioner)
shuffled.count

Contributor:

Will this code throw an index-out-of-range exception?

Contributor:

Take a look at use of PartitionPruningRDD ..

In core/src/main/scala/org/apache/spark/SparkContext.scala:

@@ -847,6 +847,8 @@ class SparkContext(
     partitions: Seq[Int],
     allowLocal: Boolean,
     resultHandler: (Int, U) => Unit) {
+    val partitionRange = (0 until rdd.partitions.size)
+    require(partitions.forall(partitionRange.contains(_)), "partition index out of range")


Contributor (Author):

It throws java.lang.ArrayIndexOutOfBoundsException.

Contributor:

Even PartitionPruningRDD ensures a contiguous index space... I think so:


class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boolean)
  extends NarrowDependency[T](rdd) {

  @transient
  val partitions: Array[Partition] = rdd.partitions
    // zipWithIndex renumbers the surviving partitions 0, 1, 2, ...,
    // so the index space stays contiguous after pruning.
    .filter(s => partitionFilterFunc(s.index)).zipWithIndex
    .map { case(split, idx) => new PartitionPruningRDDPartition(idx, split) : Partition }

  override def getParents(partitionId: Int) = {
    List(partitions(partitionId).asInstanceOf[PartitionPruningRDDPartition].parentSplit.index)
  }
}

Contributor (Author):

Yes, you're right. The code has been modified:

require(partitions.toSet.diff(rdd.partitions.map(_.index).toSet).isEmpty, "partition index out of range")
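(With this check in place, the shuffled.lookup(-1) example above should fail fast with "partition index out of range" instead of hanging, assuming the check runs before the job is submitted.)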

@AmplabJenkins

Merged build triggered.
