From 25c0f7b471356b829d3611972741b4a40982cc2f Mon Sep 17 00:00:00 2001
From: Vinod K C
Date: Thu, 3 Sep 2015 12:33:40 +0530
Subject: [PATCH 1/3] Added hashCode methods

---
 .../scala/org/apache/spark/rdd/RDDOperationScope.scala | 3 +++
 .../org/apache/spark/scheduler/AccumulableInfo.scala   | 8 +++++++-
 .../org/apache/spark/rdd/RDDOperationScopeSuite.scala  | 7 +++++++
 .../org/apache/spark/scheduler/DAGSchedulerSuite.scala | 9 +++++++++
 4 files changed, 26 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
index 44667281c1063..540cbd688b63b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDDOperationScope.scala
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.{JsonIgnore, JsonInclude, JsonPropertyOr
 import com.fasterxml.jackson.annotation.JsonInclude.Include
 import com.fasterxml.jackson.databind.ObjectMapper
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
+import com.google.common.base.Objects
 
 import org.apache.spark.{Logging, SparkContext}
 
@@ -67,6 +68,8 @@ private[spark] class RDDOperationScope(
     }
   }
 
+  override def hashCode(): Int = Objects.hashCode(id, name, parent)
+
   override def toString: String = toJson
 }
 
diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index 11d123eec43ca..e33909fef31fe 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -34,9 +34,15 @@ class AccumulableInfo private[spark] (
   override def equals(other: Any): Boolean = other match {
     case acc: AccumulableInfo =>
       this.id == acc.id && this.name == acc.name &&
-        this.update == acc.update && this.value == acc.value
+        this.update == acc.update && this.value == acc.value &&
+        this.internal == acc.internal
     case _ => false
   }
+
+  override def hashCode(): Int = {
+    val state = Seq(id, name, update, value, internal)
+    state.map(_.hashCode).reduce(31 * _ + _)
+  }
 }
 
 object AccumulableInfo {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
index f65349e3e3585..b24dbc1aa1abd 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -38,6 +38,13 @@ class RDDOperationScopeSuite extends SparkFunSuite with BeforeAndAfter {
     sc.stop()
   }
 
+  test("equals and hashCode") {
+    val opScope1 = new RDDOperationScope("scope1", id = "1")
+    val opScope2 = new RDDOperationScope("scope1", id = "1")
+    assert(opScope1 == opScope2)
+    assert(opScope1.hashCode() === opScope2.hashCode())
+  }
+
   test("getAllScopes") {
     assert(scope1.getAllScopes === Seq(scope1))
     assert(scope2.getAllScopes === Seq(scope1, scope2))
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 2e8688cf41d99..5221121742071 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -285,6 +285,15 @@ class DAGSchedulerSuite
     assertDataStructuresEmpty()
   }
 
+  test("equals and hashCode AccumulableInfo") {
+    val accInfo1 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, true)
+    val accInfo2 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
+    val accInfo3 = new AccumulableInfo(1, " Accumulable " + 1, Some("delta" + 1), "val" + 1, false)
+    assert(accInfo1 !== accInfo2)
+    assert(accInfo2 === accInfo3)
+    assert(accInfo2.hashCode() === accInfo3.hashCode())
+  }
+
   test("cache location preferences w/ dependency") {
     val baseRdd = new MyRDD(sc, 1, Nil).cache()
     val finalRdd = new MyRDD(sc, 1, List(new OneToOneDependency(baseRdd)))

From 56be40baad8a52e6493404a4513d1199edf2c166 Mon Sep 17 00:00:00 2001
From: Vinod K C
Date: Thu, 3 Sep 2015 14:55:46 +0530
Subject: [PATCH 2/3] Updated review comments

---
 .../main/scala/org/apache/spark/scheduler/AccumulableInfo.scala | 2 +-
 .../scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala     | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index e33909fef31fe..3bacc5d64b9aa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -34,7 +34,7 @@ class AccumulableInfo private[spark] (
   override def equals(other: Any): Boolean = other match {
     case acc: AccumulableInfo =>
       this.id == acc.id && this.name == acc.name &&
-        this.update == acc.update && this.value == acc.value && 
+        this.update == acc.update && this.value == acc.value &&
         this.internal == acc.internal
     case _ => false
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
index b24dbc1aa1abd..16a92f54f9368 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDOperationScopeSuite.scala
@@ -41,7 +41,7 @@ class RDDOperationScopeSuite extends SparkFunSuite with BeforeAndAfter {
   test("equals and hashCode") {
     val opScope1 = new RDDOperationScope("scope1", id = "1")
     val opScope2 = new RDDOperationScope("scope1", id = "1")
-    assert(opScope1 == opScope2)
+    assert(opScope1 === opScope2)
     assert(opScope1.hashCode() === opScope2.hashCode())
   }
 

From 3822bd3def3757d348024d51003c74f9bc094617 Mon Sep 17 00:00:00 2001
From: Vinod K C
Date: Thu, 3 Sep 2015 18:56:15 +0530
Subject: [PATCH 3/3] Changed reduce to reduceLeft

---
 .../main/scala/org/apache/spark/scheduler/AccumulableInfo.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
index 3bacc5d64b9aa..b6bff64ee368e 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/AccumulableInfo.scala
@@ -41,7 +41,7 @@ class AccumulableInfo private[spark] (
 
   override def hashCode(): Int = {
     val state = Seq(id, name, update, value, internal)
-    state.map(_.hashCode).reduce(31 * _ + _)
+    state.map(_.hashCode).reduceLeft(31 * _ + _)
   }
 }