From 71bf026586c81880941b31b4a771c2178564dfd2 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 19 Sep 2019 13:24:44 +0900 Subject: [PATCH 1/4] [SPARK-29160][CORE] Use UTF-8 specifically for reading/writing event log file --- .../apache/spark/scheduler/EventLoggingListener.scala | 2 +- .../org/apache/spark/scheduler/ReplayListenerBus.scala | 3 ++- .../apache/spark/scheduler/ReplayListenerSuite.scala | 10 +++++++--- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala index bee784dcdfe33..3e8318e1921cb 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala @@ -131,7 +131,7 @@ private[spark] class EventLoggingListener( EventLoggingListener.initEventLog(bstream, testing, loggedEvents) fileSystem.setPermission(path, LOG_FILE_PERMISSIONS) - writer = Some(new PrintWriter(bstream)) + writer = Some(new PrintWriter(new OutputStreamWriter(bstream, StandardCharsets.UTF_8))) logInfo("Logging events to %s".format(logPath)) } catch { case e: Exception => diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index 226c23733c870..cddd25730e13a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -18,6 +18,7 @@ package org.apache.spark.scheduler import java.io.{EOFException, InputStream, IOException} +import java.nio.charset.StandardCharsets import scala.io.Source @@ -54,7 +55,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { sourceName: String, maybeTruncated: Boolean = false, eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { - val 
lines = Source.fromInputStream(logData).getLines() + val lines = Source.fromInputStream(logData, StandardCharsets.UTF_8.name()).getLines() replay(lines, sourceName, maybeTruncated, eventsFilter) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala index d65b5cbfc094e..cda537159a573 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.scheduler import java.io._ import java.net.URI +import java.nio.charset.StandardCharsets import java.util.concurrent.atomic.AtomicInteger import scala.collection.mutable.ArrayBuffer @@ -52,10 +53,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp test("Simple replay") { val logFilePath = getFilePath(testDir, "events.txt") val fstream = fileSystem.create(logFilePath) + val fwriter = new OutputStreamWriter(fstream, StandardCharsets.UTF_8) val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None, 125L, "Mickey", None) val applicationEnd = SparkListenerApplicationEnd(1000L) - Utils.tryWithResource(new PrintWriter(fstream)) { writer => + Utils.tryWithResource(new PrintWriter(fwriter)) { writer => // scalastyle:off println writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd)))) @@ -88,7 +90,8 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp val buffered = new ByteArrayOutputStream val codec = new LZ4CompressionCodec(new SparkConf()) val compstream = codec.compressedOutputStream(buffered) - Utils.tryWithResource(new PrintWriter(compstream)) { writer => + val cwriter = new OutputStreamWriter(compstream, StandardCharsets.UTF_8) + Utils.tryWithResource(new PrintWriter(cwriter)) { writer => 
val applicationStart = SparkListenerApplicationStart("AppStarts", None, 125L, "Mickey", None) @@ -134,10 +137,11 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp test("Replay incompatible event log") { val logFilePath = getFilePath(testDir, "incompatible.txt") val fstream = fileSystem.create(logFilePath) + val fwriter = new OutputStreamWriter(fstream, StandardCharsets.UTF_8) val applicationStart = SparkListenerApplicationStart("Incompatible App", None, 125L, "UserUsingIncompatibleVersion", None) val applicationEnd = SparkListenerApplicationEnd(1000L) - Utils.tryWithResource(new PrintWriter(fstream)) { writer => + Utils.tryWithResource(new PrintWriter(fwriter)) { writer => // scalastyle:off println writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart)))) writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""") From caad54d3aa05e30762c11060ea71184a1264236c Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Thu, 19 Sep 2019 21:34:58 +0900 Subject: [PATCH 2/4] Address comment --- .../scala/org/apache/spark/scheduler/ReplayListenerBus.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index cddd25730e13a..699042dd967bc 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -18,9 +18,8 @@ package org.apache.spark.scheduler import java.io.{EOFException, InputStream, IOException} -import java.nio.charset.StandardCharsets -import scala.io.Source +import scala.io.{Codec, Source} import com.fasterxml.jackson.core.JsonParseException import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException @@ -55,7 +54,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging { 
sourceName: String, maybeTruncated: Boolean = false, eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = { - val lines = Source.fromInputStream(logData, StandardCharsets.UTF_8.name()).getLines() + val lines = Source.fromInputStream(logData)(Codec.UTF8).getLines() replay(lines, sourceName, maybeTruncated, eventsFilter) } From c39b06f4b305a094b4c132b1f6dee5dc4332a1d8 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 21 Sep 2019 12:47:42 +0900 Subject: [PATCH 3/4] Add explanation of change to migration note --- docs/core-migration-guide.md | 1 + 1 file changed, 1 insertion(+) diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md index 1c47dfb27124c..fab0d04adf54e 100644 --- a/docs/core-migration-guide.md +++ b/docs/core-migration-guide.md @@ -30,3 +30,4 @@ license: | - In Spark 3.0, deprecated method `AccumulableInfo.apply` have been removed because creating `AccumulableInfo` is disallowed. +- In Spark 3.0, event log file will be written as UTF-8 encoding, and Spark History Server will replay event log files as UTF-8 encoding. Previously Spark writes event log file as default charset of driver JVM process, so SHS of Spark 2.x is needed to read the old event log files in case of incompatible encoding. \ No newline at end of file From 94558a64d0bb0aed23c3c81adc76e89b7f473cd4 Mon Sep 17 00:00:00 2001 From: "Jungtaek Lim (HeartSaVioR)" Date: Sat, 21 Sep 2019 17:14:36 +0900 Subject: [PATCH 4/4] Rephrase SHS to Spark History Server --- docs/core-migration-guide.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/core-migration-guide.md b/docs/core-migration-guide.md index fab0d04adf54e..2d4d91dab075e 100644 --- a/docs/core-migration-guide.md +++ b/docs/core-migration-guide.md @@ -30,4 +30,4 @@ license: | - In Spark 3.0, deprecated method `AccumulableInfo.apply` have been removed because creating `AccumulableInfo` is disallowed. 
-- In Spark 3.0, event log file will be written as UTF-8 encoding, and Spark History Server will replay event log files as UTF-8 encoding. Previously Spark writes event log file as default charset of driver JVM process, so SHS of Spark 2.x is needed to read the old event log files in case of incompatible encoding. \ No newline at end of file +- In Spark 3.0, event log files are written in UTF-8 encoding, and the Spark History Server replays event log files as UTF-8 encoding. Previously, Spark wrote event log files in the default charset of the driver JVM process, so the Spark History Server of Spark 2.x is needed to read old event log files written with an incompatible encoding. \ No newline at end of file