From 5f43f573e60fe8d74644a5a5fa554bad8ade3bd2 Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Wed, 28 Nov 2018 01:46:35 -0800 Subject: [PATCH 1/2] Fix the metrics logic in BasicWriteTaskStatsTracker accordingly --- .../apache/spark/sql/internal/SQLConf.scala | 8 + .../datasources/BasicWriteStatsTracker.scala | 15 +- .../datasources/FileFormatDataWriter.scala | 141 ++++++++++++++---- .../datasources/FileFormatWriter.scala | 75 +++++++++- .../datasources/WriteStatsTracker.scala | 5 - .../sql/test/DataFrameReaderWriterSuite.scala | 36 +++++ 6 files changed, 231 insertions(+), 49 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index fe445e001935..e941f8b42d44 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -1639,6 +1639,12 @@ object SQLConf { "java.time.* packages are used for the same purpose.") .booleanConf .createWithDefault(false) + + val MAX_HASH_BASED_OUTPUT_WRITERS = buildConf("spark.sql.maxHashBasedOutputWriters") + .doc("Maximum number of output writers when doing hash-based write. " + + "If writers exceeding this limit, executor will fall back to sort-based write.") + .intConf + .createWithDefault(200) } /** @@ -2066,6 +2072,8 @@ class SQLConf extends Serializable with Logging { def legacyTimeParserEnabled: Boolean = getConf(SQLConf.LEGACY_TIME_PARSER_ENABLED) + def maxHashBasedOutputWriters: Int = getConf(SQLConf.MAX_HASH_BASED_OUTPUT_WRITERS) + /** ********************** SQLConf functionality methods ************ */ /** Set Spark SQL configuration properties. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala index ba7d2b7cbdb1..caeda9c9c428 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/BasicWriteStatsTracker.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources import java.io.FileNotFoundException +import scala.collection.mutable + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path @@ -54,7 +56,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) private[this] var numBytes: Long = 0L private[this] var numRows: Long = 0L - private[this] var curFile: Option[String] = None + private[this] val allFiles = mutable.HashSet[String]() /** * Get the size of the file expected to have been written by a worker. @@ -84,19 +86,18 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) } override def newFile(filePath: String): Unit = { - statCurrentFile() - curFile = Some(filePath) + allFiles.add(filePath) submittedFiles += 1 } - private def statCurrentFile(): Unit = { - curFile.foreach { path => + private def statAllFiles(): Unit = { + allFiles.foreach { path => getFileSize(path).foreach { len => numBytes += len numFiles += 1 } - curFile = None } + allFiles.clear() } override def newRow(row: InternalRow): Unit = { @@ -104,7 +105,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration) } override def getFinalStats(): WriteTaskStats = { - statCurrentFile() + statAllFiles() // Reports bytesWritten and recordsWritten to the Spark output metrics. 
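The tracker change above stops assuming that files are finished one at a time in order: every path passed to newFile is remembered in a set and stat-ed once when the final stats are computed, so re-registering a path (as the hash-based writer may do) is not double-counted. A minimal self-contained sketch of that bookkeeping, with a caller-supplied size lookup standing in for the Hadoop FileSystem call (names here are illustrative, not the Spark classes in the patch):

import scala.collection.mutable

// Sketch only: getFileSize stands in for FileSystem.getFileStatus(path).getLen.
class FileSetStatsSketch(getFileSize: String => Option[Long]) {
  private val allFiles = mutable.HashSet[String]()
  private var submittedFiles = 0

  def newFile(path: String): Unit = {
    allFiles.add(path)          // the set deduplicates repeated registrations
    submittedFiles += 1
  }

  /** Returns (submittedFiles, filesFound, totalBytes); each path is stat-ed exactly once. */
  def finalStats(): (Int, Int, Long) = {
    var numFiles = 0
    var numBytes = 0L
    allFiles.foreach { path =>
      getFileSize(path).foreach { len =>
        numFiles += 1
        numBytes += len
      }
    }
    allFiles.clear()
    (submittedFiles, numFiles, numBytes)
  }
}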
Option(TaskContext.get()).map(_.taskMetrics().outputMetrics).foreach { outputMetrics => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala index 10733810b641..eb40ef7dc561 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatDataWriter.scala @@ -33,6 +33,11 @@ import org.apache.spark.util.SerializableConfiguration /** * Abstract class for writing out data in a single Spark task. * Exceptions thrown by the implementation of this trait will automatically trigger task aborts. + * + * It first uses hash-based write to process input rows, i.e., keeping mapping between partition/ + * bucket, to the opened output writer. When the number of output writers exceeds threshold, + * it will fallback to sort-based write, i.e., sorting rest of input rows. Then output writers + * can be closed on the fly, after all rows being processed for that partition/bucket. */ abstract class FileFormatDataWriter( description: WriteJobDescription, @@ -51,6 +56,12 @@ abstract class FileFormatDataWriter( protected val statsTrackers: Seq[WriteTaskStatsTracker] = description.statsTrackers.map(_.newTaskInstance()) + /** + * Indicates if we are using sort-based write. + * Because we first try to use hash-based write, its initial value is false. + */ + protected var sortBased: Boolean = false + protected def releaseResources(): Unit = { if (currentWriter != null) { try { @@ -64,6 +75,14 @@ abstract class FileFormatDataWriter( /** Writes a record */ def write(record: InternalRow): Unit + /** Get number of currently opened output writers. */ + def getNumOfOutputWriters: Int + + /** Switch to sort-based write when the hash-based approach is opening too many writers. */ + def switchToSortBasedWrite(): Unit = { + sortBased = true + } + /** * Returns the summary of relative information which * includes the list of partition strings written out. The list of partitions is sent back @@ -94,6 +113,8 @@ class EmptyDirectoryDataWriter( committer: FileCommitProtocol ) extends FileFormatDataWriter(description, taskAttemptContext, committer) { override def write(record: InternalRow): Unit = {} + + override def getNumOfOutputWriters: Int = 0 } /** Writes data to a single directory (used for non-dynamic-partition writes). */ @@ -138,6 +159,8 @@ class SingleDirectoryDataWriter( statsTrackers.foreach(_.newRow(record)) recordsInFile += 1 } + + override def getNumOfOutputWriters: Int = if (currentWriter != null) 1 else 0 } /** @@ -156,16 +179,17 @@ class DynamicPartitionDataWriter( /** Flag saying whether or not the data to be written out is bucketed. */ private val isBucketed = description.bucketIdExpression.isDefined + /** Mapping between partition/bucket and its output writer. */ + private val writerMap = mutable.HashMap[WriterIndex, WriterInfo]() + assert(isPartitioned || isBucketed, s"""DynamicPartitionWriteTask should be used for writing out data that's either |partitioned or bucketed. In this case neither is true. 
|WriteJobDescription: $description """.stripMargin) - private var fileCounter: Int = _ - private var recordsInFile: Long = _ - private var currentPartionValues: Option[UnsafeRow] = None - private var currentBucketId: Option[Int] = None + private var currentWriterIndex: WriterIndex = WriterIndex(None, None) + private var currentWriterInfo: WriterInfo = WriterInfo(null, 0, 0) /** Extracts the partition values out of an input row. */ private lazy val getPartitionValues: InternalRow => UnsafeRow = { @@ -211,10 +235,10 @@ class DynamicPartitionDataWriter( * belong to * @param bucketId the bucket which all tuples being written by this `OutputWriter` belong to */ - private def newOutputWriter(partitionValues: Option[InternalRow], bucketId: Option[Int]): Unit = { - recordsInFile = 0 - releaseResources() - + private def newOutputWriter( + partitionValues: Option[UnsafeRow], + bucketId: Option[Int], + fileCounter: Int): OutputWriter = { val partDir = partitionValues.map(getPartitionPath(_)) partDir.foreach(updatedPartitions.add) @@ -233,46 +257,98 @@ class DynamicPartitionDataWriter( committer.newTaskTempFile(taskAttemptContext, partDir, ext) } - currentWriter = description.outputWriterFactory.newInstance( + statsTrackers.foreach(_.newFile(currentPath)) + + description.outputWriterFactory.newInstance( path = currentPath, dataSchema = description.dataColumns.toStructType, context = taskAttemptContext) + } - statsTrackers.foreach(_.newFile(currentPath)) + private def releaseCurrentWriterResource(): Unit = { + if (currentWriterInfo.writer != null) { + try { + currentWriterInfo.writer.close() + } finally { + currentWriterInfo.writer = null + } + writerMap -= currentWriterIndex + } + } + + override def releaseResources(): Unit = { + writerMap.values.foreach(writerInfo => { + if (writerInfo.writer != null) { + try { + writerInfo.writer.close() + } finally { + writerInfo.writer = null + } + } + }) } override def write(record: InternalRow): Unit = { val nextPartitionValues = if (isPartitioned) Some(getPartitionValues(record)) else None val nextBucketId = if (isBucketed) Some(getBucketId(record)) else None - if (currentPartionValues != nextPartitionValues || currentBucketId != nextBucketId) { - // See a new partition or bucket - write to a new partition dir (or a new bucket file). - if (isPartitioned && currentPartionValues != nextPartitionValues) { - currentPartionValues = Some(nextPartitionValues.get.copy()) - statsTrackers.foreach(_.newPartition(currentPartionValues.get)) - } - if (isBucketed) { - currentBucketId = nextBucketId - statsTrackers.foreach(_.newBucket(currentBucketId.get)) + if (currentWriterIndex.partitionValues != nextPartitionValues || + currentWriterIndex.bucketId != nextBucketId) { + if (sortBased) { + // The output writer can be closed now in case of sort-based write, + // because no more rows will be written with this writer. + releaseCurrentWriterResource() } - fileCounter = 0 - newOutputWriter(currentPartionValues, currentBucketId) + val nextWriterIndex = WriterIndex(nextPartitionValues.map(_.copy()), nextBucketId) + if (writerMap.contains(nextWriterIndex)) { + // Re-use the existing output writer. + currentWriterInfo = writerMap(nextWriterIndex) + } else { + // See a new partition or bucket - write to a new partition dir (or a new bucket file), + // create a new output writer, and add mapping between partition/bucket to writer. 
+ if (isPartitioned && + currentWriterIndex.partitionValues != nextWriterIndex.partitionValues) { + statsTrackers.foreach(_.newPartition(nextWriterIndex.partitionValues.get)) + } + if (isBucketed && currentWriterIndex.bucketId != nextWriterIndex.bucketId) { + statsTrackers.foreach(_.newBucket(nextWriterIndex.bucketId.get)) + } + val newWriter = newOutputWriter( + nextWriterIndex.partitionValues, + nextWriterIndex.bucketId, + 0) + currentWriterInfo = WriterInfo(newWriter, 0, 0) + writerMap(nextWriterIndex) = currentWriterInfo + } + currentWriterIndex = nextWriterIndex } else if (description.maxRecordsPerFile > 0 && - recordsInFile >= description.maxRecordsPerFile) { + currentWriterInfo.recordsInFile >= description.maxRecordsPerFile) { // Exceeded the threshold in terms of the number of records per file. - // Create a new file by increasing the file counter. - fileCounter += 1 - assert(fileCounter < MAX_FILE_COUNTER, - s"File counter $fileCounter is beyond max value $MAX_FILE_COUNTER") - - newOutputWriter(currentPartionValues, currentBucketId) + // Create a new file and output writer by increasing the file counter. + currentWriterInfo.fileCounter += 1 + assert(currentWriterInfo.fileCounter < MAX_FILE_COUNTER, + s"File counter $currentWriterInfo.fileCounter is beyond max value $MAX_FILE_COUNTER") + + currentWriterInfo.writer = newOutputWriter( + currentWriterIndex.partitionValues, + currentWriterIndex.bucketId, + currentWriterInfo.fileCounter) } val outputRow = getOutputRow(record) - currentWriter.write(outputRow) + currentWriterInfo.writer.write(outputRow) statsTrackers.foreach(_.newRow(outputRow)) - recordsInFile += 1 + currentWriterInfo.recordsInFile += 1 } + + override def getNumOfOutputWriters: Int = writerMap.size + + /** Wrapper class for partition value and bucket id to index output writer. */ + private case class WriterIndex(partitionValues: Option[UnsafeRow], bucketId: Option[Int]) + + /** Wrapper class for output writer bookkeeping information. */ + private case class WriterInfo( + var writer: OutputWriter, var recordsInFile: Long, var fileCounter: Int) } /** A shared job description for all the write tasks. 
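To make the WriterIndex/WriterInfo bookkeeping above easier to follow, here is a much smaller self-contained sketch of the same idea: rows are routed to a per-(partition, bucket) writer held in a map, and once the map grows past a threshold the caller sorts the remaining rows so each writer can be closed as soon as its key is done. Plain java.io.Writer and string keys stand in for Spark's OutputWriter and UnsafeRow; all names are illustrative.

import scala.collection.mutable

object WriterCacheSketch {
  // (partition value, bucket id) identifies one output writer, as WriterIndex does above.
  type Key = (Option[String], Option[Int])

  final case class State(writer: java.io.Writer, var recordsInFile: Long, var fileCounter: Int)

  class CachingWriter(openWriter: Key => java.io.Writer, maxWriters: Int) {
    private val writers = mutable.HashMap[Key, State]()
    private var sortBased = false

    /** True when the hash-based path has opened more writers than allowed. */
    def needsSortFallback: Boolean = !sortBased && writers.size > maxWriters

    def switchToSortBasedWrite(): Unit = { sortBased = true }

    def write(key: Key, line: String): Unit = {
      val state = writers.getOrElseUpdate(key, State(openWriter(key), 0L, 0))
      state.writer.write(line)
      state.recordsInFile += 1
      // In sort-based mode the input arrives grouped by key, so a writer whose key has
      // passed could be closed and removed here; that cleanup is elided in this sketch.
    }

    def close(): Unit = {
      writers.values.foreach(_.writer.close())
      writers.clear()
    }
  }
}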
*/ @@ -288,7 +364,10 @@ class WriteJobDescription( val customPartitionLocations: Map[TablePartitionSpec, String], val maxRecordsPerFile: Long, val timeZoneId: String, - val statsTrackers: Seq[WriteJobStatsTracker]) + val statsTrackers: Seq[WriteJobStatsTracker], + val maxHashBasedOutputWriters: Int, + val enableRadixSort: Boolean, + val sortOrderWithPartitionsAndBuckets: Seq[SortOrder]) extends Serializable { assert(AttributeSet(allColumns) == AttributeSet(partitionColumns ++ dataColumns), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala index 774fe38f5c2e..d223af642ea8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormatWriter.scala @@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} -import org.apache.spark.sql.execution.{SortExec, SparkPlan, SQLExecution} +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.types.StructType import org.apache.spark.util.{SerializableConfiguration, Utils} @@ -102,6 +103,14 @@ object FileFormatWriter extends Logging { val outputWriterFactory = fileFormat.prepareWrite(sparkSession, job, caseInsensitiveOptions, dataSchema) + // Create ordering expressions on partition, bucket id, and sort columns, + // it will be used to sort input rows when falling back from hash-based write + // to sort-based one. + val orderingExprWithPartitionsAndBuckets = + (partitionColumns ++ bucketIdExpression ++ sortColumns) + .map(SortOrder(_, Ascending)) + .map(BindReferences.bindReference(_, outputSpec.outputColumns)) + val description = new WriteJobDescription( uuid = UUID.randomUUID().toString, serializableHadoopConf = new SerializableConfiguration(job.getConfiguration), @@ -116,11 +125,14 @@ object FileFormatWriter extends Logging { .getOrElse(sparkSession.sessionState.conf.maxRecordsPerFile), timeZoneId = caseInsensitiveOptions.get(DateTimeUtils.TIMEZONE_OPTION) .getOrElse(sparkSession.sessionState.conf.sessionLocalTimeZone), - statsTrackers = statsTrackers + statsTrackers = statsTrackers, + maxHashBasedOutputWriters = sparkSession.sessionState.conf.maxHashBasedOutputWriters, + enableRadixSort = sparkSession.sessionState.conf.enableRadixSort, + sortOrderWithPartitionsAndBuckets = orderingExprWithPartitionsAndBuckets ) - // We should first sort by partition columns, then bucket id, and finally sorting columns. - val requiredOrdering = partitionColumns ++ bucketIdExpression ++ sortColumns + // We should sort by sorting columns. + val requiredOrdering = sortColumns // the sort order doesn't matter val actualOrdering = plan.outputOrdering.map(_.child) val orderingMatched = if (requiredOrdering.length > actualOrdering.length) { @@ -238,8 +250,23 @@ object FileFormatWriter extends Logging { try { Utils.tryWithSafeFinallyAndFailureCallbacks(block = { // Execute the task to write rows out and commit the task. 
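The ordering built above (partition columns, then bucket id, then sort columns) is what makes the sort-based fallback work: once rows are sorted on that composite key, all rows for one writer arrive together. A plain-Scala sketch of the same composite ordering, with a toy row key standing in for Spark's SortOrder/BindReferences machinery:

object CompositeOrderingSketch {
  // Illustrative only: RowKey stands in for the bound partition/bucket/sort expressions.
  final case class RowKey(partitionValue: String, bucketId: Int, sortColumn: Long)

  // Compare by partition first, then bucket id, then the user's sort column, mirroring
  // partitionColumns ++ bucketIdExpression ++ sortColumns above.
  implicit val rowKeyOrdering: Ordering[RowKey] =
    Ordering.by((k: RowKey) => (k.partitionValue, k.bucketId, k.sortColumn))

  // Sorting on this key groups all rows for one (partition, bucket) writer together,
  // so each writer can be closed as soon as its group has been fully consumed.
  def sortForFallback(rows: Seq[(RowKey, String)]): Seq[(RowKey, String)] = rows.sortBy(_._1)
}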
- while (iterator.hasNext) { - dataWriter.write(iterator.next()) + var switchToSortBasedWrite = false + var iter = iterator + while (iter.hasNext) { + // In case of too many output writers being opened, falling back from + // hash-based write to sort-based write. + if (!switchToSortBasedWrite && + dataWriter.getNumOfOutputWriters > description.maxHashBasedOutputWriters && + dataWriter.isInstanceOf[DynamicPartitionDataWriter]) { + switchToSortBasedWrite = true + val sorter = createSorter( + description.sortOrderWithPartitionsAndBuckets, + description.allColumns, + description.enableRadixSort) + iter = sorter.sort(iter.asInstanceOf[Iterator[UnsafeRow]]) + dataWriter.switchToSortBasedWrite() + } + dataWriter.write(iter.next()) } dataWriter.commit() })(catchBlock = { @@ -281,4 +308,40 @@ object FileFormatWriter extends Logging { case (statsTracker, stats) => statsTracker.processStats(stats) } } + + /** + * Create a sorter for input rows before doing sort-based write. + */ + private def createSorter( + sortOrder: Seq[SortOrder], + output: Seq[Attribute], + enableRadixSort: Boolean): UnsafeExternalRowSorter = { + val ordering = codegen.GenerateOrdering.generate(sortOrder, output) + + // The comparator for comparing prefix + val boundSortExpression = BindReferences.bindReference(sortOrder.head, output) + val prefixComparator = SortPrefixUtils.getPrefixComparator(boundSortExpression) + + val canUseRadixSort = enableRadixSort && sortOrder.length == 1 && + SortPrefixUtils.canSortFullyWithPrefix(boundSortExpression) + + // The generator for prefix + val prefixExpr = SortPrefix(boundSortExpression) + val prefixProjection = UnsafeProjection.create(Seq(prefixExpr)) + val prefixComputer = new UnsafeExternalRowSorter.PrefixComputer { + private val result = new UnsafeExternalRowSorter.PrefixComputer.Prefix + override def computePrefix(row: InternalRow): + UnsafeExternalRowSorter.PrefixComputer.Prefix = { + val prefix = prefixProjection.apply(row) + result.isNull = prefix.isNullAt(0) + result.value = if (result.isNull) prefixExpr.nullValue else prefix.getLong(0) + result + } + } + + val pageSize = SparkEnv.get.memoryManager.pageSizeBytes + val schema = StructType.fromAttributes(output) + UnsafeExternalRowSorter.create( + schema, ordering, prefixComparator, prefixComputer, pageSize, canUseRadixSort) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala index c39a82ee037b..9f08481205dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriteStatsTracker.scala @@ -32,10 +32,6 @@ trait WriteTaskStats extends Serializable * A trait for classes that are capable of collecting statistics on data that's being processed by * a single write task in [[FileFormatWriter]] - i.e. there should be one instance per executor. * - * This trait is coupled with the way [[FileFormatWriter]] works, in the sense that its methods - * will be called according to how tuples are being written out to disk, namely in sorted order - * according to partitionValue(s), then bucketId. - * * As such, a typical call scenario is: * * newPartition -> newBucket -> newFile -> newRow -. @@ -71,7 +67,6 @@ trait WriteTaskStatsTracker { /** * Process the fact that a new row to update the tracked statistics accordingly. 
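The rewritten loop above is the heart of the fallback: rows are written hash-based until too many writers are open, then the remainder of the iterator is sorted by partition/bucket and streamed through in order. A self-contained sketch of that control flow, with a simple in-memory sort standing in for UnsafeExternalRowSorter (types and names are illustrative):

object FallbackLoopSketch {
  /** Writes all rows, switching from hash-based to sort-based mode once the writer
   *  count passes maxHashWriters; K is the (partition, bucket) key of each row. */
  def writeAll[K: Ordering, R](
      rows: Iterator[(K, R)],
      maxHashWriters: Int,
      openWriterCount: () => Int,
      write: ((K, R)) => Unit,
      switchToSortBasedWrite: () => Unit): Unit = {
    var sortBased = false
    var iter = rows
    while (iter.hasNext) {
      if (!sortBased && openWriterCount() > maxHashWriters) {
        sortBased = true
        // Sort the *remaining* rows by key so each writer can be closed once its
        // key range has been fully consumed (the patch uses an external sorter here).
        iter = iter.toSeq.sortBy(_._1).iterator
        switchToSortBasedWrite()
      }
      write(iter.next())
    }
  }
}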
- * The row will be written to the most recently witnessed file (via `newFile`). * @note Keep in mind that any overhead here is per-row, obviously, * so implementations should be as lightweight as possible. * @param row Current data row to be processed. diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala index e45ab19aadbf..290d3c6c9694 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala @@ -881,6 +881,42 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSQLContext with Be } } + test("Insert overwrite table command should have correct output: hash/sort-based write") { + withTable("tbl", "tbl2") { + val df = spark.range(20) + .map(_ => { + val n = scala.util.Random.nextInt + (n, n.toInt, n.toInt) + }) + .toDF("col1", "col2", "col3") + df.write.format("parquet").saveAsTable("tbl") + spark.sql("CREATE TABLE tbl2(COL1 long, COL2 int, COL3 int) USING parquet PARTITIONED " + + "BY (COL2) CLUSTERED BY (COL3) INTO 3 BUCKETS") + val queryToInsertTable = "INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM tbl" + + // Verify output with hash-based write + withSQLConf(SQLConf.MAX_HASH_BASED_OUTPUT_WRITERS.key -> "200") { + spark.sql(queryToInsertTable) + checkAnswer(spark.table("tbl2").orderBy("COL1"), + spark.table("tbl").orderBy("col1")) + } + + // Verify output with sort-based write + withSQLConf(SQLConf.MAX_HASH_BASED_OUTPUT_WRITERS.key -> "-1") { + spark.sql(queryToInsertTable) + checkAnswer(spark.table("tbl2").orderBy("COL1"), + spark.table("tbl").orderBy("col1")) + } + + // Verify output with falling back from hash-based to sort-based write + withSQLConf(SQLConf.MAX_HASH_BASED_OUTPUT_WRITERS.key -> "3") { + spark.sql(queryToInsertTable) + checkAnswer(spark.table("tbl2").orderBy("COL1"), + spark.table("tbl").orderBy("col1")) + } + } + } + test("Create table as select command should output correct schema: basic") { withTable("tbl", "tbl2") { withView("view1") { From 7c544abf4c30474d504b74f2d4c252dc55f1ba4e Mon Sep 17 00:00:00 2001 From: Cheng Su Date: Sun, 2 Dec 2018 18:00:59 -0800 Subject: [PATCH 2/2] Fix the unit test in BasicWriteTaskStatsTrackerSuite --- .../BasicWriteTaskStatsTrackerSuite.scala | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala index 32941d8d2cd1..36a143fad7a7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/BasicWriteTaskStatsTrackerSuite.scala @@ -124,6 +124,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { val tracker = new BasicWriteTaskStatsTracker(conf) tracker.newFile(file.toString) val stream = localfs.create(file, true) + try { assertStats(tracker, 1, 0) stream.write(data1) @@ -148,7 +149,7 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { test("Three files, last one empty") { val file1 = new Path(tempDirPath, "f-3-1") val file2 = new Path(tempDirPath, "f-3-2") - val file3 = new Path(tempDirPath, "f-3-2") + val file3 = new Path(tempDirPath, "f-3-3") val tracker = new 
BasicWriteTaskStatsTracker(conf) tracker.newFile(file1.toString) write1(file1) @@ -180,6 +181,21 @@ class BasicWriteTaskStatsTrackerSuite extends SparkFunSuite { assertStats(tracker, 2, len1 + len2) } + test("Three files, one duplicated") { + val file1 = new Path(tempDirPath, "f-3-1") + val file2 = new Path(tempDirPath, "f-3-2") + val tracker = new BasicWriteTaskStatsTracker(conf) + tracker.newFile(file1.toString) + write1(file1) + tracker.newFile(file2.toString) + write2(file2) + // file 2 is noted again + tracker.newFile(file2.toString) + // the expected size is file1 + file2; only two files are reported + // as found + assertStats(tracker, 2, len1 + len2) + } + /** * Write a 0-byte file. * @param file file path
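For completeness, a hedged sketch of how the new knob could be exercised outside the test harness, following the insert-overwrite scenario earlier in this patch. It assumes a build that includes spark.sql.maxHashBasedOutputWriters, an existing SparkSession named spark, and the tbl/tbl2 tables from that test:

// Force an early fallback to sort-based write by allowing very few concurrent writers.
spark.conf.set("spark.sql.maxHashBasedOutputWriters", "3")
spark.sql("INSERT OVERWRITE TABLE tbl2 SELECT COL1, COL2, COL3 FROM tbl")

// Restore the default so later writes stay on the hash-based path.
spark.conf.set("spark.sql.maxHashBasedOutputWriters", "200")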