Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

Currently typed filter is implemented by MapPartitions, which doesn't work well with whole-stage codegen. This PR uses Filter to implement typed filter, so we get whole-stage codegen support for free.

This PR also introduces DeserializeToObject and SerializeFromObject, to separate serialization logic from object operators, so that it's easier to write optimization rules for adjacent object operators.

How was this patch tested?

Existing tests.
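The idea behind the new nodes can be sketched with a toy plan tree. This is a hypothetical sketch, not the actual Catalyst classes: once serialization is an explicit `DeserializeToObject`/`SerializeFromObject` node instead of being baked into each object operator, a simple rule can cancel a back-to-back serialize/deserialize pair between two typed filters.

```scala
// Toy plan nodes (hypothetical, not Spark's real operators).
sealed trait Plan
case object Input extends Plan
case class DeserializeToObject(child: Plan) extends Plan
case class SerializeFromObject(child: Plan) extends Plan
case class TypedFilter(name: String, child: Plan) extends Plan

// A deserialize directly on top of a serialize (or vice versa) is a
// no-op pair and can be dropped; everything else is rewritten bottom-up.
def eliminateSerialization(plan: Plan): Plan = plan match {
  case DeserializeToObject(SerializeFromObject(c)) => eliminateSerialization(c)
  case SerializeFromObject(DeserializeToObject(c)) => eliminateSerialization(c)
  case DeserializeToObject(c) => DeserializeToObject(eliminateSerialization(c))
  case SerializeFromObject(c) => SerializeFromObject(eliminateSerialization(c))
  case TypedFilter(n, c)      => TypedFilter(n, eliminateSerialization(c))
  case other                  => other
}

// Back-to-back typed filters before optimization: each filter carries its
// own deserialize/serialize pair.
val before = SerializeFromObject(TypedFilter("f2", DeserializeToObject(
  SerializeFromObject(TypedFilter("f1", DeserializeToObject(Input))))))

// After: a single deserialize/serialize pair around both filters.
val after = eliminateSerialization(before)
```

This mirrors the shape of the generated code later in this thread, where the two back-to-back filters share one `DeserializeToObject` and one `SerializeFromObject`.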

@cloud-fan cloud-fan changed the title [SPARK-14270][SQL] whole stage codegen for typed filter [SPARK-14270][SQL] whole stage codegen support for typed filter Mar 30, 2016
@SparkQA

SparkQA commented Mar 30, 2016

Test build #54510 has finished for PR 12061 at commit be182a9.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s\"Unable to generate an encoder for inner class$`

Member


Since this involves an extra deserialization step, will it still get a performance benefit?

Contributor Author


Previously we needed to deserialize the input row, call the function, and serialize back to a row. Now we skip the final serialization, so it should be faster even without whole-stage codegen.
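The saving described above can be made concrete with a toy back-of-the-envelope sketch (hypothetical types, not Spark's real serializers): the old MapPartitions-style path re-serializes every surviving row, while the Filter-based path only deserializes to evaluate the predicate and emits the original row unchanged.

```scala
// Counters stand in for the real (de)serialization cost.
case class Row(l: Long)
var deserializeCalls = 0
var serializeCalls = 0
def deserialize(r: Row): Long = { deserializeCalls += 1; r.l }
def serialize(l: Long): Row = { serializeCalls += 1; Row(l) }

val rows = (1L to 100L).map(Row(_))
val pred = (l: Long) => l % 2 == 0

// Old path: deserialize every row, filter, serialize the survivors back.
val oldResult = rows.map(deserialize).filter(pred).map(serialize)

// New path: deserialize only to evaluate the predicate; pass the
// original input row through untouched.
val newResult = rows.filter(r => pred(deserialize(r)))
```

Both paths produce the same rows, but the new path performs zero serializations, which is why it should win even before whole-stage codegen kicks in.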

@cloud-fan cloud-fan force-pushed the whole-stage-codegen branch from be182a9 to c5b1fc3 Compare March 30, 2016 13:35
@SparkQA

SparkQA commented Mar 30, 2016

Test build #54520 has finished for PR 12061 at commit c5b1fc3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s\"Unable to generate an encoder for inner class$`

Contributor


This is risky: as soon as we introduce https://issues.apache.org/jira/browse/SPARK-14083, this test will become useless.

Maybe we should introduce a config option now, and then explicitly turn that future optimization off in this test.

cc @JoshRosen - any good names for the config option?

Contributor


Alternatively, we could have a special function wrapper which makes the code non-convertible to expressions.
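A minimal sketch of such a wrapper (hypothetical names; the real hook would live in the optimizer, and the conversion itself is the future SPARK-14083 work):

```scala
// A marker wrapper: an optimizer that converts plain closures into
// expressions would pattern-match on it and leave the closure alone.
case class NonConvertible[T, R](f: T => R) extends (T => R) {
  def apply(t: T): R = f(t)
}

// Toy stand-in for the closure-to-expression optimization: it "converts"
// plain closures but skips anything marked NonConvertible.
def tryConvert(f: Long => Boolean): Option[String] = f match {
  case _: NonConvertible[_, _] => None
  case _                       => Some("converted to expression")
}
```

A test written with `NonConvertible(d => d.l % 2 == 0)` would then keep exercising the closure-based Filter path even after the optimization lands.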

@cloud-fan cloud-fan force-pushed the whole-stage-codegen branch 2 times, most recently from aa95fd6 to bf9f5b5 Compare March 31, 2016 06:07
@SparkQA

SparkQA commented Mar 31, 2016

Test build #54593 has finished for PR 12061 at commit aa95fd6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s\"Unable to generate an encoder for inner class$`

@SparkQA

SparkQA commented Mar 31, 2016

Test build #54600 has finished for PR 12061 at commit bf9f5b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • s\"Unable to generate an encoder for inner class$`


Detected style violation.

Suggested improvement:

override lazy val resolved: Boolean = {
  // If the class to construct is an inner class, we need its outer pointer,
  // or this expression should be regarded as unresolved.
  // Note that static inner classes (e.g., inner classes within Scala objects)
  // don't need outer pointer registration.
  val missingOuterPointer =
    outerPointer.isEmpty && cls.isMemberClass && !Modifier.isStatic(cls.getModifiers)
  childrenResolved && !missingOuterPointer
}

@SparkQA

SparkQA commented Apr 5, 2016

Test build #54957 has finished for PR 12061 at commit 892bdd3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Apr 5, 2016
…bjectOperator

## What changes were proposed in this pull request?

This PR decouples deserializer expression resolution from `ObjectOperator`, so that we can use deserializer expressions in normal operators. This is needed by #12061 and #12067; I abstracted the logic out and put it in this PR to reduce future code changes.

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <[email protected]>

Closes #12131 from cloud-fan/separate.
@cloud-fan cloud-fan force-pushed the whole-stage-codegen branch from 892bdd3 to 98744f0 Compare April 6, 2016 04:53
@SparkQA

SparkQA commented Apr 6, 2016

Test build #55087 has finished for PR 12061 at commit 98744f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

The generated code for a single filter

case class Data(l: Long, s: String)
Seq(Data(1, "a")).toDS().filter(d => d.l % 2 == 0)

is:

/* 032 */   protected void processNext() throws java.io.IOException {
/* 033 */     /*** PRODUCE: Filter <function1>.apply */
/* 034 */
/* 035 */     /*** PRODUCE: INPUT */
/* 036 */
/* 037 */     while (inputadapter_input.hasNext()) {
/* 038 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 039 */       /*** CONSUME: Filter <function1>.apply */
/* 040 */
/* 041 */       /* input[0, bigint] */
/* 042 */       long inputadapter_value = inputadapter_row.getLong(0);
/* 043 */       /* input[1, string] */
/* 044 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 045 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 046 */
/* 047 */       /* <function1>.apply */
/* 048 */       /* <function1> */
/* 049 */       /* expression: <function1> */
/* 050 */       Object filter_obj = ((Expression) references[1]).eval(null);
/* 051 */       scala.Function1 filter_value1 = (scala.Function1) filter_obj;
/* 052 */       /* newInstance(class org.apache.spark.sql.execution.Data) */
/* 053 */       /* assertnotnull(input[0, bigint], - field (class: "scala.Long", name: "l"), - root class: "org.apache.spark.sql.execution.Data") */
/* 054 */       if (false) {
/* 055 */         throw new RuntimeException((String) references[2]);
/* 056 */       }
/* 057 */       /* input[1, string].toString */
/* 058 */       java.lang.String filter_value5 = inputadapter_isNull1 ? null : (java.lang.String) inputadapter_value1.toString();
/* 059 */       boolean filter_isNull5 = filter_value5 == null;
/* 060 */
/* 061 */       final org.apache.spark.sql.execution.Data filter_value2 = new org.apache.spark.sql.execution.Data(inputadapter_value, filter_value5);
/* 062 */       final boolean filter_isNull2 = false;
/* 063 */       boolean filter_value = false ? false : (boolean) ((java.lang.Boolean)filter_value1.apply(filter_value2)).booleanValue();
/* 064 */       if (false || !filter_value) continue;
/* 065 */
/* 066 */       filter_metricValue.add(1);
/* 067 */
/* 068 */       /*** CONSUME: WholeStageCodegen */
/* 069 */
/* 070 */       filter_holder.reset();
/* 071 */
/* 072 */       filter_rowWriter.zeroOutNullBytes();
/* 073 */
/* 074 */       filter_rowWriter.write(0, inputadapter_value);
/* 075 */
/* 076 */       if (inputadapter_isNull1) {
/* 077 */         filter_rowWriter.setNullAt(1);
/* 078 */       } else {
/* 079 */         filter_rowWriter.write(1, inputadapter_value1);
/* 080 */       }
/* 081 */       filter_result.setTotalSize(filter_holder.totalSize());
/* 082 */       append(filter_result);
/* 083 */       if (shouldStop()) return;
/* 084 */     }
/* 085 */   }

and for back-to-back filters

case class Data(l: Long, s: String)
Seq(Data(1, "a")).toDS().filter(d => d.l % 2 == 0).filter(d => d.l % 2 == 0)

is:

/* 049 */   protected void processNext() throws java.io.IOException {
/* 050 */     /*** PRODUCE: SerializeFromObject [input[0, object].l AS l#20L,staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromSt... */
/* 051 */
/* 052 */     /*** PRODUCE: Project [obj#15 AS obj#19] */
/* 053 */
/* 054 */     /*** PRODUCE: Filter (<function1>.apply && <function1>.apply) */
/* 055 */
/* 056 */     /*** PRODUCE: DeserializeToObject newInstance(class org.apache.spark.sql.execution.Data) AS obj#15 */
/* 057 */
/* 058 */     /*** PRODUCE: INPUT */
/* 059 */
/* 060 */     while (inputadapter_input.hasNext()) {
/* 061 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 062 */       /*** CONSUME: DeserializeToObject newInstance(class org.apache.spark.sql.execution.Data) AS obj#15 */
/* 063 */       /* input[0, bigint] */
/* 064 */       long inputadapter_value = inputadapter_row.getLong(0);
/* 065 */       /* input[1, string] */
/* 066 */       boolean inputadapter_isNull1 = inputadapter_row.isNullAt(1);
/* 067 */       UTF8String inputadapter_value1 = inputadapter_isNull1 ? null : (inputadapter_row.getUTF8String(1));
/* 068 */
/* 069 */       /*** CONSUME: Filter (<function1>.apply && <function1>.apply) */
/* 070 */
/* 071 */       /* newInstance(class org.apache.spark.sql.execution.Data) */
/* 072 */       /* assertnotnull(input[0, bigint], - field (class: "scala.Long", name: "l"), - root class: "org.apache.spark.sql.execution.Data") */
/* 073 */       if (false) {
/* 074 */         throw new RuntimeException((String) references[0]);
/* 075 */       }
/* 076 */       /* input[1, string].toString */
/* 077 */       java.lang.String deserializetoobject_value3 = inputadapter_isNull1 ? null : (java.lang.String) inputadapter_value1.toString();
/* 078 */       boolean deserializetoobject_isNull3 = deserializetoobject_value3 == null;
/* 079 */
/* 080 */       final org.apache.spark.sql.execution.Data deserializetoobject_value = new org.apache.spark.sql.execution.Data(inputadapter_value, deserializetoobject_value3);
/* 081 */       final boolean deserializetoobject_isNull = false;
/* 082 */
/* 083 */       /* <function1>.apply */
/* 084 */       /* <function1> */
/* 085 */       /* expression: <function1> */
/* 086 */       Object filter_obj = ((Expression) references[2]).eval(null);
/* 087 */       scala.Function1 filter_value1 = (scala.Function1) filter_obj;
/* 088 */
/* 089 */       boolean filter_value = false ? false : (boolean) ((java.lang.Boolean)filter_value1.apply(deserializetoobject_value)).booleanValue();
/* 090 */       if (false || !filter_value) continue;
/* 091 */       /* <function1>.apply */
/* 092 */       /* <function1> */
/* 093 */       /* expression: <function1> */
/* 094 */       Object filter_obj1 = ((Expression) references[3]).eval(null);
/* 095 */       scala.Function1 filter_value4 = (scala.Function1) filter_obj1;
/* 096 */
/* 097 */       boolean filter_value3 = false ? false : (boolean) ((java.lang.Boolean)filter_value4.apply(deserializetoobject_value)).booleanValue();
/* 098 */       if (false || !filter_value3) continue;
/* 099 */
/* 100 */       filter_metricValue.add(1);
/* 101 */
/* 102 */       /*** CONSUME: Project [obj#15 AS obj#19] */
/* 103 */
/* 104 */       /*** CONSUME: SerializeFromObject [input[0, object].l AS l#20L,staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromSt... */
/* 105 */
/* 106 */       /*** CONSUME: WholeStageCodegen */
/* 107 */
/* 108 */       /* input[0, object].l */
/* 109 */       long serializefromobject_value = deserializetoobject_isNull ? -1L : (long) deserializetoobject_value.l();
/* 110 */       /* staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, input[0, object].s, true) */
/* 111 */       /* input[0, object].s */
/* 112 */       java.lang.String serializefromobject_value3 = deserializetoobject_isNull ? null : (java.lang.String) deserializetoobject_value.s();
/* 113 */       boolean serializefromobject_isNull3 = serializefromobject_value3 == null;
/* 114 */
/* 115 */       boolean serializefromobject_isNull2 = !!(serializefromobject_isNull3);
/* 116 */       UTF8String serializefromobject_value2 = null;
/* 117 */
/* 118 */       if (!(serializefromobject_isNull3)) {
/* 119 */         serializefromobject_value2 = org.apache.spark.unsafe.types.UTF8String.fromString(serializefromobject_value3);
/* 120 */         serializefromobject_isNull2 = serializefromobject_value2 == null;
/* 121 */       }
/* 122 */       serializefromobject_holder.reset();
/* 123 */
/* 124 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 125 */
/* 126 */       if (deserializetoobject_isNull) {
/* 127 */         serializefromobject_rowWriter.setNullAt(0);
/* 128 */       } else {
/* 129 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 130 */       }
/* 131 */
/* 132 */       if (serializefromobject_isNull2) {
/* 133 */         serializefromobject_rowWriter.setNullAt(1);
/* 134 */       } else {
/* 135 */         serializefromobject_rowWriter.write(1, serializefromobject_value2);
/* 136 */       }
/* 137 */       serializefromobject_result.setTotalSize(serializefromobject_holder.totalSize());
/* 138 */       append(serializefromobject_result);
/* 139 */       if (shouldStop()) return;
/* 140 */     }
/* 141 */   }

@cloud-fan
Contributor Author

The benchmark result for the master branch is:

back-to-back filter:                Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-------------------------------------------------------------------------------------------
Dataset                                   724 /  824         13.8          72.4       1.0X

The whole-stage codegen version is about 30% faster.

@cloud-fan cloud-fan force-pushed the whole-stage-codegen branch from 92af545 to 0fcaa06 Compare April 6, 2016 13:07
@SparkQA

SparkQA commented Apr 6, 2016

Test build #55109 has finished for PR 12061 at commit 92af545.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

cc @davies

@SparkQA

SparkQA commented Apr 6, 2016

Test build #55110 has finished for PR 12061 at commit 0fcaa06.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@davies
Contributor

davies commented Apr 6, 2016

LGTM.

@SparkQA

SparkQA commented Apr 7, 2016

Test build #55172 has finished for PR 12061 at commit cabe067.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

cc @marmbrus, do you have time to take a look? Thanks!

@SparkQA

SparkQA commented Apr 7, 2016

Test build #55177 has finished for PR 12061 at commit 7f1e770.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Contributor Author

retest this please

@SparkQA

SparkQA commented Apr 7, 2016

Test build #55187 has finished for PR 12061 at commit 7f1e770.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai
Contributor

yhuai commented Apr 8, 2016

Thanks! Merging to master.

@asfgit asfgit closed this in 49fb237 Apr 8, 2016
