Commit 1b2e6f6
Wesley Tang authored and srowen committed
[SPARK-16664][SQL] Fix persist call on DataFrames with more than 200 columns
## What changes were proposed in this pull request?

Cherry-pick from d1d5069 and fix the test case.

## How was this patch tested?

Test updated.

Author: Wesley Tang <[email protected]>

Closes #14404 from breakdawn/branch-1.6.
1 parent 03913af commit 1b2e6f6
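For context, the user-visible symptom: persisting a DataFrame wider than the code generator's statement-grouping threshold silently wiped the cached data. A minimal reproduction sketch, adapted from the test added below (assumes a branch-1.6 spark-shell, where `sc` and `sqlContext` are the standard bindings):

    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    val size = 201L  // 202 columns in total, past the 200-statement grouping threshold
    val rdd = sc.makeRDD(Seq(Row.fromSeq(0L to size)))
    val schema = StructType((0L to size).map(i => StructField("name" + i, LongType, true)))
    val df = sqlContext.createDataFrame(rdd, schema)
    df.persist()
    df.take(1)  // before this fix: the cached row came back wiped instead of holding 0..201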

3 files changed: +12 -3 lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/GenerateColumnAccessor.scala

Lines changed: 2 additions & 2 deletions
@@ -129,7 +129,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
     val groupedAccessorsItr = initializeAccessors.grouped(numberOfStatementsThreshold)
     val groupedExtractorsItr = extractors.grouped(numberOfStatementsThreshold)
     var groupedAccessorsLength = 0
-    groupedAccessorsItr.zipWithIndex.map { case (body, i) =>
+    groupedAccessorsItr.zipWithIndex.foreach { case (body, i) =>
       groupedAccessorsLength += 1
       val funcName = s"accessors$i"
       val funcCode = s"""
@@ -139,7 +139,7 @@ object GenerateColumnAccessor extends CodeGenerator[Seq[DataType], ColumnarItera
         """.stripMargin
       ctx.addNewFunction(funcName, funcCode)
     }
-    groupedExtractorsItr.zipWithIndex.map { case (body, i) =>
+    groupedExtractorsItr.zipWithIndex.foreach { case (body, i) =>
       val funcName = s"extractors$i"
       val funcCode = s"""
         |private void $funcName() {
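The one-word change above is the whole fix: `grouped` returns an `Iterator`, and `map` on an `Iterator` is lazy, so the bodies that call `ctx.addNewFunction` (and, for the accessors, increment `groupedAccessorsLength`) never ran once the mapped result was discarded; `foreach` evaluates eagerly. A self-contained sketch of the pitfall in plain Scala, outside Spark's code-gen context:

    object LazyIteratorDemo extends App {
      var registered = 0

      // map on an Iterator is lazy: the body only runs when the result is
      // consumed. The result is discarded here, so nothing runs at all --
      // the same shape as the `-` lines above.
      Seq("a", "b", "c").iterator.map { _ => registered += 1 }
      println(registered)  // 0

      // foreach is eager, which is what the `+` lines switch to.
      Seq("a", "b", "c").iterator.foreach { _ => registered += 1 }
      println(registered)  // 3
    }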

sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -1186,4 +1186,12 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       Seq(1 -> "a").toDF("i", "j").filter($"i".cast(StringType) === "1"),
       Row(1, "a"))
   }
+
+  test("SPARK-16664: persist with more than 200 columns") {
+    val size = 201L
+    val rdd = sparkContext.makeRDD(Seq(Row.fromSeq(0L to size)))
+    val schema = (0L to size).map(i => StructField("name" + i, LongType, true))
+    val df = sqlContext.createDataFrame(rdd, StructType(schema))
+    assert(df.persist.take(1).apply(0).toSeq(100).asInstanceOf[Long] == 100)
+  }
 }
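One detail worth noting in this test: `0L to size` is inclusive, so with `size = 201L` the row actually has 202 fields, comfortably past the 200-statement threshold that triggers the buggy grouped code path. The assertion then reads field 100, which was written as `100L`. A quick sanity check of that arithmetic in plain Scala, no Spark required:

    val size = 201L
    val values = (0L to size).toIndexedSeq  // NumericRange is inclusive: 202 elements
    assert(values.length == 202)
    assert(values(100) == 100L)  // field "name100" holds 100L, as the test asserts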

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala

Lines changed: 2 additions & 1 deletion
@@ -225,7 +225,8 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     val columnTypes1 = List.fill(length1)(IntegerType)
     val columnarIterator1 = GenerateColumnAccessor.generate(columnTypes1)

-    val length2 = 10000
+    // SPARK-16664: the limit of janino is 8117
+    val length2 = 8117
     val columnTypes2 = List.fill(length2)(IntegerType)
     val columnarIterator2 = GenerateColumnAccessor.generate(columnTypes2)
   }
