From 5b48e534928a1dc4d126aa2e46fb8d979a471b98 Mon Sep 17 00:00:00 2001
From: Yun Tang
Date: Wed, 28 Jun 2017 17:17:44 +0800
Subject: [PATCH 1/2] Support WAL recovery on Windows

When the driver fails over, it reads the WAL from HDFS by calling
WriteAheadLogBackedBlockRDD.getBlockFromWriteAheadLog(). That method needs a
dummy local path to satisfy its parameter requirements, but on Windows the
path contains a colon, which is not valid for Hadoop.
---
 .../spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 844760ab61d2e..0d584ed972b91 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -135,8 +135,11 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         // FileBasedWriteAheadLog will not create any file or directory at that path. Also,
         // this dummy directory should not already exist otherwise the WAL will try to recover
         // past events from the directory and throw errors.
+        // Specifically, on Windows nonExistentDirectory would contain a colon from the drive
+        // letter, which is invalid for Hadoop. Strip the drive letter and colon (e.g. "D:").
         val nonExistentDirectory = new File(
-          System.getProperty("java.io.tmpdir"), UUID.randomUUID().toString).getAbsolutePath
+          System.getProperty("java.io.tmpdir").replaceFirst("[a-zA-Z]:", ""),
+          UUID.randomUUID().toString).getPath
         writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
           SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
         dataRead = writeAheadLog.read(partition.walRecordHandle)

From 895b7669a426064a1116b72d8db0b204fec092a9 Mon Sep 17 00:00:00 2001
From: Yun Tang
Date: Thu, 6 Jul 2017 20:08:14 +0800
Subject: [PATCH 2/2] Modify code style according to review comments

---
 .../spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
index 0d584ed972b91..2c97c0afa260e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala
@@ -138,8 +138,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
         // Specifically, on Windows nonExistentDirectory would contain a colon from the drive
         // letter, which is invalid for Hadoop. Strip the drive letter and colon (e.g. "D:").
         val nonExistentDirectory = new File(
-          System.getProperty("java.io.tmpdir").replaceFirst("[a-zA-Z]:", ""),
-          UUID.randomUUID().toString).getPath
+          System.getProperty("java.io.tmpdir"),
+          UUID.randomUUID().toString).getAbsolutePath.replaceFirst("[a-zA-Z]:", "")
         writeAheadLog = WriteAheadLogUtils.createLogForReceiver(
           SparkEnv.get.conf, nonExistentDirectory, hadoopConf)
         dataRead = writeAheadLog.read(partition.walRecordHandle)
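
Note (not part of the patch series): the short, standalone Scala sketch below illustrates the transformation the final revision applies, assuming a hypothetical Windows java.io.tmpdir of "D:\Temp"; the object name DriveLetterStripSketch and the sample path are made up for illustration. It only demonstrates that stripping the leading drive letter and colon with the patch's regex leaves a colon-free path suitable as the dummy non-existent directory.

import java.util.UUID

object DriveLetterStripSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical value of java.io.tmpdir on a Windows driver host.
    val windowsTmpDir = "D:\\Temp"
    val candidate = windowsTmpDir + "\\" + UUID.randomUUID().toString
    // Same regex the patch uses: drop the leading drive letter and colon so the
    // dummy directory path no longer contains a ':' that Hadoop rejects.
    val nonExistentDirectory = candidate.replaceFirst("[a-zA-Z]:", "")
    println(nonExistentDirectory) // prints something like \Temp\<uuid>
  }
}

The second patch keeps the same regex but applies replaceFirst to the result of getAbsolutePath instead of to java.io.tmpdir, so the drive letter is stripped from the fully resolved path; the behavior of removing the colon is unchanged.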