Conversation

kiszk (Member) commented Jun 26, 2016

What changes were proposed in this pull request?

This PR optimizes the generated projection code for a primitive-type array. Although a primitive-type array requires no null checks and its data occupies a contiguous region, the current generated code performs a null check and a copy for each element (lines 075-082 in the generated code before applying this PR). This PR makes the following changes:

  1. Eliminate null checks for each array element
  2. Perform a bulk data copy using Platform.copyMemory
  3. Eliminate primitive array allocation in GenericArrayData once [SPARK-16043][SQL] Prepare GenericArrayData implementation specialized for a primitive array #13758 is merged
  4. Eliminate setting the sparse index for UnsafeArrayData once [SPARK-15962][SQL] Introduce implementation with a dense format for UnsafeArrayData #13680 is merged

These optimizations are performed in a helper method, UnsafeArrayWriter.writePrimitive<PrimitiveType>Array() (line 075 in the generated code after applying this PR).

For now, items 3 and 4 are not enabled, but the code is ready.
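As a rough illustration of items 1 and 2, the sketch below shows the core idea behind such a helper: once the array is known to contain no nulls, the per-element loop can be replaced by a single Platform.copyMemory call over the contiguous payload. This is only a minimal sketch under those assumptions, not the code added by this PR; PrimitiveArrayCopySketch, buffer, and cursor are placeholder names standing in for the writer's BufferHolder state.

import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.unsafe.Platform

object PrimitiveArrayCopySketch {
  // Bulk-copy a null-free double array into `buffer` at `cursor` and return the new cursor.
  // Assumes the caller has verified there are no null elements and has already grown
  // `buffer` by values.length * 8 bytes (the role BufferHolder.grow plays in generated code).
  def writePrimitiveDoubleArray(input: ArrayData, buffer: Array[Byte], cursor: Int): Int = {
    val values = input.toDoubleArray()      // materialize the backing double[]; no per-element null checks
    val numBytes = values.length * 8L
    Platform.copyMemory(
      values, Platform.DOUBLE_ARRAY_OFFSET,          // source: start of the double[] payload
      buffer, Platform.BYTE_ARRAY_OFFSET + cursor,   // destination: the row buffer at the current cursor
      numBytes)
    cursor + numBytes.toInt
  }
}

Per-type variants for int, long, float, and the other primitive types would follow the same pattern with the corresponding element size and array offset.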

Benchmark program

  def writeArray(iters: Int): Unit = {
    import sparkSession.implicits._

    val n = 1024 * 1024
    val rows = 15

    val benchmark = new Benchmark("Write primitive array", n)

    // DataFrame with a single column holding an Array[Int] per row
    val intDF = sparkSession.sparkContext.parallelize(0 until rows, 1)
      .map(_ => Array.tabulate(n)(i => i)).toDF()
    intDF.count() // force creation of the DataFrame

    benchmark.addCase("Write int array in DataFrame", numIters = iters) { _ =>
      intDF.selectExpr("value as a").collect()
    }

    // DataFrame with a single column holding an Array[Double] per row
    val doubleDF = sparkSession.sparkContext.parallelize(0 until rows, 1)
      .map(_ => Array.tabulate(n)(i => i.toDouble)).toDF()
    doubleDF.count() // force creation of the DataFrame

    benchmark.addCase("Write double array in DataFrame", numIters = iters) { _ =>
      doubleDF.selectExpr("value as a").collect()
    }

    benchmark.run()
  }

  test("Write an array in DataFrame") {
    writeArray(5)
  }

An example program

import sparkSession.implicits._

val df = sparkSession.sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF()
df.selectExpr("Array(value + 1.1d, value + 2.2d)").collect()

Generated code before applying this PR

/* 028 */   protected void processNext() throws java.io.IOException {
/* 029 */     while (inputadapter_input.hasNext()) {
/* 030 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 031 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 032 */
/* 033 */       final boolean project_isNull = false;
/* 034 */       this.project_values = new Object[2];
/* 035 */       double project_value1 = -1.0;
/* 036 */       project_value1 = inputadapter_value + 1.1D;
/* 037 */       if (false) {
/* 038 */         project_values[0] = null;
/* 039 */       } else {
/* 040 */         project_values[0] = project_value1;
/* 041 */       }
/* 042 */
/* 043 */       double project_value4 = -1.0;
/* 044 */       project_value4 = inputadapter_value + 2.2D;
/* 045 */       if (false) {
/* 046 */         project_values[1] = null;
/* 047 */       } else {
/* 048 */         project_values[1] = project_value4;
/* 049 */       }
/* 050 */
/* 051 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 052 */       this.project_values = null;
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.zeroOutNullBytes();
/* 056 */
/* 057 */       if (project_isNull) {
/* 058 */         project_rowWriter.setNullAt(0);
/* 059 */       } else {
/* 060 */         // Remember the current cursor so that we can calculate how many bytes are
/* 061 */         // written later.
/* 062 */         final int project_tmpCursor = project_holder.cursor;
/* 063 */
/* 064 */         if (project_value instanceof UnsafeArrayData) {
/* 065 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 066 */           // grow the global buffer before writing data.
/* 067 */           project_holder.grow(project_sizeInBytes);
/* 068 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 069 */           project_holder.cursor += project_sizeInBytes;
/* 070 */
/* 071 */         } else {
/* 072 */           final int project_numElements = project_value.numElements();
/* 073 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 074 */
/* 075 */           for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 076 */             if (project_value.isNullAt(project_index)) {
/* 077 */               project_arrayWriter.setNullAt(project_index);
/* 078 */             } else {
/* 079 */               final double project_element = project_value.getDouble(project_index);
/* 080 */               project_arrayWriter.write(project_index, project_element);
/* 081 */             }
/* 082 */           }
/* 083 */
/* 084 */         }
/* 085 */
/* 086 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 087 */         project_rowWriter.alignToWords(project_holder.cursor - project_tmpCursor);
/* 088 */       }
/* 089 */       project_result.setTotalSize(project_holder.totalSize());
/* 090 */       append(project_result);
/* 091 */       if (shouldStop()) return;
/* 092 */     }
/* 093 */   }
/* 094 */ }

Generated code after applying this PR

/* 028 */   protected void processNext() throws java.io.IOException {
/* 029 */     while (inputadapter_input.hasNext()) {
/* 030 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 031 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 032 */
/* 033 */       final boolean project_isNull = false;
/* 034 */       this.project_values = new Object[2];
/* 035 */       double project_value1 = -1.0;
/* 036 */       project_value1 = inputadapter_value + 1.1D;
/* 037 */       if (false) {
/* 038 */         project_values[0] = null;
/* 039 */       } else {
/* 040 */         project_values[0] = project_value1;
/* 041 */       }
/* 042 */
/* 043 */       double project_value4 = -1.0;
/* 044 */       project_value4 = inputadapter_value + 2.2D;
/* 045 */       if (false) {
/* 046 */         project_values[1] = null;
/* 047 */       } else {
/* 048 */         project_values[1] = project_value4;
/* 049 */       }
/* 050 */
/* 051 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 052 */       this.project_values = null;
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.zeroOutNullBytes();
/* 056 */
/* 057 */       if (project_isNull) {
/* 058 */         project_rowWriter.setNullAt(0);
/* 059 */       } else {
/* 060 */         // Remember the current cursor so that we can calculate how many bytes are
/* 061 */         // written later.
/* 062 */         final int project_tmpCursor = project_holder.cursor;
/* 063 */
/* 064 */         if (project_value instanceof UnsafeArrayData) {
/* 065 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 066 */           // grow the global buffer before writing data.
/* 067 */           project_holder.grow(project_sizeInBytes);
/* 068 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 069 */           project_holder.cursor += project_sizeInBytes;
/* 070 */
/* 071 */         } else {
/* 072 */           final int project_numElements = project_value.numElements();
/* 073 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 074 */
/* 075 */           project_arrayWriter.writePrimitiveDoubleArray(project_value);
/* 076 */         }
/* 077 */
/* 078 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 079 */         project_rowWriter.alignToWords(project_holder.cursor - project_tmpCursor);
/* 080 */       }
/* 081 */       project_result.setTotalSize(project_holder.totalSize());
/* 082 */       append(project_result);
/* 083 */       if (shouldStop()) return;
/* 084 */     }
/* 085 */   }
/* 086 */ }

How was this patch tested?

Added test suites to DataFrameComplexTypeSuite.
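For reference, the sketch below shows the kind of round-trip check such a suite might perform; it is not the actual test code added by this PR, and it assumes the suite's sparkSession and ScalaTest-style test helper are in scope.

test("projection of a primitive double array") {
  import sparkSession.implicits._

  // Two input rows, each projected to a two-element double array.
  val df = sparkSession.sparkContext.parallelize(Seq(0.0d, 1.0d), 1).toDF()
  val rows = df.selectExpr("Array(value + 1.1d, value + 2.2d) as a").collect()

  assert(rows.length == 2)
  assert(rows(0).getSeq[Double](0) == Seq(1.1d, 2.2d))
}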


SparkQA commented Jun 26, 2016

Test build #61256 has finished for PR 13911 at commit b1f6289.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 26, 2016

Test build #61262 has finished for PR 13911 at commit 333f7b6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jul 2, 2016

Test build #61651 has finished for PR 13911 at commit 2e8fb0e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jul 14, 2016

Test build #62312 has finished for PR 13911 at commit 4b70df9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 3, 2016

Test build #68066 has finished for PR 13911 at commit 88aad46.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 3, 2016

Test build #68073 has finished for PR 13911 at commit 25ddd8e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Nov 4, 2016

Test build #68140 has finished for PR 13911 at commit 6204298.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk (Member, Author) commented Nov 8, 2016

This was implemented using a different approach in #15044.

kiszk closed this on Nov 8, 2016