37 changes: 37 additions & 0 deletions sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.sql.{Date, Timestamp}

import org.scalatest.exceptions.TestFailedException

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.sql.catalyst.ScroogeLikeExample
import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder}
@@ -67,6 +69,41 @@ class DatasetSuite extends QueryTest with SharedSQLContext {
data: _*)
}

test("toDS should compare map with byte array keys correctly") {
// Choose the order of the arrays in such a way that sorting the keys of
// different maps by _.toString will not accidentally put equal keys together.
val arrays = (1 to 5).map(_ => Array[Byte](0.toByte, 0.toByte)).sortBy(_.toString).toArray
arrays(0)(1) = 1.toByte
arrays(1)(1) = 2.toByte
arrays(2)(1) = 2.toByte
arrays(3)(1) = 1.toByte

val mapA = Map(arrays(0) -> "one", arrays(2) -> "two")
val subsetOfA = Map(arrays(0) -> "one")
val equalToA = Map(arrays(1) -> "two", arrays(3) -> "one")
val notEqualToA1 = Map(arrays(1) -> "two", arrays(3) -> "not one")
val notEqualToA2 = Map(arrays(1) -> "two", arrays(4) -> "one")

// Comparing map with itself
checkDataset(Seq(mapA).toDS(), mapA)

// Comparing map with equivalent map
checkDataset(Seq(equalToA).toDS(), mapA)
checkDataset(Seq(mapA).toDS(), equalToA)

// Comparing map with its subset
intercept[TestFailedException](checkDataset(Seq(subsetOfA).toDS(), mapA))
intercept[TestFailedException](checkDataset(Seq(mapA).toDS(), subsetOfA))

// Comparing map with another map differing by a single value
intercept[TestFailedException](checkDataset(Seq(notEqualToA1).toDS(), mapA))
intercept[TestFailedException](checkDataset(Seq(mapA).toDS(), notEqualToA1))

// Comparing map with another map differing by a single key
intercept[TestFailedException](checkDataset(Seq(notEqualToA2).toDS(), mapA))
intercept[TestFailedException](checkDataset(Seq(mapA).toDS(), notEqualToA2))
}

test("toDS with RDD") {
val ds = sparkContext.makeRDD(Seq("a", "b", "c"), 3).toDS()
checkDataset(
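For context, here is a minimal standalone sketch (not part of the patch, plain Scala only) of why the test above needs byte-array keys to be compared element-wise: Scala arrays are JVM arrays, whose toString is identity-based, so two element-wise-equal arrays stringify differently and a sort by _.toString cannot reliably align them.

    // Array.toString returns something like "[B@1b6d3586" (class name plus
    // identity hash), so two equal byte arrays produce different strings.
    val x = Array[Byte](0, 1)
    val y = Array[Byte](0, 1)
    println(x.toString == y.toString) // false: identity-based strings differ
    println(x.sameElements(y))        // true: element-wise equality holds
    println(x == y)                   // false: == on arrays is reference equality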
6 changes: 3 additions & 3 deletions sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -341,9 +341,9 @@ object QueryTest {
case (a: Array[_], b: Array[_]) =>
  a.length == b.length && a.zip(b).forall { case (l, r) => compare(l, r)}
case (a: Map[_, _], b: Map[_, _]) =>
-  val entries1 = a.iterator.toSeq.sortBy(_.toString())
-  val entries2 = b.iterator.toSeq.sortBy(_.toString())
-  compare(entries1, entries2)
+  a.size == b.size && a.keys.forall { aKey =>
+    b.keys.find(bKey => compare(aKey, bKey)).exists(bKey => compare(a(aKey), b(bKey)))
+  }
Member:
How about:

      a.size == b.size && a.keys.forall { aKey =>
        val maybeBKey = b.keys.find(bKey => compare(aKey, bKey))
        maybeBKey.isDefined && compare(a(aKey), b(maybeBKey.get))
      }

? I think it's similar to the other iterable and array comparisons.

cc @cloud-fan, who touched this code recently.

Contributor:
+1, this looks cleaner

case (a: Iterable[_], b: Iterable[_]) =>
a.size == b.size && a.zip(b).forall { case (l, r) => compare(l, r)}
case (a: Product, b: Product) =>
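To try the fixed comparison outside Spark, here is a self-contained sketch of the new Map branch (the reviewer's suggested formulation is equivalent); compareMaps and elementWiseEq are illustrative names for this sketch, not Spark APIs.

    // Key-matching map comparison, as in the patched QueryTest.compare: maps are
    // equal iff they have the same size and every key of `a` has a compare-equal
    // key in `b` whose associated value also compares equal.
    def compareMaps[K, V](a: Map[K, V], b: Map[K, V])(compare: (Any, Any) => Boolean): Boolean = {
      a.size == b.size && a.keys.forall { aKey =>
        b.keys.find(bKey => compare(aKey, bKey)).exists(bKey => compare(a(aKey), b(bKey)))
      }
    }

    // A compare that descends into arrays, mirroring the Array branch above.
    def elementWiseEq(l: Any, r: Any): Boolean = (l, r) match {
      case (x: Array[_], y: Array[_]) =>
        x.length == y.length && x.zip(y).forall { case (p, q) => elementWiseEq(p, q) }
      case _ => l == r
    }

    // Equal byte-array keys now match even though the array instances differ,
    // and a single differing key element makes the maps unequal.
    assert(compareMaps(Map(Array[Byte](0, 1) -> "one"), Map(Array[Byte](0, 1) -> "one"))(elementWiseEq))
    assert(!compareMaps(Map(Array[Byte](0, 1) -> "one"), Map(Array[Byte](0, 2) -> "one"))(elementWiseEq))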