Conversation

@marmbrus (Contributor) commented Sep 8, 2014

No description provided.

@marmbrus (Contributor, Author) commented Sep 8, 2014

/cc @yhuai

@SparkQA commented Sep 8, 2014

QA tests have started for PR 2323 at commit 59065bc.

  • This patch merges cleanly.

@SparkQA commented Sep 9, 2014

QA tests have finished for PR 2323 at commit 59065bc.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@yhuai (Contributor) commented Sep 9, 2014

JsonRDD and the Java API of Row also use wrappers. Should we check whether those places can trigger the same NPE?

@marmbrus (Contributor, Author)

I've updated the usage in JSON RDD. Java Row wrapping should never happen before Kryo serialization AFAICT.
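
For context, a minimal standalone sketch of the kind of conversion under discussion: copying a Java-list-backed wrapper into a plain Scala collection before it can reach the serializer. This is an illustration only, not the actual patch, and the values are hypothetical.

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

// Hypothetical value: a Seq backed by a java.util.List. asScala produces a
// wrapper (a JListWrapper) around the underlying Java list.
val javaList: java.util.List[Int] = java.util.Arrays.asList(1, 2, 3)
val wrapped: Seq[Int] = javaList.asScala

// Copying into a plain Scala collection such as ArrayBuffer before the value
// is handed to Kryo means the wrapper itself never needs to be serialized.
val safe: Seq[Int] = ArrayBuffer(wrapped: _*)
```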

@marmbrus (Contributor, Author)

Jenkins, test this please

@SparkQA commented Sep 10, 2014

QA tests have started for PR 2323 at commit 646976b.

  • This patch merges cleanly.

@SparkQA commented Sep 10, 2014

Tests timed out after a configured wait of 120m.

@marmbrus (Contributor, Author)

Jenkins, test this please.

1 similar comment
@marmbrus (Contributor, Author)

Jenkins, test this please.

@SparkQA commented Sep 10, 2014

QA tests have started for PR 2323 at commit 646976b.

  • This patch merges cleanly.

@SparkQA commented Sep 10, 2014

QA tests have finished for PR 2323 at commit 646976b.

  • This patch passes unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

Oh, after checking the code again, I think .map(scalafy) will convert the JListWrapper here to an ArrayBuffer (JListWrapper is a Buffer, and Buffer's newBuilder returns an ArrayBuffer), so we will not hit the Kryo issue. I tried from pyspark.sql import SQLContext; SQLContext(sc).jsonRDD(sc.parallelize(['{"a":[3]}']))._jschema_rdd.collect() and it works fine.
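
As a quick standalone illustration of that point (a sketch, not Spark's JsonRDD code): mapping over a Java-list wrapper builds its result through Buffer's default builder, so the output is an ArrayBuffer rather than another JListWrapper.

```scala
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer

val javaList = java.util.Arrays.asList("a", "b", "c")
val wrapper = javaList.asScala         // concrete class is a JListWrapper
val converted = wrapper.map(identity)  // built via Buffer's newBuilder

println(wrapper.getClass.getSimpleName)          // JListWrapper (exact name may vary by Scala version)
println(converted.isInstanceOf[ArrayBuffer[_]])  // true
```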

@marmbrus (Contributor, Author)

Okay, I reverted the JSON RDD change and merged this to master. Thanks!

@asfgit closed this in f92cde2 on Sep 11, 2014
@mohangadm

I have experienced the same kind of problem when using Avro with the Spark Streaming API.
If the Avro message is simple, it's fine, but if the Avro message has unions/arrays it fails with the exception below:
ERROR scheduler.JobScheduler: Error running job streaming job 1411043845000 ms.0
org.apache.spark.SparkException: Job aborted due to stage failure: Exception while getting task result: com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
value (xyz.Datum)
data (xyz.ResourceMessage)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688)
at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

The above exception shows up when output operations are used.

Below is the Avro message:
{"version": "01", "sequence": "00001", "resource": "sensor-001", "controller": "002", "controllerTimestamp": "1411038710358", "data": {"value": [{"name": "Temperature", "value": "30"}, {"name": "Speed", "value": "60"}, {"name": "Location", "value": ["+401213.1", "-0750015.1"]}, {"name": "Timestamp", "value": "2014-09-09T08:15:25-05:00"}]}}

The message is successfully decoded by the decoder, but the exception is thrown during the output operation.
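
For what it's worth, a minimal sketch of where serialization enters the picture for a job like this: the output operation brings task results back to the driver, and those results (including any nested union/array values) go through the configured Kryo serializer. The record types and stream source below are placeholders, not the actual xyz.ResourceMessage / xyz.Datum classes.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder record shapes standing in for the decoded Avro message.
case class Reading(name: String, value: Seq[String])
case class Message(resource: String, data: Seq[Reading])

object OutputOpSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("kryo-output-op-sketch")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val ssc = new StreamingContext(conf, Seconds(1))

    // A queue-backed stream stands in for the real Avro-decoding receiver.
    val queue = mutable.Queue[RDD[Message]]()
    val stream = ssc.queueStream(queue)

    // The output operation is where task results are shipped to the driver
    // and pass through Kryo; records whose fields come back as Java-backed
    // wrappers are the ones that can fail at this point.
    stream.foreachRDD { rdd => rdd.collect().foreach(println) }

    queue += ssc.sparkContext.parallelize(Seq(
      Message("sensor-001", Seq(Reading("Temperature", Seq("30"))))))

    ssc.start()
    Thread.sleep(3000) // let a couple of batches run
    ssc.stop()
  }
}
```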

@marmbrus deleted the kryoJListNPE branch on September 22, 2014 at 19:51