Skip to content

Commit de5e531

Browse files
brkyvztdas
authored and committed
[SPARK-11731][STREAMING] Enable batching on Driver WriteAheadLog by default
Using batching on the driver for the WriteAheadLog should be an improvement for all environments and use cases. Users will be able to scale to a much higher number of receivers with the BatchedWriteAheadLog. Therefore we should turn it on by default, and QA it in the QA period. I've also added some tests to make sure the default configurations are correct regarding recent additions:
- batching on by default
- closeFileAfterWrite off by default
- parallelRecovery off by default

Author: Burak Yavuz <[email protected]>

Closes #9695 from brkyvz/enable-batch-wal.
1 parent b0c3fd3 commit de5e531

File tree

5 files changed

+48
-7
lines changed

5 files changed

+48
-7
lines changed

streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogUtils.scala

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -67,7 +67,7 @@ private[streaming] object WriteAheadLogUtils extends Logging {
6767
}
6868

6969
def isBatchingEnabled(conf: SparkConf, isDriver: Boolean): Boolean = {
70-
isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = false)
70+
isDriver && conf.getBoolean(DRIVER_WAL_BATCHING_CONF_KEY, defaultValue = true)
7171
}
7272

7373
/**

streaming/src/test/java/org/apache/spark/streaming/JavaWriteAheadLogSuite.java

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -108,6 +108,7 @@ public void close() {
108108
public void testCustomWAL() {
109109
SparkConf conf = new SparkConf();
110110
conf.set("spark.streaming.driver.writeAheadLog.class", JavaWriteAheadLogSuite.class.getName());
111+
conf.set("spark.streaming.driver.writeAheadLog.allowBatching", "false");
111112
WriteAheadLog wal = WriteAheadLogUtils.createLogForDriver(conf, null, null);
112113

113114
String data1 = "data1";

streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockTrackerSuite.scala

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -330,8 +330,13 @@ class ReceivedBlockTrackerSuite
330330
: Seq[ReceivedBlockTrackerLogEvent] = {
331331
logFiles.flatMap {
332332
file => new FileBasedWriteAheadLogReader(file, hadoopConf).toSeq
333-
}.map { byteBuffer =>
334-
Utils.deserialize[ReceivedBlockTrackerLogEvent](byteBuffer.array)
333+
}.flatMap { byteBuffer =>
334+
val validBuffer = if (WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true)) {
335+
Utils.deserialize[Array[Array[Byte]]](byteBuffer.array()).map(ByteBuffer.wrap)
336+
} else {
337+
Array(byteBuffer)
338+
}
339+
validBuffer.map(b => Utils.deserialize[ReceivedBlockTrackerLogEvent](b.array()))
335340
}.toList
336341
}
337342

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala

Lines changed: 23 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -20,7 +20,7 @@ import java.io._
2020
import java.nio.ByteBuffer
2121
import java.util.{Iterator => JIterator}
2222
import java.util.concurrent.atomic.AtomicInteger
23-
import java.util.concurrent.{TimeUnit, CountDownLatch, ThreadPoolExecutor}
23+
import java.util.concurrent.{RejectedExecutionException, TimeUnit, CountDownLatch, ThreadPoolExecutor}
2424

2525
import scala.collection.JavaConverters._
2626
import scala.collection.mutable.ArrayBuffer
@@ -190,6 +190,28 @@ abstract class CommonWriteAheadLogTests(
190190
}
191191
assert(!nonexistentTempPath.exists(), "Directory created just by attempting to read segment")
192192
}
193+
194+
test(testPrefix + "parallel recovery not enabled if closeFileAfterWrite = false") {
195+
// write some data
196+
val writtenData = (1 to 10).map { i =>
197+
val data = generateRandomData()
198+
val file = testDir + s"/log-$i-$i"
199+
writeDataManually(data, file, allowBatching)
200+
data
201+
}.flatten
202+
203+
val wal = createWriteAheadLog(testDir, closeFileAfterWrite, allowBatching)
204+
// create iterator but don't materialize it
205+
val readData = wal.readAll().asScala.map(byteBufferToString)
206+
wal.close()
207+
if (closeFileAfterWrite) {
208+
// the threadpool is shutdown by the wal.close call above, therefore we shouldn't be able
209+
// to materialize the iterator with parallel recovery
210+
intercept[RejectedExecutionException](readData.toArray)
211+
} else {
212+
assert(readData.toSeq === writtenData)
213+
}
214+
}
193215
}
194216

195217
class FileBasedWriteAheadLogSuite

streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogUtilsSuite.scala

Lines changed: 16 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -56,19 +56,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
5656
test("log selection and creation") {
5757

5858
val emptyConf = new SparkConf() // no log configuration
59-
assertDriverLogClass[FileBasedWriteAheadLog](emptyConf)
59+
assertDriverLogClass[FileBasedWriteAheadLog](emptyConf, isBatched = true)
6060
assertReceiverLogClass[FileBasedWriteAheadLog](emptyConf)
6161

6262
// Verify setting driver WAL class
6363
val driverWALConf = new SparkConf().set("spark.streaming.driver.writeAheadLog.class",
6464
classOf[MockWriteAheadLog0].getName())
65-
assertDriverLogClass[MockWriteAheadLog0](driverWALConf)
65+
assertDriverLogClass[MockWriteAheadLog0](driverWALConf, isBatched = true)
6666
assertReceiverLogClass[FileBasedWriteAheadLog](driverWALConf)
6767

6868
// Verify setting receiver WAL class
6969
val receiverWALConf = new SparkConf().set("spark.streaming.receiver.writeAheadLog.class",
7070
classOf[MockWriteAheadLog0].getName())
71-
assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf)
71+
assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
7272
assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
7373

7474
// Verify setting receiver WAL class with 1-arg constructor
@@ -104,6 +104,19 @@ class WriteAheadLogUtilsSuite extends SparkFunSuite {
104104
assertDriverLogClass[FileBasedWriteAheadLog](receiverWALConf, isBatched = true)
105105
assertReceiverLogClass[MockWriteAheadLog0](receiverWALConf)
106106
}
107+
108+
test("batching is enabled by default in WriteAheadLog") {
109+
val conf = new SparkConf()
110+
assert(WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = true))
111+
// batching is not valid for receiver WALs
112+
assert(!WriteAheadLogUtils.isBatchingEnabled(conf, isDriver = false))
113+
}
114+
115+
test("closeFileAfterWrite is disabled by default in WriteAheadLog") {
116+
val conf = new SparkConf()
117+
assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = true))
118+
assert(!WriteAheadLogUtils.shouldCloseFileAfterWrite(conf, isDriver = false))
119+
}
107120
}
108121

109122
object WriteAheadLogUtilsSuite {

0 commit comments

Comments (0)