
Conversation

@suyanNone
Contributor

[SPARK-5259] Add Task equals() and hashCode() to avoid stage.pendingTasks becoming inaccurate when a stage is retried
desc:
While running a Spark job, one stage kept retrying and kept throwing MetadataFetchFailedException.

reason:
Map Stage 1 -> Map Stage 2

Map Stage 1 is retried, so it has two task sets, TaskSet0.0 and TaskSet0.1, and both are running.

When is Map Stage 2 submitted?

    if (!mapStage.isAvailable) {
      missing += mapStage
    }

    def isAvailable: Boolean = {
      if (!isShuffleMap) {
        true
      } else {
        numAvailableOutputs == numPartitions
      }
    }

How does numAvailableOutputs change?

    stage.addOutputLoc(smt.partitionId, status)

    def addOutputLoc(partition: Int, status: MapStatus) {
      val prevList = outputLocs(partition)
      outputLocs(partition) = status :: prevList
      if (prevList == Nil) {
        numAvailableOutputs += 1
      }
    }

When is Map Stage 1's output registered?

    if (runningStages.contains(stage) && stage.pendingTasks.isEmpty) {
      // body elided: this is where the finished stage's map outputs get
      // registered with MapOutputTracker
    }

How does stage.pendingTasks change?

    event.reason match {
      case Success =>
        listenerBus.post(SparkListenerTaskEnd(stageId, stage.latestInfo.attemptId, taskType,
          event.reason, event.taskInfo, event.taskMetrics))
        stage.pendingTasks -= task

Because Task does not override hashCode() and equals(), tasks for the same partition in different TaskSets are treated as different tasks. pendingTasks is cleared when the map stage is retried, so pendingTasks always tracks the new retry TaskSet. When the previous TaskSet then completes a task whose partition also appears in the latest TaskSet, stage.pendingTasks -= task has no effect; but the completion does affect stage.numAvailableOutputs, because an output location is identified by partition id alone.
So a stage may end up submitted while its dependency map stage has not yet registered its output in MapOutputTracker.
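
For illustration, here is a minimal sketch of the identity this patch gives tasks: two tasks are equal exactly when their (stageId, partitionId) pairs match. TaskSketch is a hypothetical stand-in, not the actual org.apache.spark.scheduler.Task:

    // Hypothetical stand-in for Spark's Task; the real patch overrides
    // equals()/hashCode() on Task itself. Shown only to illustrate the fix.
    abstract class TaskSketch(val stageId: Int, val partitionId: Int) {

      // Two tasks are the same if they compute the same partition of the
      // same stage, regardless of which TaskSet (stage attempt) they are in.
      override def equals(other: Any): Boolean = other match {
        case that: TaskSketch =>
          this.stageId == that.stageId && this.partitionId == that.partitionId
        case _ => false
      }

      // Consistent with equals: derived only from (stageId, partitionId).
      override def hashCode: Int = 31 * stageId.hashCode + partitionId.hashCode
    }

With this identity, when a task from the earlier TaskSet0.0 succeeds, stage.pendingTasks -= task also removes the matching task of TaskSet0.1, keeping pendingTasks consistent with numAvailableOutputs.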

@SparkQA

SparkQA commented Jan 15, 2015

Test build #25592 has finished for PR 4055 at commit e5af95c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class ExperimentalMethods protected[sql](sqlContext: SQLContext)

Member


This seems like an excessively complex way of writing 31 * stageId.hashCode + partitionId.hashCode. I don't think FP is the way to do this.

Contributor


Maybe a better way is (stageId + partitionId) * (stageId + partitionId + 1) / 2 + partitionId.
See http://en.wikipedia.org/wiki/Pairing_function#Cantor_pairing_function
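
For reference, a small Scala sketch of that pairing function (cantorPair is a hypothetical helper name, not code from this patch):

    // Cantor pairing: maps each pair of non-negative Ints to a unique Int
    // (modulo overflow for large inputs).
    def cantorPair(stageId: Int, partitionId: Int): Int = {
      val s = stageId + partitionId
      s * (s + 1) / 2 + partitionId
    }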

@cloud-fan
Contributor

According to your case, I think we can do one more improvement in submitMissingTasks: if the stage is a map stage and stage.pendingTasks is not empty, we should not regenerate all tasks but just submit the pending ones.
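
A hedged sketch of that suggestion, assuming access to the fields shown earlier (isShuffleMap, pendingTasks, outputLocs, numPartitions); this is not actual DAGScheduler code:

    // Inside a hypothetical submitMissingTasks: if a retried map stage still
    // tracks pending tasks, build tasks only for those partitions instead of
    // regenerating tasks for every missing partition.
    val partitionsToCompute: Seq[Int] =
      if (stage.isShuffleMap && stage.pendingTasks.nonEmpty) {
        stage.pendingTasks.map(_.partitionId).toSeq
      } else {
        (0 until stage.numPartitions).filter(p => stage.outputLocs(p) == Nil)
      }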

@SparkQA

SparkQA commented Jan 26, 2015

Test build #26086 has finished for PR 4055 at commit a181772.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@lianhuiwang
Contributor

@JoshRosen I think that's OK, because the code change is very small and there is no impact on the current logic.

@suyanNone
Contributor Author

@srowen, the original hashCode() was auto-generated by IDEA's generate-hashCode feature; I have now refined it per your comments.
As for canEqual(), I followed what Programming in Scala describes:

To resolve

    warning: non variable type-argument T in type
    pattern is unchecked since it is eliminated by erasure
    case that: Branch[T] => this.elem == that.elem &&

the book uses:

    class Branch[T](
      val elem: T,
      val left: Tree[T],
      val right: Tree[T]
    ) extends Tree[T] {
      override def equals(other: Any) = other match {
        case that: Branch[_] => (that canEqual this) &&
          this.elem == that.elem &&
          this.left == that.left &&
          this.right == that.right
        case _ => false
      }
      def canEqual(other: Any) = other.isInstanceOf[Branch[_]]
      override def hashCode: Int =
        41 * (41 * (41 + elem.hashCode) + left.hashCode) + right.hashCode
    }
    (Listing 30.4 · A parameterized type with equals and hashCode)

@cloud-fan, I think 31 * hashCode + hashCode is the more common way to do that.

@suyanNone
Contributor Author

@cloud-fan
Given the current code, it is not easy to avoid re-submitting the tasks already in pendingTasks. To be honest, the current DAGScheduler is complicated, yet in some respects simplistic; a lot could still be improved.

Re-submission happens when a stage has failed due to a fetch failure. A fetch failure means the currently running TaskSet is dead (called a zombie): only the tasks already scheduled on executors keep running, and the tasks remaining in stage.pendingTasks will never be scheduled in the previous TaskSet. But it would be possible to resubmit only the tasks that were never scheduled.

@suyanNone
Contributor Author

@cloud-fan By the way, do you know HarryZhang? ZJU VLIS Lab.

@cloud-fan
Contributor

@suyanNone Thanks for the explanation of re-submit!
What's the Chinese name of HarryZhang? We don't use English names in the lab……

@suyanNone
Contributor Author

@cloud-fan ZhangLei, SunHongLiang, HanLi, ChenXingYu, and so on... I am ZhangLei's classmate at ZJU.

Contributor


You can try Seq[Int](1).isInstanceOf[Seq[String]] in the REPL; it will return true.
isInstanceOf can't check generic type arguments because of JVM type erasure.
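
A quick illustration of that erasure behavior, runnable in a plain Scala REPL:

    // At runtime the JVM only sees Seq, not Seq[Int] or Seq[String], so the
    // runtime check cannot reject the wrong element type.
    val xs: Seq[Int] = Seq(1)
    println(xs.isInstanceOf[Seq[String]]) // true (with an "unchecked" compile-time warning)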

Contributor Author


@cloud-fan Yeah, I know that. And in that class there is no need for a type parameter at the class level; it is only used at the function level, in run or runContext.

Also, this code still has things to refine, such as changing var partitionId to a val; I will refine it later.

Contributor


I mean... something like other.isInstanceOf[ResultTask[_, _]] =.=

Member


Yes, that's very slightly better. I agree.

Member


So equals is not overridden in these subclasses because equality does not depend on their additional fields? Just checking that this is definitely desirable.

Contributor Author


Eh... stageId and partitionId are like a unique composite primary key in a database. In the current Spark context, a task can certainly be identified by (stageId, partitionId), without even needing canEqual.

@SparkQA

SparkQA commented Jan 29, 2015

Test build #26307 has finished for PR 4055 at commit ce54738.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone
Contributor Author

@cloud-fan --!
    [error] /home/jenkins/workspace/SparkPullRequestBuilder/core/src/main/scala/org/apache/spark/scheduler/ResultTask.scala:69: class ResultTask takes type parameters
    [error] override def canEqual(other: Any): Boolean = other.isInstanceOf[ResultTask]

@suyanNone
Contributor Author

retest this please

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26373 has finished for PR 4055 at commit adca2aa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26374 has finished for PR 4055 at commit adca2aa.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone
Contributor Author

retest this please

@SparkQA

SparkQA commented Jan 30, 2015

Test build #26386 has finished for PR 4055 at commit 076f54d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@suyanNone
Contributor Author

@srowen @JoshRosen can someone verify this patch?

Member


Why not string interpolation here?

Contributor Author


@srowen task.partitionId is an Int.

Member


No, I mean: why not use the same s"..." syntax as in the line above? Int is fine.

Contributor Author


@srowen Others do it that way... I can't figure out the advantages and disadvantages.

There are a lot of lines like:

    logInfo("Finished task %s in stage %s (TID %d) in %d ms on %s (%d/%d)".format(
    logError("Task %s in stage %s (TID %d) had a not serializable result: %s; not retrying"
      .format(i
    abort("Task %s in stage %s (TID %d) had a not serializable result: %s".format(

and also lines like this:

    logInfo(
      s"Lost task ${info.id} in stage ${taskSet.id} (TID $tid) on executor ${info.host}: " +
      s"${ef.className} (${ef.description}) [duplicate $dupCount]")

Do I need to refactor them?

Member


The s"..." didn't exist before Scala 2.10, so I think that's why the old style is still used in the code. There's no great need to change all that. I think the interpolated style is clearer, and I tend to think that we should match surrounding code style in issues like this. Since interpolation is used in the line above, it seems right to use it here. I agree it's a tiny issue either way.

@SparkQA

SparkQA commented Feb 27, 2015

Test build #28038 has finished for PR 4055 at commit 29684d8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 27, 2015

Test build #28061 has finished for PR 4055 at commit 9025cf1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@srowen
Member

srowen commented Feb 27, 2015

@cloud-fan @rxin do you have any final thoughts on this? It's looking reasonable to me, though I admit I don't know this scheduler code well enough to be confident.

@rxin
Contributor

rxin commented Feb 27, 2015

cc @markhamstra and @kayousterhout also

@markhamstra
Contributor

I'll take a look over the weekend.

@cloud-fan
Contributor

I'm still a little against the canEqual method. In this particular context, I think "(stageId, partitionId)" is meaningful enough to identify a task.

@SparkQA

SparkQA commented Jul 21, 2015

Test build #37907 has finished for PR 4055 at commit 7ae128e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • trait ExpectsInputTypes extends Expression
    • trait ImplicitCastInputTypes extends ExpectsInputTypes
    • trait Unevaluable extends Expression
    • trait Nondeterministic extends Expression
    • trait CodegenFallback extends Expression
    • case class Hex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Unhex(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • abstract class RDG extends LeafExpression with Nondeterministic
    • case class Rand(seed: Long) extends RDG
    • case class Randn(seed: Long) extends RDG
    • case class Ascii(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class Base64(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class UnBase64(child: Expression) extends UnaryExpression with ImplicitCastInputTypes
    • case class FakeFileStatus(

@SparkQA

SparkQA commented Jul 21, 2015

Test build #37949 has finished for PR 4055 at commit b2df3fd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor


getServerStatuses has been removed in master -- I guess both of these should be

    val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
    assert(statuses != null)
    assert(statuses.nonEmpty)

The new code will now throw an exception if we're missing the map output data, but I feel like it's probably still good to leave those asserts in.

Contributor Author


Maybe the code below would be better?

    try {
      mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
    } catch {
      case e: Exception => fail("")
    }

Contributor


We don't use try / case e: Exception => fail("") to fail tests when there is an exception -- we just let the exception fail the test directly. You get more info in the stack trace that way. So I think it's better to just leave it bare.

You could just put in a comment explaining what the point is:

    // this would throw an exception if the map status hadn't been registered
    mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)

I still slightly prefer leaving the asserts in there. Yes, they are kinda pointless with the current behavior of getMapSizesByExecutorId -- but I'd just like to be a bit more defensive, in case that behavior changes in the future (e.g., maybe some future refactoring makes them stop throwing exceptions for some reason).

Maybe to be very clear, you could include the asserts and more comments:

    // this would throw an exception if the map status hadn't been registered
    val statuses = mapOutputTracker.getMapSizesByExecutorId(0, reduceIdx)
    // really we should have already thrown an exception rather than fail either of these
    // asserts, but just to be extra defensive let's double check the statuses are OK
    assert(statuses != null)
    assert(statuses.nonEmpty)

This is pretty minor, though, I don't feel strongly about it.

@squito
Contributor

squito commented Jul 21, 2015

Thanks for updating, @suyanNone! There are compile errors because of changes in master, and I left some really minor comments, but I think it's basically ready.

By the way, feel free to open separate JIRAs / PRs for the other issues you found (and cc me if you like). I do think they are worth discussing, but this is the most important fix.

@andrewor14
Contributor

@squito @suyanNone is this superseded by #7699? If so, would you mind closing this patch?

@asfgit asfgit closed this in 804a012 Sep 4, 2015
@rxin
Contributor

rxin commented Sep 21, 2015

@suyanNone can you add your git commit email to your github profile, so this commit will show up as yours?
