Conversation

@kiszk
Member

@kiszk kiszk commented Jun 16, 2016

What changes were proposed in this pull request?

This PR eliminates a redundant cast from an ArrayType with containsNull = false or a MapType with valueContainsNull = false.

For example, in the ArrayType case, the current implementation leaves a cast such as cast(value#63 as array<double>).toDoubleArray. However, cast(value#63 as array<double>) can be eliminated if we know value#63 does not include null. This PR applies this elimination to ArrayType and MapType in SimplifyCasts during the plan optimization phase.
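To make the idea concrete, here is a minimal, self-contained sketch of the rule shape. These are toy case classes defined for illustration, not Spark's actual Catalyst types; the real rule lives in SimplifyCasts.

```scala
// Toy model (not Spark's Catalyst classes) of the elimination: a cast between
// array types that differ only in element nullability is a no-op when the
// source already contains no nulls.
sealed trait DataType
case class ArrayType(elementType: DataType, containsNull: Boolean) extends DataType
case object DoubleType extends DataType

sealed trait Expression { def dataType: DataType }
case class AttributeRef(name: String, dataType: DataType) extends Expression
case class Cast(child: Expression, dataType: DataType) extends Expression

// Simplified version of the SimplifyCasts idea, for arrays only.
def simplifyCast(e: Expression): Expression = e match {
  // Identical source and target types: the cast does nothing.
  case Cast(child, target) if child.dataType == target => child
  case c @ Cast(child, ArrayType(to, true)) =>
    child.dataType match {
      // Source has no null elements, so widening only the element
      // nullability changes nothing at runtime: drop the cast.
      case ArrayType(from, false) if from == to => child
      case _ => c
    }
  case other => other
}

val value = AttributeRef("value", ArrayType(DoubleType, containsNull = false))
val widened = Cast(value, ArrayType(DoubleType, containsNull = true))
```

Applying `simplifyCast(widened)` returns `value` directly, mirroring how the optimized plan below loses its `cast(...)` node.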

In summary, this PR yields a 1.2-1.3x performance improvement over the code before the change.
Here are performance results of benchmark programs:

  test("Read array in Dataset") {
    import sparkSession.implicits._

    val iters = 5
    val n = 1024 * 1024
    val rows = 15

    val benchmark = new Benchmark("Read primitive array", n)

    val rand = new Random(511)
    val intDS = sparkSession.sparkContext.parallelize(0 until rows, 1)
      .map(_ => Array.tabulate(n)(i => i)).toDS()
    intDS.count() // force to create ds
    val lastElement = n - 1
    val randElement = rand.nextInt(lastElement)

    benchmark.addCase(s"Read int array in Dataset", numIters = iters)(iter => {
      val idx0 = randElement
      val idx1 = lastElement
      intDS.map(a => a(0) + a(idx0) + a(idx1)).collect
    })

    val doubleDS = sparkSession.sparkContext.parallelize(0 until rows, 1)
      .map(_ => Array.tabulate(n)(i => i.toDouble)).toDS()
    doubleDS.count() // force to create ds

    benchmark.addCase(s"Read double array in Dataset", numIters = iters)(iter => {
      val idx0 = randElement
      val idx1 = lastElement
      doubleDS.map(a => a(0) + a(idx0) + a(idx1)).collect
    })

    benchmark.run()
  }

Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.4
Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz

without this PR
Read primitive array:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Read int array in Dataset                      525 /  690          2.0         500.9       1.0X
Read double array in Dataset                   947 / 1209          1.1         902.7       0.6X

with this PR
Read primitive array:                    Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------
Read int array in Dataset                      400 /  492          2.6         381.5       1.0X
Read double array in Dataset                   788 /  870          1.3         751.4       0.5X

Here is an example program that originally caused this performance issue:

val ds = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)).toDS()
val ds2 = ds.map(p => {
  var s = 0.0
  for (i <- 0 to 2) { s += p(i) }
  s
})
ds2.show
ds2.explain(true)

Plans before this PR

== Parsed Logical Plan ==
'SerializeFromObject [input[0, double, true] AS value#68]
+- 'MapElements <function1>, obj#67: double
   +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#66: [D
      +- LocalRelation [value#63]

== Analyzed Logical Plan ==
value: double
SerializeFromObject [input[0, double, true] AS value#68]
+- MapElements <function1>, obj#67: double
   +- DeserializeToObject cast(value#63 as array<double>).toDoubleArray, obj#66: [D
      +- LocalRelation [value#63]

== Optimized Logical Plan ==
SerializeFromObject [input[0, double, true] AS value#68]
+- MapElements <function1>, obj#67: double
   +- DeserializeToObject cast(value#63 as array<double>).toDoubleArray, obj#66: [D
      +- LocalRelation [value#63]

== Physical Plan ==
*SerializeFromObject [input[0, double, true] AS value#68]
+- *MapElements <function1>, obj#67: double
   +- *DeserializeToObject cast(value#63 as array<double>).toDoubleArray, obj#66: [D
      +- LocalTableScan [value#63]

Plans after this PR

== Parsed Logical Plan ==
'SerializeFromObject [input[0, double, true] AS value#6]
+- 'MapElements <function1>, obj#5: double
   +- 'DeserializeToObject unresolveddeserializer(upcast(getcolumnbyordinal(0, ArrayType(DoubleType,false)), ArrayType(DoubleType,false), - root class: "scala.Array").toDoubleArray), obj#4: [D
      +- LocalRelation [value#1]

== Analyzed Logical Plan ==
value: double
SerializeFromObject [input[0, double, true] AS value#6]
+- MapElements <function1>, obj#5: double
   +- DeserializeToObject cast(value#1 as array<double>).toDoubleArray, obj#4: [D
      +- LocalRelation [value#1]

== Optimized Logical Plan ==
SerializeFromObject [input[0, double, true] AS value#6]
+- MapElements <function1>, obj#5: double
   +- DeserializeToObject value#1.toDoubleArray, obj#4: [D
      +- LocalRelation [value#1]

== Physical Plan ==
*SerializeFromObject [input[0, double, true] AS value#6]
+- *MapElements <function1>, obj#5: double
   +- *DeserializeToObject value#1.toDoubleArray, obj#4: [D
      +- LocalTableScan [value#1]

How was this patch tested?

Tested by new test cases in SimplifyCastsSuite

@SparkQA

SparkQA commented Jun 16, 2016

Test build #60636 has finished for PR 13704 at commit 17f17d6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

where do we ensure it?

Member Author

I assume that this is the part that generates code for Java primitive arrays, regarding from and to. Since a Java primitive array (e.g. int[]) cannot have null values, unlike SQL, I said "ensure not null in input and output arrays."
What do you think?

@cloud-fan
Contributor

cloud-fan commented Jun 27, 2016

can you also put the plan tree of the example program in PR description? thanks!

@kiszk
Member Author

kiszk commented Jun 27, 2016

@cloud-fan I updated the PR description by adding plan trees.

@cloud-fan
Contributor

I'm wondering why we have this Cast in DeserializeToObject. Isn't the value#63 already a double array?

@kiszk
Member Author

kiszk commented Jun 27, 2016

@cloud-fan I think value#63 is UnsafeArrayData.
When I ran a DataFrame program, I got the following trees. Since DataFrame operations access data in UnsafeArrayData, I think that LocalRelation and LocalTableScan keep an array as UnsafeArrayData in an InternalRow.
What do you think?

val df = Seq(Array(1.0, 2.0, 3.0), Array(4.0, 5.0, 6.0)).toDF()
val df2 = df.selectExpr("value[0] + value[1] + value[2]")
df2.show
df2.explain(true)

== Analyzed Logical Plan ==
((value[0] + value[1]) + value[2]): double
Project [((value#63[0] + value#63[1]) + value#63[2]) AS ((value[0] + value[1]) + value[2])#67]
+- LocalRelation [value#63]

== Optimized Logical Plan ==
LocalRelation [((value[0] + value[1]) + value[2])#67]

== Physical Plan ==
LocalTableScan [((value[0] + value[1]) + value[2])#67]

@cloud-fan
Contributor

Sorry, I should have said it more explicitly: value#63 already has double array type. UnsafeArrayData is an internal data representation of the array type, so it seems weird to have the Cast there.

@kiszk
Member Author

kiszk commented Jun 28, 2016

I agree that value#63 has double array type; it is stored as UnsafeArrayData in an UnsafeRow.
The <function1> in MapElements <function1>, obj#67: double is represented by Java bytecode instead of Expressions, so we have to pass it a Java primitive array double[] rather than a double array held in UnsafeArrayData. I think that Cast performs this conversion of the double array from UnsafeArrayData to double[]. This is an issue only for Datasets.

What do you think?
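The point that a bytecode lambda needs a plain double[] can be illustrated with a toy stand-in for the internal representation. BoxedArrayData below is hypothetical, defined only for this sketch; it is not Spark's UnsafeArrayData, but it shows why a deserialization step like toDoubleArray has to copy values out into a primitive array.

```scala
// Toy illustration (not Spark's UnsafeArrayData): a Dataset lambda compiled to
// bytecode consumes a plain double[], so an internal boxed/columnar array
// representation must first be "deserialized" into a primitive array.
final class BoxedArrayData(values: Array[java.lang.Double]) {
  def numElements: Int = values.length
  def getDouble(i: Int): Double = values(i) // auto-unboxes

  // The conversion this discussion is about: internal representation -> double[].
  def toDoubleArray: Array[Double] = {
    val out = new Array[Double](values.length)
    var i = 0
    while (i < values.length) { out(i) = values(i); i += 1 }
    out
  }
}
```

A lambda such as `a => a(0) + a(1)` can then run against the resulting `Array[Double]` without any per-element boxing.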

@cloud-fan
Contributor

From the plan tree you gave, the Cast is casting value#63 to an array type, not to ObjectType(classOf[Array[Double]]); there should be some other reason that we need to find out (maybe it's because the nullability of the array element is different).

@kiszk
Member Author

kiszk commented Jun 28, 2016

Regarding the plan tree printout (I removed my debug information), the Cast performs a conversion from UnsafeArrayData to GenericArrayData, since the target type of the Cast is ArrayType(DoubleType). Because a Cast is printed using the simpleString of its target dataType, it is shown as array<double>.

Regarding the generated code, we seem to be on the same page. What you describe is not done in Cast now, and it is what I did. IIUC, the current Cast code generation for arrays is conservative: its goal is to create a GenericArrayData object, so it always creates an Object[], assigns a value into each Object[] element, and then passes the Object[] to the constructor of GenericArrayData. As you pointed out, if code generation took nullability into account, it could avoid creating the Object[]. Unfortunately, the current code generation for arrays (lines 828-862) does not do this.
The code generated by this PR can also use the specialized GenericArrayData implemented in #13758.

@cloud-fan
Contributor

> the Cast performs conversion from UnsafeArrayData to GenericArrayData.

Cast is used to cast one type to another, not cast one kind of data representation to another, so I don't quite understand why the Cast is there.

> This code generation always creates Object[] and assigns values into each Object[] element.

This is reasonable, as it needs to take care of null elements. And we do have a chance to optimize it: if the target array type's element type is primitive and the input array type's element nullability is false, we can avoid using Object[].
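The optimization condition described above can be sketched as a tiny predicate. The element types and the canAvoidObjectArray helper below are a toy model written for illustration, not Spark's actual codegen.

```scala
// Toy model of the decision described above: during cast code generation for
// arrays, an Object[] staging buffer is only needed when elements may be null
// or the target element type is not a Java primitive.
sealed trait ElemType
case object IntElem    extends ElemType // primitive
case object DoubleElem extends ElemType // primitive
case object StringElem extends ElemType // not primitive

def isPrimitive(t: ElemType): Boolean = t match {
  case IntElem | DoubleElem => true
  case StringElem           => false
}

// true  -> can write straight into a primitive array (e.g. double[])
// false -> must stage values in Object[] to represent possible nulls
def canAvoidObjectArray(targetElem: ElemType, inputContainsNull: Boolean): Boolean =
  isPrimitive(targetElem) && !inputContainsNull
```

Only the combination "primitive target element, non-nullable input" allows skipping the boxed buffer; every null-capable or reference-typed case still needs Object[].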

@kiszk
Member Author

kiszk commented Jun 29, 2016

Let me check which code portion inserts Cast in this tree.

> And we do have a chance to optimize it: if the target array type's element type is primitive and the input array type's element nullability is false, we can avoid using Object[].

I agree. This PR generates optimized code without using Object[] by checking the above conditions.

Contributor

We need to make sure the input array's element nullability is false, but a primitive-type array doesn't guarantee it; e.g. we can have ArrayType(ByteType, true).

Member Author

Yes, you are right. The latest code also checks ArrayType.containsNull for both from and to.

@kiszk
Member Author

kiszk commented Jul 1, 2016

@cloud-fan, I checked the following

> Let me check which code portion inserts Cast in this tree.

IIUC, this code inserts the Cast in the Analyzed Logical Plan that corresponds to the upcast in the Parsed Logical Plan.

@SparkQA

SparkQA commented Jul 1, 2016

Test build #61629 has finished for PR 13704 at commit 2d3d34a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 1, 2016

Test build #61630 has finished for PR 13704 at commit 256c861.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 3, 2016

Test build #61677 has finished for PR 13704 at commit 77859cf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jul 3, 2016

@cloud-fan regarding the cast, I think that you are right. The issue was that the original cast was from ArrayType(Double, false) to ArrayType(Double, true).
I changed the code so that the destination of the Cast keeps containsNull = false when the source ArrayType.containsNull is false. In the example in this PR description, the condition is satisfied, so the cast is now from ArrayType(Double, false) to ArrayType(Double, false). As a result, after the logical optimizations, we get a plan tree without the Cast. I updated the PR description.

== Analyzed Logical Plan ==
value: double
SerializeFromObject [input[0, double, true] AS value#6]
+- MapElements <function1>, obj#5: double
   +- DeserializeToObject cast(value#1 as array<double>).toDoubleArray, obj#4: [D
      +- LocalRelation [value#1]

== Optimized Logical Plan ==
SerializeFromObject [input[0, double, true] AS value#6]
+- MapElements <function1>, obj#5: double
   +- DeserializeToObject value#1.toDoubleArray, obj#4: [D
      +- LocalRelation [value#1]
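The containsNull propagation described above can be sketched as follows. ArrType, targetBefore, and targetAfter are hypothetical stand-ins written for this illustration, not Spark's code; the point is only that reusing the source's containsNull makes the cast an identity, and hence removable.

```scala
// Toy sketch of the fix: when building the deserializer's cast target, reuse
// the source's containsNull instead of defaulting to true, so a non-nullable
// source yields an identical (and therefore removable) cast.
case class ArrType(elem: String, containsNull: Boolean)

// Before the fix: the target element nullability was always true.
def targetBefore(source: ArrType): ArrType = ArrType(source.elem, containsNull = true)

// After the fix: propagate the source's containsNull into the target.
def targetAfter(source: ArrType): ArrType = ArrType(source.elem, source.containsNull)

// A cast whose target equals its source type does nothing and can be dropped.
def castIsRedundant(source: ArrType, target: ArrType): Boolean = source == target
```

For a source of ArrType("double", containsNull = false), only targetAfter produces a target equal to the source, matching the disappearing Cast in the optimized plan above.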

@SparkQA

SparkQA commented Jul 10, 2016

Test build #62050 has finished for PR 13704 at commit 43ced15.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class PrimitiveArrayBenchmark extends BenchmarkBase

@SparkQA

SparkQA commented Jul 10, 2016

Test build #62057 has finished for PR 13704 at commit 677d81e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Jul 10, 2016

@cloud-fan could you please review this? As you pointed out, I also changed the code related to the cast. I added benchmark results, too.

Contributor

How about we move this into another PR? I think the main purpose of this PR is to eliminate the unnecessary Cast.

Member Author

Sure, I will create another PR and update the description.

@SparkQA

SparkQA commented Jul 11, 2016

Test build #62071 has finished for PR 13704 at commit 66800fa.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

how about

case c @ Cast(e, dataType) => (e.dataType, dataType) match {
  case (ArrayType(from, false), ArrayType(to, true)) if from == to => e
  case (MapType(fromKey, fromValue, false), MapType(toKey, toValue, true)) if fromKey == toKey && fromValue == toValue => e
  case _ => c
}

Member Author

Thanks, I like this simple one.

@SparkQA

SparkQA commented Jul 11, 2016

Test build #62084 has finished for PR 13704 at commit c31729f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 11, 2016

Test build #62087 has finished for PR 13704 at commit 1bbe859.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jul 11, 2016

Test build #62097 has finished for PR 13704 at commit b7477de.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • class SimplifyCastsSuite extends PlanTest

@SparkQA

SparkQA commented Aug 29, 2016

Test build #64594 has finished for PR 13704 at commit c8f87a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk
Member Author

kiszk commented Aug 30, 2016

@liancheng Could you please review this, since I resolved the conflict?

comparePlans(optimized, expected)
}

test("non-nullable to nullable array cast") {
Contributor

non-nullable element array to nullable element array cast

@cloud-fan
Contributor

Left some comments, let's go ahead and merge it after that :)

@SparkQA

SparkQA commented Aug 30, 2016

Test build #64658 has finished for PR 13704 at commit 40ac2bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in d92cd22 Aug 31, 2016