Skip to content

Commit 00804fe

Browse files
committed
use shuffleMemoryManager.pageSizeBytes
1 parent 775cc49 commit 00804fe

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/WriterContainer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -281,14 +281,14 @@ private[sql] class DynamicPartitionWriterContainer(
281281
val getOutputRow = UnsafeProjection.create(dataColumns, inputSchema)
282282

283283
// Expressions that given a partition key build a string like: col1=val/col2=val/...
284-
val partitionStringExpression = partitionColumns.zipWithIndex.map { case (c, i) =>
284+
val partitionStringExpression = partitionColumns.zipWithIndex.flatMap { case (c, i) =>
285285
val escaped =
286286
ScalaUDF(
287287
PartitioningUtils.escapePathName _, StringType, Seq(Cast(c, StringType)), Seq(StringType))
288288
val str = If(IsNull(c), Literal(defaultPartitionName), escaped)
289289
val partitionName = Literal(c.name + "=") :: str :: Nil
290290
if (i == 0) partitionName else Literal(Path.SEPARATOR_CHAR.toString) :: partitionName
291-
}.flatten
291+
}
292292

293293
// Returns the partition path given a partition key.
294294
val getPartitionString =
@@ -315,7 +315,7 @@ private[sql] class DynamicPartitionWriterContainer(
315315
StructType.fromAttributes(dataColumns),
316316
SparkEnv.get.blockManager,
317317
SparkEnv.get.shuffleMemoryManager,
318-
SparkEnv.get.conf.getSizeAsBytes("spark.buffer.pageSize", "64m"))
318+
SparkEnv.get.shuffleMemoryManager.pageSizeBytes)
319319
sorter.insertKV(currentKey, getOutputRow(inputRow))
320320
}
321321
} else {

0 commit comments

Comments
 (0)