@kiszk kiszk commented Jun 18, 2016

What changes were proposed in this pull request?

This PR eliminates null-check code in projection for an array type. Currently, a null check is generated for every element write of an array (lines 076-078 in "Before applying this PR"). If we know at compilation time that none of the elements can be null, we can eliminate the null-check code.

This PR checks whether ArrayType.containsNull is false, which guarantees that none of the array elements is null.
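As a sketch of why containsNull can be known at compilation time: an element expression's nullability is derived from its children, and an array built only from non-nullable expressions need not admit nulls. The tiny expression ADT below is invented for illustration (Column, Literal, Add, and the containsNull helper are not Spark's classes; in Spark, CreateArray derives containsNull from the nullable flags of its children):

```java
import java.util.List;

// Hypothetical expression ADT; names are invented for this sketch.
interface Expr { boolean nullable(); }

record Column(String name, boolean nullable) implements Expr {}

record Literal(double value) implements Expr {
    public boolean nullable() { return false; }  // a literal is never null
}

record Add(Expr left, Expr right) implements Expr {
    // an addition can be null only if one of its operands can be
    public boolean nullable() { return left.nullable() || right.nullable(); }
}

class ContainsNull {
    // containsNull is false iff no element expression can produce null
    static boolean containsNull(List<? extends Expr> elems) {
        return elems.stream().anyMatch(Expr::nullable);
    }
}
```

For Array(v + 2.2, v + 3.3) with a non-nullable double column v, this inference yields containsNull = false, which is the condition this PR tests before dropping the per-element null check.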

An example program

val df = sparkContext.parallelize(Seq(1.0, 2.0), 1).toDF("v")
df.selectExpr("Array(v + 2.2, v + 3.3)").collect

Before applying this PR

/* 028 */   protected void processNext() throws java.io.IOException {
/* 029 */     while (inputadapter_input.hasNext()) {
/* 030 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 031 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 032 */
/* 033 */       final boolean project_isNull = false;
/* 034 */       this.project_values = new Object[2];
/* 035 */       double project_value1 = -1.0;
/* 036 */       project_value1 = inputadapter_value + 2.2D;
/* 037 */       if (false) {
/* 038 */         project_values[0] = null;
/* 039 */       } else {
/* 040 */         project_values[0] = project_value1;
/* 041 */       }
/* 042 */
/* 043 */       double project_value4 = -1.0;
/* 044 */       project_value4 = inputadapter_value + 3.3D;
/* 045 */       if (false) {
/* 046 */         project_values[1] = null;
/* 047 */       } else {
/* 048 */         project_values[1] = project_value4;
/* 049 */       }
/* 050 */
/* 051 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 052 */       this.project_values = null;
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.zeroOutNullBytes();
/* 056 */
/* 057 */       if (project_isNull) {
/* 058 */         project_rowWriter.setNullAt(0);
/* 059 */       } else {
/* 060 */         // Remember the current cursor so that we can calculate how many bytes are
/* 061 */         // written later.
/* 062 */         final int project_tmpCursor = project_holder.cursor;
/* 063 */
/* 064 */         if (project_value instanceof UnsafeArrayData) {
/* 065 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 066 */           // grow the global buffer before writing data.
/* 067 */           project_holder.grow(project_sizeInBytes);
/* 068 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 069 */           project_holder.cursor += project_sizeInBytes;
/* 070 */
/* 071 */         } else {
/* 072 */           final int project_numElements = project_value.numElements();
/* 073 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 074 */
/* 075 */           for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 076 */             if (project_value.isNullAt(project_index)) {
/* 077 */               project_arrayWriter.setNullAt(project_index);
/* 078 */             } else {
/* 079 */               final double project_element = project_value.getDouble(project_index);
/* 080 */               project_arrayWriter.write(project_index, project_element);
/* 081 */             }
/* 082 */
/* 083 */           }
/* 084 */         }
/* 085 */
/* 086 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 087 */         project_rowWriter.alignToWords(project_holder.cursor - project_tmpCursor);
/* 088 */       }
/* 089 */       project_result.setTotalSize(project_holder.totalSize());
/* 090 */       append(project_result);
/* 091 */       if (shouldStop()) return;
/* 092 */     }
/* 093 */   }
/* 094 */ }

After applying this PR

/* 028 */   protected void processNext() throws java.io.IOException {
/* 029 */     while (inputadapter_input.hasNext()) {
/* 030 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 031 */       double inputadapter_value = inputadapter_row.getDouble(0);
/* 032 */
/* 033 */       final boolean project_isNull = false;
/* 034 */       this.project_values = new Object[2];
/* 035 */       double project_value1 = -1.0;
/* 036 */       project_value1 = inputadapter_value + 2.2D;
/* 037 */       if (false) {
/* 038 */         project_values[0] = null;
/* 039 */       } else {
/* 040 */         project_values[0] = project_value1;
/* 041 */       }
/* 042 */
/* 043 */       double project_value4 = -1.0;
/* 044 */       project_value4 = inputadapter_value + 3.3D;
/* 045 */       if (false) {
/* 046 */         project_values[1] = null;
/* 047 */       } else {
/* 048 */         project_values[1] = project_value4;
/* 049 */       }
/* 050 */
/* 051 */       final ArrayData project_value = new org.apache.spark.sql.catalyst.util.GenericArrayData(project_values);
/* 052 */       this.project_values = null;
/* 053 */       project_holder.reset();
/* 054 */
/* 055 */       project_rowWriter.zeroOutNullBytes();
/* 056 */
/* 057 */       if (project_isNull) {
/* 058 */         project_rowWriter.setNullAt(0);
/* 059 */       } else {
/* 060 */         // Remember the current cursor so that we can calculate how many bytes are
/* 061 */         // written later.
/* 062 */         final int project_tmpCursor = project_holder.cursor;
/* 063 */
/* 064 */         if (project_value instanceof UnsafeArrayData) {
/* 065 */           final int project_sizeInBytes = ((UnsafeArrayData) project_value).getSizeInBytes();
/* 066 */           // grow the global buffer before writing data.
/* 067 */           project_holder.grow(project_sizeInBytes);
/* 068 */           ((UnsafeArrayData) project_value).writeToMemory(project_holder.buffer, project_holder.cursor);
/* 069 */           project_holder.cursor += project_sizeInBytes;
/* 070 */
/* 071 */         } else {
/* 072 */           final int project_numElements = project_value.numElements();
/* 073 */           project_arrayWriter.initialize(project_holder, project_numElements, 8);
/* 074 */
/* 075 */           for (int project_index = 0; project_index < project_numElements; project_index++) {
/* 076 */             final double project_element = project_value.getDouble(project_index);
/* 077 */             project_arrayWriter.write(project_index, project_element);
/* 078 */
/* 079 */           }
/* 080 */         }
/* 081 */
/* 082 */         project_rowWriter.setOffsetAndSize(0, project_tmpCursor, project_holder.cursor - project_tmpCursor);
/* 083 */         project_rowWriter.alignToWords(project_holder.cursor - project_tmpCursor);
/* 084 */       }
/* 085 */       project_result.setTotalSize(project_holder.totalSize());
/* 086 */       append(project_result);
/* 087 */       if (shouldStop()) return;
/* 088 */     }
/* 089 */   }
/* 090 */ }
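The difference between the two listings boils down to the element-writing loop. A minimal stand-in (SimpleArray and writeElements are hypothetical names invented here, not Spark's ArrayData/UnsafeArrayWriter API) shows how a static containsNull = false guarantee lets the per-element branch disappear:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Spark's ArrayData; a null slot models a null element.
class SimpleArray {
    private final Double[] values;
    SimpleArray(Double... values) { this.values = values; }
    int numElements() { return values.length; }
    boolean isNullAt(int i) { return values[i] == null; }
    double getDouble(int i) { return values[i]; }
}

class NullCheckElimination {
    // Mirrors the generated element-writing loop: when containsNull is
    // statically false, the null check is never taken and can be dropped.
    static List<String> writeElements(SimpleArray arr, boolean containsNull) {
        List<String> ops = new ArrayList<>();
        for (int i = 0; i < arr.numElements(); i++) {
            if (containsNull && arr.isNullAt(i)) {
                ops.add("setNullAt(" + i + ")");
            } else {
                ops.add("write(" + i + ", " + arr.getDouble(i) + ")");
            }
        }
        return ops;
    }
}
```

With containsNull = true the loop behaves like the "Before" listing, emitting setNullAt for null slots; with containsNull = false it degenerates into the unconditional write of the "After" listing.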

How was this patch tested?

Added unit tests.

SparkQA commented Jun 18, 2016

Test build #60767 has finished for PR 13757 at commit 3875e1f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

kiszk commented Jan 4, 2017

#13909 makes this change unnecessary, since with the latest code lines 72-83 in "Before applying this PR" are no longer executed.

@kiszk kiszk closed this Jan 4, 2017