Conversation

@kiszk (Member) commented Mar 15, 2017

What changes were proposed in this pull request?

This PR fixes a NullPointerException in code generated by Catalyst. When we run the following code, we get the NullPointerException shown below. This is because there is no null check for inputadapter_value, even though java.lang.Long inputadapter_value at line 30 may be null.

This happens when a DataFrame column has a nullable primitive type such as java.lang.Long and whole-stage codegen is used. While the physical plan keeps nullable=true in input[0, java.lang.Long, true].longValue, BoundReference.doGenCode ignores nullable=true. Thus, null-check code is not generated and a NullPointerException occurs.

This PR checks the nullability and correctly generates the null check when needed.

sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF.collect
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:37)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:393)
...

Generated code without this PR

/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */
/* 013 */   public GeneratedIterator(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input = inputs[0];
/* 021 */     serializefromobject_result = new UnsafeRow(1);
/* 022 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 023 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       java.lang.Long inputadapter_value = (java.lang.Long)inputadapter_row.get(0, null);
/* 031 */
/* 032 */       boolean serializefromobject_isNull = true;
/* 033 */       long serializefromobject_value = -1L;
/* 034 */       if (!false) {
/* 035 */         serializefromobject_isNull = false;
/* 036 */         if (!serializefromobject_isNull) {
/* 037 */           serializefromobject_value = inputadapter_value.longValue();
/* 038 */         }
/* 039 */
/* 040 */       }
/* 041 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 042 */
/* 043 */       if (serializefromobject_isNull) {
/* 044 */         serializefromobject_rowWriter.setNullAt(0);
/* 045 */       } else {
/* 046 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 047 */       }
/* 048 */       append(serializefromobject_result);
/* 049 */       if (shouldStop()) return;
/* 050 */     }
/* 051 */   }
/* 052 */ }

Generated code with this PR

/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */
/* 013 */   public GeneratedIterator(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input = inputs[0];
/* 021 */     serializefromobject_result = new UnsafeRow(1);
/* 022 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 023 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 031 */       java.lang.Long inputadapter_value = inputadapter_isNull ? null : ((java.lang.Long)inputadapter_row.get(0, null));
/* 032 */
/* 033 */       boolean serializefromobject_isNull = true;
/* 034 */       long serializefromobject_value = -1L;
/* 035 */       if (!inputadapter_isNull) {
/* 036 */         serializefromobject_isNull = false;
/* 037 */         if (!serializefromobject_isNull) {
/* 038 */           serializefromobject_value = inputadapter_value.longValue();
/* 039 */         }
/* 040 */
/* 041 */       }
/* 042 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 043 */
/* 044 */       if (serializefromobject_isNull) {
/* 045 */         serializefromobject_rowWriter.setNullAt(0);
/* 046 */       } else {
/* 047 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 048 */       }
/* 049 */       append(serializefromobject_result);
/* 050 */       if (shouldStop()) return;
/* 051 */     }
/* 052 */   }
/* 053 */ }
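
The gist of the final change is sketched below: derive the nullability of the ExternalRDD output attribute from the encoder instead of hard-coding nullable = false. This is a paraphrase of CatalystSerde.generateObjAttr as it is discussed in the review comments, not the complete diff.

```scala
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}

// Sketch (not the full patch): boxed types such as java.lang.Long keep nullable = true,
// while true JVM primitives stay non-nullable, so the codegen emits the null check above.
def generateObjAttr[T : Encoder]: Attribute = {
  val enc = encoderFor[T]
  val dataType = enc.deserializer.dataType
  val nullable = !enc.clsTag.runtimeClass.isPrimitive
  AttributeReference("obj", dataType, nullable)()
}
```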

How was this patch tested?

Added new test suites in DataFrameSuites
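
A sketch of the kind of regression test added is shown below; the exact test and the switch to checkAnswer are discussed later in this review.

```scala
// Sketch of the added test (name and shape follow the snippets quoted later in this thread):
test("SPARK-19959: df[java.lang.Long].collect includes null throws NullPointerException") {
  checkAnswer(
    sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF,
    Seq(Row(0), Row(null), Row(2)))
}
```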

@SparkQA commented Mar 15, 2017

Test build #74593 has finished for PR 17302 at commit ec6aa50.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 15, 2017

Test build #74608 has finished for PR 17302 at commit 2deeba8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member Author) commented Mar 15, 2017

jenkins, retest this please

@SparkQA commented Mar 15, 2017

Test build #74614 has finished for PR 17302 at commit 2deeba8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 17, 2017

Test build #74716 has finished for PR 17302 at commit 43678e7.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 19, 2017

Test build #74830 has finished for PR 17302 at commit fd4fc3d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member Author) commented Mar 19, 2017

cc @cloud-fan

def apply[T: Encoder](rdd: RDD[T], session: SparkSession): LogicalPlan = {
val externalRdd = ExternalRDD(CatalystSerde.generateObjAttr[T], rdd)(session)
val attr = {
val attr = CatalystSerde.generateObjAttr[T]
Contributor:

shall we fix CatalystSerde.generateObjAttr?

Member Author:

When ExternalRDD.apply calls CatalystSerde.generateObjAttr, the deserializer has not been resolved yet, so deserializer.nullable cannot be evaluated. On other call paths to CatalystSerde.generateObjAttr, the deserializer may already be resolved.

I think this problem depends on the calling context. If we fix CatalystSerde.generateObjAttr with a general approach, something like the following is possible:

  1. Call deserializer.nullable in CatalystSerde.generateObjAttr.
  2. If no exception occurs, use the value of deserializer.nullable.
  3. If an unresolved exception occurs, infer nullability from deserializer.dataType.

Is this OK with you instead of modifying ExternalRDD.apply()?
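
For illustration only, a minimal sketch of that fallback approach might look like the following (hypothetical code: the UnresolvedException handling and the exact DataType match are assumptions based on the steps above, not the change that was merged):

```scala
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.catalyst.analysis.UnresolvedException
import org.apache.spark.sql.catalyst.encoders.encoderFor
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types._

// Try deserializer.nullable first; if the deserializer is still unresolved, fall back to a
// guess based on its DataType (primitive-like types non-nullable, everything else nullable).
def generateObjAttr[T : Encoder]: Attribute = {
  val deserializer = encoderFor[T].deserializer
  val nullable =
    try {
      deserializer.nullable
    } catch {
      case _: UnresolvedException[_] =>
        deserializer.dataType match {
          case BooleanType | FloatType | DoubleType => false
          case _: IntegralType => false
          case _ => true
        }
    }
  AttributeReference("obj", deserializer.dataType, nullable)()
}
```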

Contributor:

An easy way is to use ScalaReflection.schemaFor[T] to get the nullability information.
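
A rough sketch of that suggestion, assuming a TypeTag is available at the call site (which, as the reply below notes, is the sticking point); nullableFor is a hypothetical helper name:

```scala
import scala.reflect.runtime.universe.TypeTag
import org.apache.spark.sql.catalyst.ScalaReflection

// ScalaReflection.schemaFor reports both the Catalyst DataType and the nullability for T,
// but it needs a TypeTag rather than an Encoder.
def nullableFor[T: TypeTag]: Boolean = ScalaReflection.schemaFor[T].nullable
```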

Member Author:

I see. ScalaReflection.schemaFor[T] requires a TypeTag.
IIUC, passing a TypeTag through to CatalystSerde.generateObjAttr would require several code changes across files.

Contributor:

In CatalystSerde.generateObjAttr we call deserializer.dataType; why can't we call deserializer.nullable?

Member:

As the deserializer is not resolved yet, there will be an exception when accessing its nullable.

Member Author:

When we call deserializer.nullable in CatalystSerde.generateObjAttr with the following change, the following error occurs in the calling context of ExternalRDD.apply(). I think that ExternalRDD.apply() calls CatalystSerde.generateObjAttr at a very early phase, where the plan has not been resolved yet.

object CatalystSerde {
  def deserialize[T : Encoder](child: LogicalPlan): DeserializeToObject = {
    val deserializer = UnresolvedDeserializer(encoderFor[T].deserializer)
    DeserializeToObject(deserializer, generateObjAttr[T], child)
  }

  def serialize[T : Encoder](child: LogicalPlan): SerializeFromObject = {
    SerializeFromObject(encoderFor[T].namedExpressions, child)
  }

  def generateObjAttr[T : Encoder]: Attribute = {
    val deserializer = encoderFor[T].deserializer
    AttributeReference("obj", deserializer.dataType, deserializer.nullable)()
  }
}
Invalid call to nullable on unresolved object, tree: getcolumnbyordinal(0, IntegerType)
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to nullable on unresolved object, tree: getcolumnbyordinal(0, IntegerType)
	at org.apache.spark.sql.catalyst.analysis.GetColumnByOrdinal.nullable(unresolved.scala:399)
	at org.apache.spark.sql.catalyst.expressions.UnaryExpression.nullable(Expression.scala:314)
	at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike$$anonfun$needNullCheck$1.apply(objects.scala:44)
	at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike$$anonfun$needNullCheck$1.apply(objects.scala:44)
	at scala.collection.LinearSeqOptimized$class.exists(LinearSeqOptimized.scala:93)
	at scala.collection.immutable.List.exists(List.scala:84)
	at org.apache.spark.sql.catalyst.expressions.objects.InvokeLike$class.needNullCheck(objects.scala:44)
	at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.needNullCheck$lzycompute(objects.scala:290)
	at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.needNullCheck(objects.scala:290)
	at org.apache.spark.sql.catalyst.expressions.objects.NewInstance.nullable(objects.scala:298)
	at org.apache.spark.sql.catalyst.plans.logical.CatalystSerde$.generateObjAttr(object.scala:45)
	at org.apache.spark.sql.execution.ExternalRDD$.apply(ExistingRDD.scala:76)
	at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:471)
	at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:393)
	at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:238)
	at org.apache.spark.sql.DataFrameImplicitsSuite$$anonfun$6.apply$mcV$sp(DataFrameImplicitsSuite.scala:56)
	at org.apache.spark.sql.DataFrameImplicitsSuite$$anonfun$6.apply(DataFrameImplicitsSuite.scala:55)
	at org.apache.spark.sql.DataFrameImplicitsSuite$$anonfun$6.apply(DataFrameImplicitsSuite.scala:55)
	at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
	at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:166)
	at org.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:68)
	at org.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:163)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:175)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:175)
	at org.apache.spark.sql.DataFrameImplicitsSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(DataFrameImplicitsSuite.scala:22)
	at org.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:255)
	at org.apache.spark.sql.DataFrameImplicitsSuite.runTest(DataFrameImplicitsSuite.scala:22)
...

Here is an example program with plans.

val dfInt = sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF
dfInt.explain(true)
assert(dfInt.collect === Array(Row(0), Row(null), Row(2)))

== Parsed Logical Plan ==
SerializeFromObject [input[0, java.lang.Integer, true].intValue AS value#2]
+- ExternalRDD [obj#1]

== Analyzed Logical Plan ==
value: int
SerializeFromObject [input[0, java.lang.Integer, true].intValue AS value#2]
+- ExternalRDD [obj#1]

== Optimized Logical Plan ==
SerializeFromObject [input[0, java.lang.Integer, true].intValue AS value#2]
+- ExternalRDD [obj#1]

== Physical Plan ==
*SerializeFromObject [input[0, java.lang.Integer, true].intValue AS value#2]
+- Scan ExternalRDDScan[obj#1]

// Since ExpressionEncoder[T].deserializer is not resolved here,
// cannot access ExpressionEncoder[T].deserializer.nullable.
// We infer nullability from DataType
attr.dataType match {
Contributor:

can we move this logic to CatalystSerde.generateObjAttr?

Member Author (@kiszk, Mar 21, 2017):

Sure, I will do this by applying this inference only for an unresolved deserializer.

case BooleanType => attr
case _: IntegralType => attr
case FloatType | DoubleType => attr
case DecimalType.Fixed(p, s) if p <= Decimal.MAX_LONG_DIGITS => attr
Contributor:

I think decimal type is always nullable.


package org.apache.spark.sql.execution

import scala.reflect.runtime.universe.TypeTag
Member:

This change and the one below should be reverted.

Member Author:

thanks, done.

@SparkQA commented Mar 21, 2017

Test build #74973 has finished for PR 17302 at commit e81b0cb.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Mar 21, 2017

Test build #74974 has finished for PR 17302 at commit 6e3e44d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AttributeReference("obj", encoderFor[T].deserializer.dataType, nullable = false)()
val deserializer = encoderFor[T].deserializer
val dataType = deserializer.dataType
val nullable = if (deserializer.childrenResolved) {
Member:

The encoder returned by encoderFor[T] should be unresolved initially. Is it possible that we take this path?

Member Author:

Good point. When this method is executed in another call context, the deserializer has already been resolved. The following is one example:

	at org.apache.spark.sql.catalyst.plans.logical.CatalystSerde$.generateObjAttr(object.scala:47)
	at org.apache.spark.sql.catalyst.plans.logical.CatalystSerde$.deserialize(object.scala:36)
	at org.apache.spark.sql.catalyst.encoders.ExpressionEncoder.resolveAndBind(ExpressionEncoder.scala:254)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:207)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:68)
	at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2831)
	at org.apache.spark.sql.Dataset.createOrReplaceTempView(Dataset.scala:2612)
	at org.apache.spark.sql.test.SQLTestData$class.testData(SQLTestData.scala:54)
	at org.apache.spark.sql.DataFrameSuite.testData$lzycompute(DataFrameSuite.scala:43)
	at org.apache.spark.sql.DataFrameSuite.testData(DataFrameSuite.scala:43)
...

Member:

How do we know it is resolved in the above context? In encoderFor, e.assertUnresolved is called to make sure the returned encoder is unresolved.

Member Author (@kiszk, Mar 21, 2017):

Thank you for your review. I noticed my mistake: I should use val nullable = if (deserializer.resolved) {. Even after making this correction, the deserializer is resolved.

In the above context, it is ensured that encoderFor[T].deserializer is resolved by the assertion require(inputAttributes.forall(_.resolved), "Input attributes must all be resolved.") in case class UnresolvedDeserializer.

Member:

I think the assertion in UnresolvedDeserializer doesn't mean encoderFor[T].deserializer is resolved. What must be resolved are the inputAttributes.

Member:

Unless the deserializer doesn't refer to any attribute, a resolved deserializer can't pass the check, I think.

Member:

Can you show me the test code that hits this path, i.e., where deserializer.resolved is true?

Member Author:

In my environment, this test code returns true for deserializer.resolved.

Member:

I checked the test code.

The case where deserializer.resolved is true is a CreateExternalRow with no children (because it refers to nonExistentName). I don't think this is a normal use case.

Member (@viirya, Mar 22, 2017):

Unless the deserializer doesn't refer to any attribute, a resolved deserializer can't pass the check, I think.

The only way encoderFor[T].deserializer.resolved can be true is when the deserializer doesn't refer to anything. I think that is a meaningless and abnormal query, like the test case.

If I'm not missing anything, the value of the returned nullable shouldn't matter for this kind of case.

@SparkQA commented Mar 21, 2017

Test build #74988 has finished for PR 17302 at commit d7d0a36.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member Author) commented Mar 21, 2017

Jenkins, retest this please

@SparkQA commented Mar 21, 2017

Test build #74992 has finished for PR 17302 at commit d7d0a36.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


test("SPARK-19959: df[java.lang.Long].collect includes null throws NullPointerException") {
val dfInt = sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF
assert(dfInt.collect === Array(Row(0), Row(null), Row(2)))
Member:

Use checkAnswer?

    checkAnswer(
      sparkContext.parallelize(Seq[java.lang.Integer](0, null, 2), 1).toDF,
      Seq(Row(0), Row(null), Row(2)))

Member Author:

Sure, done

val nullable = if (deserializer.resolved) {
deserializer.nullable
} else {
// Since deserializer is not resolved here, cannot access deserializer.nullable.
Contributor:

to be safe, we should just return true here.

Member Author:

It is too conservative since it always returned false.

If we just return true, the following code contains a call to isNullAt() and a null check before getting the long value at lines 30 and 35, even though the type is primitive.

sparkContext.parallelize(Seq(0L, 1L, 2L), 1).toDF.collect
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 031 */       long inputadapter_value = inputadapter_isNull ? -1L : (inputadapter_row.getLong(0));
/* 032 */
/* 033 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 034 */
/* 035 */       if (inputadapter_isNull) {
/* 036 */         serializefromobject_rowWriter.setNullAt(0);
/* 037 */       } else {
/* 038 */         serializefromobject_rowWriter.write(0, inputadapter_value);
/* 039 */       }
/* 040 */       append(serializefromobject_result);
/* 041 */       if (shouldStop()) return;
/* 042 */     }
/* 043 */   }

Contributor:

This will go to the if (deserializer.resolved) branch, won't it?

Member Author (@kiszk, Mar 22, 2017):

This conservative code generation happens when I change the code as follows:

val nullable = if (deserializer.resolved) {
   deserializer.nullable
} else {
  // dataType match {
  //  case BooleanType | FloatType | DoubleType => false
  //  case _: IntegralType => false
  //  case _ => true
  // }
  true
}

Contributor:

Hmm, how could this happen? When will the deserializer be resolved?

Member Author:

This happens in the following calling context through ExternalRDD.apply.

	at org.apache.spark.sql.catalyst.plans.logical.CatalystSerde$.generateObjAttr(object.scala:61)
	at org.apache.spark.sql.execution.ExternalRDD$.apply(ExistingRDD.scala:73)
	at org.apache.spark.sql.SparkSession.createDataset(SparkSession.scala:471)
	at org.apache.spark.sql.SQLContext.createDataset(SQLContext.scala:393)
	at org.apache.spark.sql.SQLImplicits.rddToDatasetHolder(SQLImplicits.scala:238)
	at org.apache.spark.sql.Agg$$anonfun$18$$anonfun$apply$mcV$sp$4.apply$mcV$sp(MySuite.scala:202)
	at org.apache.spark.sql.test.SQLTestUtils$class.withSQLConf(SQLTestUtils.scala:106)
	at org.apache.spark.sql.Agg.withSQLConf(MySuite.scala:168)
	at org.apache.spark.sql.Agg$$anonfun$18.apply$mcV$sp(MySuite.scala:193)
	at org.apache.spark.sql.Agg$$anonfun$18.apply(MySuite.scala:193)
	at org.apache.spark.sql.Agg$$anonfun$18.apply(MySuite.scala:193)

Here is a thread regarding resolution of the deserializer.

@SparkQA commented Mar 22, 2017

Test build #75036 has finished for PR 17302 at commit 0a455be.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AttributeReference("obj", encoderFor[T].deserializer.dataType, nullable = false)()
val deserializer = encoderFor[T].deserializer
val dataType = deserializer.dataType
val nullable = if (deserializer.resolved) {
Contributor:

OK how about this: encoder.clsTag.runtimeClass.isPrimitive

Member Author:

Unfortunately, using val nullable = encoder.clsTag.runtimeClass.isPrimitive causes a NullPointerException since Class[java.lang.Long].isPrimitive returns true.
We want to get false for Class[java.lang.Long] and true for (LongType, false).

Contributor:

scala> classOf[java.lang.Long].isPrimitive
res1: Boolean = false

I guess some information is lost when we get the class from the ClassTag.
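
For reference, a quick check (plain Scala, outside Spark) of how the runtime class reported by a ClassTag differs between the primitive and the boxed type:

```scala
import scala.reflect.ClassTag

// ClassTag[Long] reports the JVM primitive class `long`, while ClassTag[java.lang.Long]
// keeps the boxed wrapper class, so only the former is primitive.
println(implicitly[ClassTag[Long]].runtimeClass.isPrimitive)           // true
println(implicitly[ClassTag[java.lang.Long]].runtimeClass.isPrimitive) // false
```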

Member Author:

Let me double-check it.

Member Author:

@cloud-fan Sorry, I noticed that I made a mistake. The new code should be val nullable = !encoder.clsTag.runtimeClass.isPrimitive. It worked well.
I will commit this code.

val enc = encoderFor[T]
val dataType = enc.deserializer.dataType
val nullable = !enc.clsTag.runtimeClass.isPrimitive
AttributeReference("obj", dataType, nullable = nullable)()
Member:

Nit: AttributeReference("obj", dataType, nullable)()

Member Author:

thanks, done

@gatorsmile (Member):

LGTM pending Jenkins

@gatorsmile (Member):

Could you update the generated code in the PR description to reflect the latest code change?

@SparkQA commented Mar 23, 2017

Test build #75099 has finished for PR 17302 at commit 299fd68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kiszk (Member Author) commented Mar 23, 2017

Absolutely, updated.

@SparkQA commented Mar 23, 2017

Test build #75103 has finished for PR 17302 at commit e1fee6e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

AttributeReference("obj", encoderFor[T].deserializer.dataType, nullable = false)()
val enc = encoderFor[T]
val dataType = enc.deserializer.dataType
val nullable = !enc.clsTag.runtimeClass.isPrimitive
Member:

Looks good now.

@viirya (Member) commented Mar 23, 2017

LGTM

asfgit pushed a commit that referenced this pull request Mar 24, 2017
…Long].collect

## What changes were proposed in this pull request?

This PR fixes `NullPointerException` in the generated code by Catalyst. When we run the following code, we get the following `NullPointerException`. This is because there is no null checks for `inputadapter_value`  while `java.lang.Long inputadapter_value` at Line 30 may have `null`.

This happen when a type of DataFrame is nullable primitive type such as `java.lang.Long` and the wholestage codegen is used. While the physical plan keeps `nullable=true` in `input[0, java.lang.Long, true].longValue`, `BoundReference.doGenCode` ignores `nullable=true`. Thus, nullcheck code will not be generated and `NullPointerException` will occur.

This PR checks the nullability and correctly generates nullcheck if needed.
```java
sparkContext.parallelize(Seq[java.lang.Long](0L, null, 2L), 1).toDF.collect
```

```java
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:37)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:393)
...
```

Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */
/* 013 */   public GeneratedIterator(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input = inputs[0];
/* 021 */     serializefromobject_result = new UnsafeRow(1);
/* 022 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 023 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       java.lang.Long inputadapter_value = (java.lang.Long)inputadapter_row.get(0, null);
/* 031 */
/* 032 */       boolean serializefromobject_isNull = true;
/* 033 */       long serializefromobject_value = -1L;
/* 034 */       if (!false) {
/* 035 */         serializefromobject_isNull = false;
/* 036 */         if (!serializefromobject_isNull) {
/* 037 */           serializefromobject_value = inputadapter_value.longValue();
/* 038 */         }
/* 039 */
/* 040 */       }
/* 041 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 042 */
/* 043 */       if (serializefromobject_isNull) {
/* 044 */         serializefromobject_rowWriter.setNullAt(0);
/* 045 */       } else {
/* 046 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 047 */       }
/* 048 */       append(serializefromobject_result);
/* 049 */       if (shouldStop()) return;
/* 050 */     }
/* 051 */   }
/* 052 */ }
```

Generated code with this PR

```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow serializefromobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 012 */
/* 013 */   public GeneratedIterator(Object[] references) {
/* 014 */     this.references = references;
/* 015 */   }
/* 016 */
/* 017 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 018 */     partitionIndex = index;
/* 019 */     this.inputs = inputs;
/* 020 */     inputadapter_input = inputs[0];
/* 021 */     serializefromobject_result = new UnsafeRow(1);
/* 022 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 023 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 024 */
/* 025 */   }
/* 026 */
/* 027 */   protected void processNext() throws java.io.IOException {
/* 028 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 029 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 030 */       boolean inputadapter_isNull = inputadapter_row.isNullAt(0);
/* 031 */       java.lang.Long inputadapter_value = inputadapter_isNull ? null : ((java.lang.Long)inputadapter_row.get(0, null));
/* 032 */
/* 033 */       boolean serializefromobject_isNull = true;
/* 034 */       long serializefromobject_value = -1L;
/* 035 */       if (!inputadapter_isNull) {
/* 036 */         serializefromobject_isNull = false;
/* 037 */         if (!serializefromobject_isNull) {
/* 038 */           serializefromobject_value = inputadapter_value.longValue();
/* 039 */         }
/* 040 */
/* 041 */       }
/* 042 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 043 */
/* 044 */       if (serializefromobject_isNull) {
/* 045 */         serializefromobject_rowWriter.setNullAt(0);
/* 046 */       } else {
/* 047 */         serializefromobject_rowWriter.write(0, serializefromobject_value);
/* 048 */       }
/* 049 */       append(serializefromobject_result);
/* 050 */       if (shouldStop()) return;
/* 051 */     }
/* 052 */   }
/* 053 */ }
```

## How was this patch tested?

Added new test suites in `DataFrameSuites`

Author: Kazuaki Ishizaki <[email protected]>

Closes #17302 from kiszk/SPARK-19959.

(cherry picked from commit bb823ca)
Signed-off-by: Wenchen Fan <[email protected]>
asfgit pushed a commit that referenced this pull request Mar 24, 2017
…Long].collect

(cherry picked from commit bb823ca)
Signed-off-by: Wenchen Fan <[email protected]>
@cloud-fan (Contributor):

thanks, merging to master/2.1/2.0!

@asfgit asfgit closed this in bb823ca Mar 24, 2017