
Commit 983d4fc

Merge remote-tracking branch 'origin/master' into nan

2 parents: 88bd73c + 6cb6096
8 files changed: +81 −523 lines changed

project/MimaExcludes.scala

Lines changed: 3 additions & 0 deletions

@@ -64,6 +64,9 @@ object MimaExcludes {
         excludePackage("org.apache.spark.sql.execution"),
         // Parquet support is considered private.
         excludePackage("org.apache.spark.sql.parquet"),
+        // The old JSON RDD is removed in favor of streaming Jackson
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD$"),
+        ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.json.JsonRDD"),
         // local function inside a method
         ProblemFilters.exclude[MissingMethodProblem](
           "org.apache.spark.sql.SQLContext.org$apache$spark$sql$SQLContext$$needsConversion$1")

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateMutableProjection.scala

Lines changed: 37 additions & 2 deletions

@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.expressions.codegen

 import org.apache.spark.sql.catalyst.expressions._

+import scala.collection.mutable.ArrayBuffer
+
 // MutableProjection is not accessible in Java
 abstract class BaseMutableProjection extends MutableProjection

@@ -45,10 +47,41 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
       else
         ${ctx.setColumn("mutableRow", e.dataType, i, evaluationCode.primitive)};
     """
-    }.mkString("\n")
+    }
+    // collect projections into blocks as function has 64kb codesize limit in JVM
+    val projectionBlocks = new ArrayBuffer[String]()
+    val blockBuilder = new StringBuilder()
+    for (projection <- projectionCode) {
+      if (blockBuilder.length > 16 * 1000) {
+        projectionBlocks.append(blockBuilder.toString())
+        blockBuilder.clear()
+      }
+      blockBuilder.append(projection)
+    }
+    projectionBlocks.append(blockBuilder.toString())
+
+    val (projectionFuns, projectionCalls) = {
+      // inline execution if codesize limit was not broken
+      if (projectionBlocks.length == 1) {
+        ("", projectionBlocks.head)
+      } else {
+        (
+          projectionBlocks.zipWithIndex.map { case (body, i) =>
+            s"""
+               |private void apply$i(InternalRow i) {
+               |  $body
+               |}
+             """.stripMargin
+          }.mkString,
+          projectionBlocks.indices.map(i => s"apply$i(i);").mkString("\n")
+        )
+      }
+    }
+
     val mutableStates = ctx.mutableStates.map { case (javaType, variableName, initialValue) =>
       s"private $javaType $variableName = $initialValue;"
     }.mkString("\n ")
+
     val code = s"""
       public Object generate($exprType[] expr) {
         return new SpecificProjection(expr);

@@ -75,9 +108,11 @@ object GenerateMutableProjection extends CodeGenerator[Seq[Expression], () => Mu
       return (InternalRow) mutableRow;
     }

+    $projectionFuns
+
     public Object apply(Object _i) {
       InternalRow i = (InternalRow) _i;
-      $projectionCode
+      $projectionCalls

       return mutableRow;
     }
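The splitting logic above is the heart of the fix for SPARK-8443: the JVM rejects any single method whose bytecode exceeds 64 KB, so a projection over thousands of columns cannot be emitted as one apply() body. Below is a minimal, self-contained Scala sketch of the same pattern extracted for clarity — the 16 * 1000 character threshold is the commit's heuristic, while the object name, the splitIntoBlocks helper, and the sample fragments are illustrative assumptions only:

import scala.collection.mutable.ArrayBuffer

object BlockSplitSketch {
  // Hypothetical threshold mirroring the commit's 16 * 1000 heuristic:
  // source length is only a proxy for bytecode size, so stay well under 64 KB.
  val MaxBlockSize = 16 * 1000

  /** Group code fragments into blocks of at most roughly MaxBlockSize characters. */
  def splitIntoBlocks(fragments: Seq[String]): Seq[String] = {
    val blocks = new ArrayBuffer[String]()
    val builder = new StringBuilder()
    for (fragment <- fragments) {
      // Flush the current block before appending once it crosses the threshold.
      if (builder.length > MaxBlockSize) {
        blocks.append(builder.toString())
        builder.clear()
      }
      builder.append(fragment)
    }
    blocks.append(builder.toString())
    blocks.toSeq
  }

  def main(args: Array[String]): Unit = {
    // 5000 small fragments, as in the SPARK-8443 regression test below.
    val fragments = Seq.fill(5000)("mutableRow.setBoolean(0, true);\n")
    val blocks = splitIntoBlocks(fragments)
    println(s"${fragments.length} fragments -> ${blocks.length} blocks")
    // A block may overshoot the threshold by at most one fragment.
    assert(blocks.forall(_.length <= MaxBlockSize + fragments.head.length))
  }
}

Flushing before appending (rather than after) means a block can exceed the threshold by one fragment, which is harmless because the threshold sits deliberately far below the 64 KB hard limit.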

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CodeGenerationSuite.scala

Lines changed: 15 additions & 3 deletions

@@ -29,7 +29,7 @@ import org.apache.spark.sql.types.{DataTypeTestUtils, NullType, StructField, Str
 /**
  * Additional tests for code generation.
  */
-class CodeGenerationSuite extends SparkFunSuite {
+class CodeGenerationSuite extends SparkFunSuite with ExpressionEvalHelper {

   test("multithreaded eval") {
     import scala.concurrent._

@@ -56,10 +56,10 @@ class CodeGenerationSuite extends SparkFunSuite {
     val rowOrdering = RowOrdering.forSchema(Seq(dataType, dataType))
     val genOrdering = GenerateOrdering.generate(
       BoundReference(0, dataType, nullable = true).asc ::
-      BoundReference(1, dataType, nullable = true).asc :: Nil)
+        BoundReference(1, dataType, nullable = true).asc :: Nil)
     val rowType = StructType(
       StructField("a", dataType, nullable = true) ::
-      StructField("b", dataType, nullable = true) :: Nil)
+        StructField("b", dataType, nullable = true) :: Nil)
     val maybeDataGenerator = RandomDataGenerator.forType(rowType, nullable = false)
     assume(maybeDataGenerator.isDefined)
     val randGenerator = maybeDataGenerator.get

@@ -81,4 +81,16 @@ class CodeGenerationSuite extends SparkFunSuite {
       }
     }
   }
+
+  test("SPARK-8443: split wide projections into blocks due to JVM code size limit") {
+    val length = 5000
+    val expressions = List.fill(length)(EqualTo(Literal(1), Literal(1)))
+    val plan = GenerateMutableProjection.generate(expressions)()
+    val actual = plan(new GenericMutableRow(length)).toSeq
+    val expected = Seq.fill(length)(true)
+
+    if (!checkResult(actual, expected)) {
+      fail(s"Incorrect Evaluation: expressions: $expressions, actual: $actual, expected: $expected")
+    }
+  }
 }

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 3 additions & 12 deletions

@@ -27,7 +27,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.jdbc.{JDBCPartition, JDBCPartitioningInfo, JDBCRelation}
-import org.apache.spark.sql.json.{JsonRDD, JSONRelation}
+import org.apache.spark.sql.json.JSONRelation
 import org.apache.spark.sql.parquet.ParquetRelation2
 import org.apache.spark.sql.sources.{LogicalRelation, ResolvedDataSource}
 import org.apache.spark.sql.types.StructType

@@ -236,17 +236,8 @@ class DataFrameReader private[sql](sqlContext: SQLContext) {
    */
   def json(jsonRDD: RDD[String]): DataFrame = {
     val samplingRatio = extraOptions.getOrElse("samplingRatio", "1.0").toDouble
-    if (sqlContext.conf.useJacksonStreamingAPI) {
-      sqlContext.baseRelationToDataFrame(
-        new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
-    } else {
-      val columnNameOfCorruptJsonRecord = sqlContext.conf.columnNameOfCorruptRecord
-      val appliedSchema = userSpecifiedSchema.getOrElse(
-        JsonRDD.nullTypeToStringType(
-          JsonRDD.inferSchema(jsonRDD, 1.0, columnNameOfCorruptJsonRecord)))
-      val rowRDD = JsonRDD.jsonStringToRow(jsonRDD, appliedSchema, columnNameOfCorruptJsonRecord)
-      sqlContext.internalCreateDataFrame(rowRDD, appliedSchema)
-    }
+    sqlContext.baseRelationToDataFrame(
+      new JSONRelation(() => jsonRDD, None, samplingRatio, userSpecifiedSchema)(sqlContext))
   }

   /**
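The public entry point is unchanged by this simplification: DataFrameReader.json(RDD[String]) keeps its signature and now always builds a streaming JSONRelation. A minimal usage sketch against the Spark 1.4-era API follows — the local SparkContext setup and the sample records are illustrative assumptions, not part of the diff:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object JsonReadSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("json-read").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // An RDD[String] where each element is one JSON document.
    val jsonRDD = sc.parallelize(Seq(
      """{"name": "alice", "age": 30}""",
      """{"name": "bob"}"""))

    // Schema inference and parsing now always take the streaming Jackson path;
    // samplingRatio still controls how much data is scanned for inference.
    val df = sqlContext.read.option("samplingRatio", "1.0").json(jsonRDD)
    df.printSchema()
    df.show()

    sc.stop()
  }
}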

sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala

Lines changed: 0 additions & 5 deletions

@@ -401,9 +401,6 @@ private[spark] object SQLConf {
     "spark.sql.useSerializer2",
     defaultValue = Some(true), isPublic = false)

-  val USE_JACKSON_STREAMING_API = booleanConf("spark.sql.json.useJacksonStreamingAPI",
-    defaultValue = Some(true), doc = "<TODO>")
-
   object Deprecated {
     val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }

@@ -473,8 +470,6 @@ private[sql] class SQLConf extends Serializable with CatalystConf {

   private[spark] def useSqlSerializer2: Boolean = getConf(USE_SQL_SERIALIZER2)

-  private[spark] def useJacksonStreamingAPI: Boolean = getConf(USE_JACKSON_STREAMING_API)
-
   private[spark] def autoBroadcastJoinThreshold: Int = getConf(AUTO_BROADCASTJOIN_THRESHOLD)

   private[spark] def defaultSizeInBytes: Long =

sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

Lines changed: 12 additions & 36 deletions

@@ -157,51 +157,27 @@ private[sql] class JSONRelation(
     }
   }

-  private val useJacksonStreamingAPI: Boolean = sqlContext.conf.useJacksonStreamingAPI
-
   override val needConversion: Boolean = false

   override lazy val schema = userSpecifiedSchema.getOrElse {
-    if (useJacksonStreamingAPI) {
-      InferSchema(
-        baseRDD(),
-        samplingRatio,
-        sqlContext.conf.columnNameOfCorruptRecord)
-    } else {
-      JsonRDD.nullTypeToStringType(
-        JsonRDD.inferSchema(
-          baseRDD(),
-          samplingRatio,
-          sqlContext.conf.columnNameOfCorruptRecord))
-    }
+    InferSchema(
+      baseRDD(),
+      samplingRatio,
+      sqlContext.conf.columnNameOfCorruptRecord)
   }

   override def buildScan(): RDD[Row] = {
-    if (useJacksonStreamingAPI) {
-      JacksonParser(
-        baseRDD(),
-        schema,
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    } else {
-      JsonRDD.jsonStringToRow(
-        baseRDD(),
-        schema,
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    }
+    JacksonParser(
+      baseRDD(),
+      schema,
+      sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
   }

   override def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row] = {
-    if (useJacksonStreamingAPI) {
-      JacksonParser(
-        baseRDD(),
-        StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    } else {
-      JsonRDD.jsonStringToRow(
-        baseRDD(),
-        StructType.fromAttributes(requiredColumns),
-        sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
-    }
+    JacksonParser(
+      baseRDD(),
+      StructType.fromAttributes(requiredColumns),
+      sqlContext.conf.columnNameOfCorruptRecord).map(_.asInstanceOf[Row])
   }

   override def insert(data: DataFrame, overwrite: Boolean): Unit = {
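For context on the path that is now unconditional: JacksonParser and InferSchema are built on Jackson's streaming (token-pull) API rather than the old JsonRDD approach of materializing each record as an intermediate object graph. The sketch below shows that token-pull style in isolation — it illustrates the underlying jackson-core API (assumed on the classpath), not the commit's parser:

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

object JacksonStreamingSketch {
  def main(args: Array[String]): Unit = {
    val json = """{"name": "alice", "age": 30}"""
    val parser = new JsonFactory().createParser(json)

    // Pull tokens one at a time instead of building a full object tree;
    // this is what makes the streaming path cheaper than parsing each
    // record into an intermediate Map, as the removed JsonRDD did.
    var token = parser.nextToken() // START_OBJECT
    while (token != null) {
      if (token == JsonToken.FIELD_NAME) {
        val field = parser.getCurrentName
        parser.nextToken() // advance to the value token
        println(s"$field -> ${parser.getText}")
      }
      token = parser.nextToken()
    }
    parser.close()
  }
}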
