Skip to content

Commit 25d6de1

Browse files
author
Mukul Murthy
committed
Move truncate method to parent class
1 parent b2ef59c commit 25d6de1

File tree

2 files changed

+17
-19
lines changed

2 files changed

+17
-19
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -222,11 +222,27 @@ class MemoryStreamInputPartition(records: Array[UnsafeRow])
222222
}
223223

224224
/** A common trait for MemorySinks with methods used for testing */
225-
trait MemorySinkBase extends BaseStreamingSink {
225+
trait MemorySinkBase extends BaseStreamingSink with Logging {
226226
def allData: Seq[Row]
227227
def latestBatchData: Seq[Row]
228228
def dataSinceBatch(sinceBatchId: Long): Seq[Row]
229229
def latestBatchId: Option[Long]
230+
231+
/**
232+
* Truncates the given rows to return at most maxRows rows.
233+
* @param rows The data that may need to be truncated.
234+
* @param maxRows Number of rows to truncate to keep.
235+
* @param batchId The ID of the batch that sent these rows, for logging purposes.
236+
* @return Truncated rows.
237+
*/
238+
protected def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
239+
if (rows.length > maxRows && maxRows >= 0) {
240+
logWarning(s"Truncating batch $batchId to $maxRows rows")
241+
rows.take(maxRows)
242+
} else {
243+
rows
244+
}
245+
}
230246
}
231247

232248
/**
@@ -336,15 +352,6 @@ class MemorySink(val schema: StructType, outputMode: OutputMode, options: DataSo
336352
numRows = 0
337353
}
338354

339-
private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
340-
if (rows.length > maxRows && maxRows >= 0) {
341-
logWarning(s"Truncating batch $batchId to $maxRows rows")
342-
rows.take(maxRows)
343-
} else {
344-
rows
345-
}
346-
}
347-
348355
override def toString(): String = "MemorySink"
349356
}
350357

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/memoryV2.scala

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -129,15 +129,6 @@ class MemorySinkV2 extends DataSourceV2 with StreamWriteSupport with MemorySinkB
129129
numRows = 0
130130
}
131131

132-
private def truncateRowsIfNeeded(rows: Array[Row], maxRows: Int, batchId: Long): Array[Row] = {
133-
if (rows.length > maxRows && maxRows >= 0) {
134-
logWarning(s"Truncating batch $batchId to $maxRows rows")
135-
rows.take(maxRows)
136-
} else {
137-
rows
138-
}
139-
}
140-
141132
override def toString(): String = "MemorySinkV2"
142133
}
143134

0 commit comments

Comments
 (0)