@JerryLead
Contributor

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

The f closure of PartitionsRDD(ZippedPartitionsRDD2) contains a $outer that references EdgeRDD/VertexRDD, which makes the task's serialization chain very long in iterative GraphX applications. As a result, a StackOverflowError occurs. If we set "f = null" in clearDependencies(), checkpoint() can cut off the long serialization chain. More details and explanation can be found in the JIRA.
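
To make the idea concrete, here is a minimal sketch of the pattern in plain Scala, without Spark. `ZippedSketch`, `parent1`, `parent2` and the in-memory `checkpointed` field are hypothetical stand-ins (Spark's checkpoint writes to reliable storage instead); only the shape of `clearDependencies()` mirrors what this PR proposes.

```scala
object ClearDependenciesSketch {
  // Hypothetical stand-in for an RDD that zips two parents; NOT Spark's real class.
  class ZippedSketch[A, B, V](
      var f: (Iterator[A], Iterator[B]) => Iterator[V],
      var parent1: Seq[A],
      var parent2: Seq[B]) extends Serializable {

    // Data materialized by checkpoint(); None until then.
    private var checkpointed: Option[Seq[V]] = None

    // After a checkpoint, read the saved data instead of calling f on the parents.
    def compute(): Iterator[V] = checkpointed match {
      case Some(data) => data.iterator
      case None       => f(parent1.iterator, parent2.iterator)
    }

    def checkpoint(): Unit = {
      checkpointed = Some(compute().toList)
      clearDependencies()
    }

    // Drop every reference that could drag the old lineage into serialization.
    // Clearing f as well is the point of this PR: f may hold a $outer reference.
    def clearDependencies(): Unit = {
      parent1 = null
      parent2 = null
      f = null
    }
  }
}
```

Once `checkpoint()` has run, the object no longer references `f` or its parents, so serializing it cannot follow `f`'s `$outer` back into earlier iterations.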

@AmplabJenkins

Can one of the admins verify this patch?

@rxin
Contributor

rxin commented Dec 2, 2014

cc @tdas can you take a look at this?

@rxin
Contributor

rxin commented Dec 2, 2014

I talked to @tdas and this is fine, but even with this, we should figure out why f is capturing its outer this way and remove that since it is expensive for serialization. cc @ankurdave

@ankurdave
Contributor

ok to test

@SparkQA

SparkQA commented Dec 3, 2014

Test build #24064 has started for PR 3545 at commit f7faea5.

  • This patch merges cleanly.

@SparkQA

SparkQA commented Dec 3, 2014

Test build #24064 has finished for PR 3545 at commit f7faea5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@AmplabJenkins

Test PASSed.
Refer to this link for build results (access rights to CI server needed):
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/24064/

@jason-dai

Maybe we can try something like:

class ZippedPartitionsRDD2 (sc, f, …) {
  val cleanF = (part1, part2, ctx) => sc.clean(f(rdd1.iterator(part1, ctx), rdd2.iterator(part2, ctx)))

  override def compute(s: Partition, context: TaskContext): Iterator[V] = {
    …
    cleanF(partitions(0), partitions(1), context)
  }
  …
}

@JerryLead
Contributor Author

I wrote the simple example below to show how the abnormal $outer is generated. I think the reason is that the closure passed to zipPartitions() in foo() calls VertexRDD.bar(). As a result, the VertexRDD instance becomes the closure's $outer, and its member variable partitionsRDD gets serialized with it. @rxin @tdas @ankurdave @jason-dai

The chain f=>$outer=>VertexRDD=>PartitionsRDD=>v is serialized whenever the task is serialized (shown in the figures after the code). P.S. Does cleanF in compute() cause problems when this RDD is recomputed?

import org.apache.spark.SparkContext
import org.apache.spark.rdd._


class PartitionsRDD extends Serializable {
  val v = "partitionsRDD"
}

class VertexRDD extends Serializable {

  val partitionsRDD = new PartitionsRDD

  def foo(pairs1: RDD[(Int, Char)], pairs2: RDD[(Int, Char)]) = {
    val zipRDD = pairs1.zipPartitions(pairs2) {
      (thisIter, otherIter) =>
        val p1 = thisIter.next()
        val p2 = otherIter.next()

        Iterator((p1._1 + p2._1 + bar(), p1._2 + ":" + p2._2))
    }
    zipRDD
  }

  def bar() = 5
}

object AbnormalFClosure {


  def main(args: Array[String]) {

    val sc = new SparkContext("local", "ZippedPartition Test")


    val data1 = Array[(Int, Char)]((1, 'a'), (2, 'b'),
      (3, 'c'), (4, 'd'),
      (5, 'e'), (3, 'f'),
      (2, 'g'), (1, 'h'))
    val pairs1 = sc.parallelize(data1, 2)


    val data2 = Array[(Int, Char)]((1, 'A'), (2, 'B'),
      (3, 'C'), (4, 'D'))
    val pairs2 = sc.parallelize(data2, 2)

    val zipRDD = new VertexRDD().foo(pairs1, pairs2)

    zipRDD.count()

  }
}

[Figure: the serialization chain of f=>$outer (i.e., VertexRDD)=>PartitionsRDD=>v]

[Figure: PartitionsRDD being serialized]

@jason-dai

I believe ClosureCleaner.clean() is defined to deal with exactly this issue: Scala may capture the entire enclosing class in a closure, even if only one member variable is used.
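
The capture behaviour discussed in this thread can be reproduced without Spark. The sketch below uses plain Java serialization; `Holder`, `capturing`, `nonCapturing` and `canSerialize` are hypothetical names introduced only for illustration. It does not show what ClosureCleaner does internally, only why calling a method of the enclosing instance pulls that instance into the closure, and how copying the value into a local val avoids it.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object OuterCaptureSketch {
  // Deliberately NOT Serializable, so capturing it makes serialization fail visibly.
  class Holder {
    def bar(): Int = 5

    // The body calls this.bar(), so the closure must keep a reference to `this`.
    def capturing: Int => Int = (x: Int) => x + bar()

    // bar() is evaluated first; the closure then captures only the local Int.
    def nonCapturing: Int => Int = {
      val b = bar()
      (x: Int) => x + b
    }
  }

  // Returns false if Java serialization hits a non-serializable object in the graph.
  def canSerialize(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }
}
```

With a `Holder` instance `h`, `canSerialize(h.capturing)` should be false because the closure drags `h` along, while `canSerialize(h.nonCapturing)` should be true. In the VertexRDD example above the enclosing class is Serializable, so nothing fails; the whole object graph is silently written out instead, which is how the chain grows.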

asfgit pushed a commit that referenced this pull request Dec 3, 2014
…ation chain

The related JIRA is https://issues.apache.org/jira/browse/SPARK-4672

The f closure of `PartitionsRDD(ZippedPartitionsRDD2)` contains a `$outer` that references EdgeRDD/VertexRDD, which causes task's serialization chain become very long in iterative GraphX applications. As a result, StackOverflow error will occur. If we set "f = null" in `clearDependencies()`, checkpoint() can cut off the long serialization chain. More details and explanation can be found in the JIRA.

Author: JerryLead <[email protected]>
Author: Lijie Xu <[email protected]>

Closes #3545 from JerryLead/my_core and squashes the following commits:

f7faea5 [JerryLead] checkpoint() should clear the f to avoid StackOverflow error
c0169da [JerryLead] Merge branch 'master' of https://github.com/apache/spark
52799e3 [Lijie Xu] Merge pull request #1 from apache/master

(cherry picked from commit 77be8b9)
Signed-off-by: Ankur Dave <[email protected]>
@asfgit asfgit closed this in 77be8b9 Dec 3, 2014