@@ -151,7 +151,7 @@ class BasicWriteTaskStatsTracker(hadoopConf: Configuration)
     }
   }
 
-  override def newRow(row: InternalRow): Unit = {
+  override def newRow(filePath: String, row: InternalRow): Unit = {
     numRows += 1
   }

> **@dongjoon-hyun** (Member) commented on May 6, 2021:
> From the Apache side of the codebase, this is an unused, no-op parameter, and there is no test coverage in this PR. Do you think we can have a sample custom WriteTaskStatsTracker test case to prevent a future regression, @cloud-fan?
>
> Reply: To not break some custom WriteTaskStatsTracker implementations.

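As an illustration of the compatibility concern raised above, here is a minimal sketch of what the migration looks like for a custom tracker outside the Spark codebase (`LegacyRowCountTracker` and `LegacyRowCountStats` are hypothetical names, not from this PR):

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical third-party tracker, not part of this PR. Before this
// change it would have compiled against the old single-argument API:
//   override def newRow(row: InternalRow): Unit = numRows += 1
// After this change the override must also accept the file path, even
// when the tracker ignores it (as BasicWriteTaskStatsTracker does above).
class LegacyRowCountTracker extends WriteTaskStatsTracker {
  private var numRows = 0L
  override def newPartition(partitionValues: InternalRow): Unit = {}
  override def newFile(filePath: String): Unit = {}
  override def closeFile(filePath: String): Unit = {}
  override def newRow(filePath: String, row: InternalRow): Unit = numRows += 1
  override def getFinalStats(): WriteTaskStats = LegacyRowCountStats(numRows)
}

case class LegacyRowCountStats(numRows: Long) extends WriteTaskStats
```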
@@ -157,7 +157,7 @@ class SingleDirectoryDataWriter(
     }
 
     currentWriter.write(record)
-    statsTrackers.foreach(_.newRow(record))
+    statsTrackers.foreach(_.newRow(currentWriter.path, record))
     recordsInFile += 1
   }
 }

@@ -301,7 +301,7 @@ abstract class BaseDynamicPartitionDataWriter(
   protected def writeRecord(record: InternalRow): Unit = {
     val outputRow = getOutputRow(record)
     currentWriter.write(outputRow)
-    statsTrackers.foreach(_.newRow(outputRow))
+    statsTrackers.foreach(_.newRow(currentWriter.path, outputRow))
     recordsInFile += 1
   }
 }
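Both write paths above now thread `currentWriter.path` through to every registered tracker. As a quick usage sketch (file names here are made up; the interleaving mirrors the "random file writing" test in the new suite below), the per-row path lets a tracker attribute rows to files explicitly rather than inferring them from the last `newFile` call:

```scala
// Illustration only: drive the sample tracker from the new test suite
// by hand. Rows carry their file path, so counts stay correct even if
// calls for different files interleave.
val tracker = new CustomWriteTaskStatsTracker
tracker.newFile("part-a")
tracker.newRow("part-a", null)
tracker.newFile("part-b")
tracker.newRow("part-a", null) // still counted against "part-a"
tracker.newRow("part-b", null)
// getFinalStats() now reports Map("part-a" -> 2, "part-b" -> 1)
```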
@@ -59,9 +59,10 @@ trait WriteTaskStatsTracker {
    * Process the fact that a new row to update the tracked statistics accordingly.
    * @note Keep in mind that any overhead here is per-row, obviously,
    *       so implementations should be as lightweight as possible.
+   * @param filePath Path of the file which the row is written to.
    * @param row Current data row to be processed.
    */
-  def newRow(row: InternalRow): Unit
+  def newRow(filePath: String, row: InternalRow): Unit
 
   /**
    * Returns the final statistics computed so far.
@@ -0,0 +1,72 @@ (new file: CustomWriteTaskStatsTrackerSuite)
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution.datasources

import scala.collection.mutable

import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.catalyst.InternalRow

class CustomWriteTaskStatsTrackerSuite extends SparkFunSuite {

  def checkFinalStats(tracker: CustomWriteTaskStatsTracker, result: Map[String, Int]): Unit = {
    assert(tracker.getFinalStats().asInstanceOf[CustomWriteTaskStats].numRowsPerFile == result)
  }

  test("sequential file writing") {
    val tracker = new CustomWriteTaskStatsTracker
    tracker.newFile("a")
    tracker.newRow("a", null)
    tracker.newRow("a", null)
    tracker.newFile("b")
    checkFinalStats(tracker, Map("a" -> 2, "b" -> 0))
  }

  test("random file writing") {
    val tracker = new CustomWriteTaskStatsTracker
    tracker.newFile("a")
    tracker.newRow("a", null)
    tracker.newFile("b")
    tracker.newRow("a", null)
    tracker.newRow("b", null)
    checkFinalStats(tracker, Map("a" -> 2, "b" -> 1))
  }
}

class CustomWriteTaskStatsTracker extends WriteTaskStatsTracker {

  val numRowsPerFile = mutable.Map.empty[String, Int]

  override def newPartition(partitionValues: InternalRow): Unit = {}

  override def newFile(filePath: String): Unit = {
    numRowsPerFile.put(filePath, 0)
  }

  override def closeFile(filePath: String): Unit = {}

  override def newRow(filePath: String, row: InternalRow): Unit = {
    numRowsPerFile(filePath) += 1
  }

  override def getFinalStats(): WriteTaskStats = {
    CustomWriteTaskStats(numRowsPerFile.toMap)
  }
}

case class CustomWriteTaskStats(numRowsPerFile: Map[String, Int]) extends WriteTaskStats
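One design note on the sample tracker: `newRow` indexes `numRowsPerFile` directly, so a row arriving for a path that never saw `newFile` would throw a `NoSuchElementException`. A more tolerant variant (a sketch, not part of the PR) could default the count instead:

```scala
// Sketch of an alternative newRow: initialize the counter on first
// sight of a path instead of requiring a prior newFile call.
override def newRow(filePath: String, row: InternalRow): Unit = {
  numRowsPerFile(filePath) = numRowsPerFile.getOrElse(filePath, 0) + 1
}
```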