diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index bee784dcdfe33..3e8318e1921cb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -131,7 +131,7 @@ private[spark] class EventLoggingListener(
 
       EventLoggingListener.initEventLog(bstream, testing, loggedEvents)
       fileSystem.setPermission(path, LOG_FILE_PERMISSIONS)
-      writer = Some(new PrintWriter(bstream))
+      writer = Some(new PrintWriter(new OutputStreamWriter(bstream, StandardCharsets.UTF_8)))
       logInfo("Logging events to %s".format(logPath))
     } catch {
       case e: Exception =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 226c23733c870..699042dd967bc 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -19,7 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io.{EOFException, InputStream, IOException}
 
-import scala.io.Source
+import scala.io.{Codec, Source}
 
 import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
@@ -54,7 +54,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
       sourceName: String,
       maybeTruncated: Boolean = false,
       eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
-    val lines = Source.fromInputStream(logData).getLines()
+    val lines = Source.fromInputStream(logData)(Codec.UTF8).getLines()
     replay(lines, sourceName, maybeTruncated, eventsFilter)
   }
 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index d65b5cbfc094e..cda537159a573 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.scheduler
 
 import java.io._
 import java.net.URI
+import java.nio.charset.StandardCharsets
 import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.mutable.ArrayBuffer
@@ -52,10 +53,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Simple replay") {
     val logFilePath = getFilePath(testDir, "events.txt")
     val fstream = fileSystem.create(logFilePath)
+    val fwriter = new OutputStreamWriter(fstream, StandardCharsets.UTF_8)
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
-    Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+    Utils.tryWithResource(new PrintWriter(fwriter)) { writer =>
       // scalastyle:off println
       writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
       writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
@@ -88,7 +90,8 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     val buffered = new ByteArrayOutputStream
     val codec = new LZ4CompressionCodec(new SparkConf())
     val compstream = codec.compressedOutputStream(buffered)
-    Utils.tryWithResource(new PrintWriter(compstream)) { writer =>
+    val cwriter = new OutputStreamWriter(compstream, StandardCharsets.UTF_8)
+    Utils.tryWithResource(new PrintWriter(cwriter)) { writer =>
 
       val applicationStart = SparkListenerApplicationStart("AppStarts", None,
         125L, "Mickey", None)
@@ -134,10 +137,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
   test("Replay incompatible event log") {
     val logFilePath = getFilePath(testDir, "incompatible.txt")
     val fstream = fileSystem.create(logFilePath)
+    val fwriter = new OutputStreamWriter(fstream, StandardCharsets.UTF_8)
     val applicationStart = SparkListenerApplicationStart("Incompatible App", None,
       125L, "UserUsingIncompatibleVersion", None)
     val applicationEnd = SparkListenerApplicationEnd(1000L)
-    Utils.tryWithResource(new PrintWriter(fstream)) { writer =>
+    Utils.tryWithResource(new PrintWriter(fwriter)) { writer =>
       // scalastyle:off println
       writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
       writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md
index 1c47dfb27124c..2d4d91dab075e 100644
--- a/docs/core-migration-guide.md
+++ b/docs/core-migration-guide.md
@@ -30,3 +30,4 @@ license: |
 
 
 - In Spark 3.0, deprecated method `AccumulableInfo.apply` have been removed because creating `AccumulableInfo` is disallowed.
+- In Spark 3.0, event log files will be written in UTF-8 encoding, and the Spark History Server will replay event log files as UTF-8. Previously, Spark wrote event log files in the default charset of the driver JVM process, so a Spark History Server from Spark 2.x is needed to read old event log files written with an incompatible encoding.
\ No newline at end of file
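
To make the intent of the patch concrete, here is a minimal, self-contained sketch of the same pattern outside Spark (illustration only, not Spark code; the `Utf8RoundTrip` object and the temp file are invented for the example): pin the charset to UTF-8 on the write path via `OutputStreamWriter` and on the read path via `Codec.UTF8`, so a non-ASCII event line round-trips regardless of the driver JVM's default charset.

```scala
import java.io.{File, FileInputStream, FileOutputStream, OutputStreamWriter, PrintWriter}
import java.nio.charset.StandardCharsets

import scala.io.{Codec, Source}

// Hypothetical demo object; not part of the patch.
object Utf8RoundTrip {
  def main(args: Array[String]): Unit = {
    val file = File.createTempFile("events", ".txt")
    val line = """{"Event":"SparkListenerApplicationStart","App Name":"café"}"""

    // Write side: wrap the raw stream in an OutputStreamWriter with an explicit
    // charset. A bare `new PrintWriter(outputStream)` would silently encode with
    // the JVM's platform-default charset (file.encoding), which is the behavior
    // the patch removes from EventLoggingListener.
    val writer = new PrintWriter(
      new OutputStreamWriter(new FileOutputStream(file), StandardCharsets.UTF_8))
    try writer.println(line) finally writer.close()

    // Read side: pass Codec.UTF8 explicitly. Otherwise Source.fromInputStream
    // resolves an implicit default codec derived from the platform charset,
    // which can mangle multi-byte characters such as the "é" above.
    val source = Source.fromInputStream(new FileInputStream(file))(Codec.UTF8)
    try {
      val readBack = source.getLines().next()
      assert(readBack == line, s"round-trip mismatch: $readBack")
    } finally source.close()

    file.delete()
  }
}
```

Note that replaying with `Codec.UTF8` assumes the log was produced as UTF-8; event logs written by Spark 2.x under a non-UTF-8 platform charset still require a Spark 2.x History Server, as the migration note above states.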