From 4e67ab96027f5de942c25eb14669d5aedae80204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?daniedeng=28=E9=82=93=E9=A3=9E=29?= Date: Wed, 17 Jan 2018 20:32:10 +0800 Subject: [PATCH 1/2] Read eventLog file with mixed encodings --- .../org/apache/spark/scheduler/ReplayListenerBus.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index c9cd662f5709d..14d7750d3fc4f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -18,8 +18,9 @@ package org.apache.spark.scheduler import java.io.{EOFException, InputStream, IOException} +import java.nio.charset.CodingErrorAction -import scala.io.Source +import scala.io.{Codec, Source} import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException @@ -54,6 +55,9 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { sourceName: String, maybeTruncated: Boolean = false, eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { + implicit val codec = Codec.default + codec.onMalformedInput(CodingErrorAction.REPLACE) + codec.onUnmappableCharacter(CodingErrorAction.REPLACE) val lines = Source.fromInputStream(logData).getLines() replay(lines, sourceName, maybeTruncated, eventsFilter) } From f7c75ec0a03283f66a6d7b11070976a99ccae42f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?daniedeng=28=E9=82=93=E9=A3=9E=29?= Date: Wed, 7 Feb 2018 11:35:11 +0800 Subject: [PATCH 2/2] add UT --- .../spark/scheduler/ReplayListenerSuite.scala | 28 ++++++++++++++++--- data/events/mixed-encoding-events.txt | 2 ++ 2 files changed, 26 insertions(+), 4 deletions(-) create mode 100644 data/events/mixed-encoding-events.txt diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index 73e7b3fe8c1de..f864b47bdfdf1 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -22,13 +22,15 @@ import java.net.URI import java.util.concurrent.atomic.AtomicInteger import org.apache.hadoop.fs.Path -import org.json4s.jackson.JsonMethods._ -import org.scalatest.BeforeAndAfter - -import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.io.{CompressionCodec, LZ4CompressionCodec} import org.apache.spark.util.{JsonProtocol, JsonProtocolSuite, Utils} +import org.json4s.jackson.JsonMethods._ +import org.scalatest.BeforeAndAfter + +import scala.compat.Platform.EOL +import scala.io.Codec /** * Test whether ReplayListenerBus replays events from logs correctly. @@ -74,6 +76,24 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd)) } + /** + * Test the events which encoding by GBK(Chinese charset), but read by default(UTF-8) + */ + test("Read Mixed encoding log") { + val logFilePath = new Path("../data/events/mixed-encoding-events.txt") + val conf = EventLoggingListenerSuite.getLoggingConf(logFilePath) + val logData = fileSystem.open(logFilePath) + val eventMonster = new EventMonster(conf) + try { + val replayer = new ReplayListenerBus() + replayer.addListener(eventMonster) + replayer.replay(logData, logFilePath.toString) + } finally { + logData.close() + } + assert(eventMonster.loggedEvents.length==2) + } + /** * Test replaying compressed spark history file that internally throws an EOFException. To * avoid sensitivity to the compression specifics the test forces an EOFException to occur diff --git a/data/events/mixed-encoding-events.txt b/data/events/mixed-encoding-events.txt new file mode 100644 index 0000000000000..6708e1990ee1d --- /dev/null +++ b/data/events/mixed-encoding-events.txt @@ -0,0 +1,2 @@ +{"Event":"SparkListenerTaskStart","Stage ID":0,"Stage Attempt ID":0,"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1517917027379,"Executor ID":"0","Host":"localhost","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}} +{"Event":"SparkListenerTaskEnd","Stage ID":0,"Stage Attempt ID":0,"Task Type":"result","Task End Reason":{"Reason":"ExceptionFailure","Class Name":"IOException","Description":"ooO,ÄãºÃ","Stack Trace":[{"Declaring Class":"org.apache.spark.scheduler.ReplayListenerSuite$$anonfun$3","Method Name":"apply$mcV$sp","File Name":"ReplayListenerSuite.scala","Line Number":91},{"Declaring Class":"org.apache.spark.scheduler.ReplayListenerSuite$$anonfun$3","Method Name":"apply","File Name":"ReplayListenerSuite.scala","Line Number":79},{"Declaring Class":"org.apache.spark.scheduler.ReplayListenerSuite$$anonfun$3","Method Name":"apply","File Name":"ReplayListenerSuite.scala","Line Number":79},{"Declaring Class":"org.scalatest.OutcomeOf$class","Method Name":"outcomeOf","File Name":"OutcomeOf.scala","Line Number":85},{"Declaring Class":"org.scalatest.OutcomeOf$","Method Name":"outcomeOf","File Name":"OutcomeOf.scala","Line Number":104},{"Declaring Class":"org.scalatest.Transformer","Method Name":"apply","File Name":"Transformer.scala","Line Number":22},{"Declaring Class":"org.scalatest.Transformer","Method Name":"apply","File Name":"Transformer.scala","Line Number":20},{"Declaring Class":"org.scalatest.FunSuiteLike$$anon$1","Method Name":"apply","File Name":"FunSuiteLike.scala","Line Number":186},{"Declaring Class":"org.apache.spark.SparkFunSuite","Method Name":"withFixture","File Name":"SparkFunSuite.scala","Line Number":102},{"Declaring Class":"org.scalatest.FunSuiteLike$class","Method Name":"invokeWithFixture$1","File Name":"FunSuiteLike.scala","Line Number":183},{"Declaring Class":"org.scalatest.FunSuiteLike$$anonfun$runTest$1","Method Name":"apply","File Name":"FunSuiteLike.scala","Line Number":196},{"Declaring Class":"org.scalatest.FunSuiteLike$$anonfun$runTest$1","Method Name":"apply","File Name":"FunSuiteLike.scala","Line Number":196},{"Declaring Class":"org.scalatest.SuperEngine","Method Name":"runTestImpl","File Name":"Engine.scala","Line Number":289},{"Declaring Class":"org.scalatest.FunSuiteLike$class","Method Name":"runTest","File Name":"FunSuiteLike.scala","Line Number":196},{"Declaring Class":"org.apache.spark.scheduler.ReplayListenerSuite","Method Name":"org$scalatest$BeforeAndAfter$$super$runTest","File Name":"ReplayListenerSuite.scala","Line Number":38},{"Declaring Class":"org.scalatest.BeforeAndAfter$class","Method Name":"runTest","File Name":"BeforeAndAfter.scala","Line Number":203},{"Declaring Class":"org.apache.spark.scheduler.ReplayListenerSuite","Method Name":"org$scalatest$BeforeAndAfterEach$$super$runTest","File Name":"ReplayListenerSuite.scala","Line Number":38},{"Declaring Class":"org.scalatest.BeforeAndAfterEach$class","Method Name":"runTest","File Name":"BeforeAndAfterEach.scala","Line Number":221},{"Declaring Class":"org.apache.spark.scheduler.ReplayListenerSuite","Method Name":"runTest","File Name":"ReplayListenerSuite.scala","Line Number":38},{"Declaring Class":"org.scalatest.FunSuiteLike$$anonfun$runTests$1","Method Name":"apply","File Name":"FunSuiteLike.scala","Line Number":229},{"Declaring Class":"org.scalatest.FunSuiteLike$$anonfun$runTests$1","Method Name":"apply","File Name":"FunSuiteLike.scala","Line Number":229},{"Declaring Class":"org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1","Method Name":"apply","File Name":"Engine.scala","Line Number":396},{"Declaring Class":"org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1","Method Name":"apply","File Name":"Engine.scala","Line Number":384},{"Declaring Class":"scala.collection.immutable.List","Method Name":"foreach","File Name":"List.scala","Line Number":381},{"Declaring Class":"org.scalatest.SuperEngine","Method Name":"traverseSubNodes$1","File Name":"Engine.scala","Line Number":384},{"Declaring Class":"org.scalatest.SuperEngine","Method Name":"org$scalatest$SuperEngine$$runTestsInBranch","File Name":"Engine.scala","Line Number":379},{"Declaring Class":"org.scalatest.SuperEngine","Method Name":"runTestsImpl","File Name":"Engine.scala","Line Number":461},{"Declaring Class":"org.scalatest.FunSuiteLike$class","Method Name":"runTests","File Name":"FunSuiteLike.scala","Line Number":229},{"Declaring Class":"org.scalatest.FunSuite","Method Name":"runTests","File Name":"FunSuite.scala","Line Number":1560},{"Declaring Class":"org.scalatest.Suite$class","Method Name":"run","File Name":"Suite.scala","Line Number":1147},{"Declaring Class":"org.scalatest.FunSuite","Method Name":"org$scalatest$FunSuiteLike$$super$run","File Name":"FunSuite.scala","Line Number":1560},{"Declaring Class":"org.scalatest.FunSuiteLike$$anonfun$run$1","Method Name":"apply","File Name":"FunSuiteLike.scala","Line Number":233},{"Declaring Class":"org.scalatest.FunSuiteLike$$anonfun$run$1","Method Name":"apply","File Name":"FunSuiteLike.scala","Line Number":233},{"Declaring Class":"org.scalatest.SuperEngine","Method Name":"runImpl","File Name":"Engine.scala","Line Number":521},{"Declaring Class":"org.scalatest.FunSuiteLike$class","Method Name":"run","File Name":"FunSuiteLike.scala","Line Number":233},{"Declaring Class":"org.apache.spark.SparkFunSuite","Method Name":"org$scalatest$BeforeAndAfterAll$$super$run","File Name":"SparkFunSuite.scala","Line Number":52},{"Declaring Class":"org.scalatest.BeforeAndAfterAll$class","Method Name":"liftedTree1$1","File Name":"BeforeAndAfterAll.scala","Line Number":213},{"Declaring Class":"org.scalatest.BeforeAndAfterAll$class","Method Name":"run","File Name":"BeforeAndAfterAll.scala","Line Number":210},{"Declaring Class":"org.apache.spark.scheduler.ReplayListenerSuite","Method Name":"org$scalatest$BeforeAndAfter$$super$run","File Name":"ReplayListenerSuite.scala","Line Number":38},{"Declaring Class":"org.scalatest.BeforeAndAfter$class","Method Name":"run","File Name":"BeforeAndAfter.scala","Line Number":258},{"Declaring Class":"org.apache.spark.scheduler.ReplayListenerSuite","Method Name":"run","File Name":"ReplayListenerSuite.scala","Line Number":38},{"Declaring Class":"org.scalatest.tools.SuiteRunner","Method Name":"run","File Name":"SuiteRunner.scala","Line Number":45},{"Declaring Class":"org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1","Method Name":"apply","File Name":"Runner.scala","Line Number":1340},{"Declaring Class":"org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1","Method Name":"apply","File Name":"Runner.scala","Line Number":1334},{"Declaring Class":"scala.collection.immutable.List","Method Name":"foreach","File Name":"List.scala","Line Number":381},{"Declaring Class":"org.scalatest.tools.Runner$","Method Name":"doRunRunRunDaDoRunRun","File Name":"Runner.scala","Line Number":1334},{"Declaring Class":"org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2","Method Name":"apply","File Name":"Runner.scala","Line Number":1011},{"Declaring Class":"org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2","Method Name":"apply","File Name":"Runner.scala","Line Number":1010},{"Declaring Class":"org.scalatest.tools.Runner$","Method Name":"withClassLoaderAndDispatchReporter","File Name":"Runner.scala","Line Number":1500},{"Declaring Class":"org.scalatest.tools.Runner$","Method Name":"runOptionallyWithPassFailReporter","File Name":"Runner.scala","Line Number":1010},{"Declaring Class":"org.scalatest.tools.Runner$","Method Name":"run","File Name":"Runner.scala","Line Number":850},{"Declaring Class":"org.scalatest.tools.Runner","Method Name":"run","File Name":"Runner.scala","Line Number":-1},{"Declaring Class":"org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner","Method Name":"runScalaTest2","File Name":"ScalaTestRunner.java","Line Number":131},{"Declaring Class":"org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner","Method Name":"main","File Name":"ScalaTestRunner.java","Line Number":28}],"Full Stack Trace":"org.apache.spark.scheduler.ReplayListenerSuite$$anonfun$3.apply$mcV$sp(ReplayListenerSuite.scala:91)\norg.apache.spark.scheduler.ReplayListenerSuite$$anonfun$3.apply(ReplayListenerSuite.scala:79)\norg.apache.spark.scheduler.ReplayListenerSuite$$anonfun$3.apply(ReplayListenerSuite.scala:79)\norg.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)\norg.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)\norg.scalatest.Transformer.apply(Transformer.scala:22)\norg.scalatest.Transformer.apply(Transformer.scala:20)\norg.scalatest.FunSuiteLike$$anon$1.apply(FunSuiteLike.scala:186)\norg.apache.spark.SparkFunSuite.withFixture(SparkFunSuite.scala:102)\norg.scalatest.FunSuiteLike$class.invokeWithFixture$1(FunSuiteLike.scala:183)\norg.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)\norg.scalatest.FunSuiteLike$$anonfun$runTest$1.apply(FunSuiteLike.scala:196)\norg.scalatest.SuperEngine.runTestImpl(Engine.scala:289)\norg.scalatest.FunSuiteLike$class.runTest(FunSuiteLike.scala:196)\norg.apache.spark.scheduler.ReplayListenerSuite.org$scalatest$BeforeAndAfter$$super$runTest(ReplayListenerSuite.scala:38)\norg.scalatest.BeforeAndAfter$class.runTest(BeforeAndAfter.scala:203)\norg.apache.spark.scheduler.ReplayListenerSuite.org$scalatest$BeforeAndAfterEach$$super$runTest(ReplayListenerSuite.scala:38)\norg.scalatest.BeforeAndAfterEach$class.runTest(BeforeAndAfterEach.scala:221)\norg.apache.spark.scheduler.ReplayListenerSuite.runTest(ReplayListenerSuite.scala:38)\norg.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)\norg.scalatest.FunSuiteLike$$anonfun$runTests$1.apply(FunSuiteLike.scala:229)\norg.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:396)\norg.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:384)\nscala.collection.immutable.List.foreach(List.scala:381)\norg.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:384)\norg.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:379)\norg.scalatest.SuperEngine.runTestsImpl(Engine.scala:461)\norg.scalatest.FunSuiteLike$class.runTests(FunSuiteLike.scala:229)\norg.scalatest.FunSuite.runTests(FunSuite.scala:1560)\norg.scalatest.Suite$class.run(Suite.scala:1147)\norg.scalatest.FunSuite.org$scalatest$FunSuiteLike$$super$run(FunSuite.scala:1560)\norg.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)\norg.scalatest.FunSuiteLike$$anonfun$run$1.apply(FunSuiteLike.scala:233)\norg.scalatest.SuperEngine.runImpl(Engine.scala:521)\norg.scalatest.FunSuiteLike$class.run(FunSuiteLike.scala:233)\norg.apache.spark.SparkFunSuite.org$scalatest$BeforeAndAfterAll$$super$run(SparkFunSuite.scala:52)\norg.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:213)\norg.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:210)\norg.apache.spark.scheduler.ReplayListenerSuite.org$scalatest$BeforeAndAfter$$super$run(ReplayListenerSuite.scala:38)\norg.scalatest.BeforeAndAfter$class.run(BeforeAndAfter.scala:258)\norg.apache.spark.scheduler.ReplayListenerSuite.run(ReplayListenerSuite.scala:38)\norg.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:45)\norg.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1340)\norg.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$1.apply(Runner.scala:1334)\nscala.collection.immutable.List.foreach(List.scala:381)\norg.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1334)\norg.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1011)\norg.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1010)\norg.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1500)\norg.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1010)\norg.scalatest.tools.Runner$.run(Runner.scala:850)\norg.scalatest.tools.Runner.run(Runner.scala)\norg.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:131)\norg.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)\n","Accumulator Updates":[]},"Task Info":{"Task ID":0,"Index":0,"Attempt":0,"Launch Time":1517917027379,"Executor ID":"0","Host":"localhost","Locality":"PROCESS_LOCAL","Speculative":false,"Getting Result Time":0,"Finish Time":0,"Failed":false,"Killed":false,"Accumulables":[]}}