
Commit 34aa07b

revert
1 parent 07d5456 commit 34aa07b


3 files changed: +17 -18 lines changed


sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 0 additions & 1 deletion

@@ -169,7 +169,6 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
     log.debug(
       s"Creating MutableProj: $expressions, inputSchema: $inputSchema, codegen:$codegenEnabled")
     if(codegenEnabled && expressions.forall(_.isThreadSafe)) {
-
       GenerateMutableProjection.generate(expressions, inputSchema)
     } else {
       () => new InterpretedMutableProjection(expressions, inputSchema)

sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala

Lines changed: 2 additions & 1 deletion

@@ -166,7 +166,8 @@ case class TakeOrderedAndProject(
 
   private val ord: RowOrdering = new RowOrdering(sortOrder, child.output)
 
-  private val projection = projectList.map(newProjection(_, child.output))
+  // TODO: remove @transient after figure out how to clean closure at InsertIntoHiveTable.
+  @transient private val projection = projectList.map(new InterpretedProjection(_, child.output))
 
   private def collectData(): Array[InternalRow] = {
     val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
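
Aside, not part of the commit itself: the @transient added above follows the usual Spark pattern of keeping non-serializable or driver-only state out of whatever gets pulled into a serialized operator or task closure. A minimal, self-contained sketch of how Java serialization treats a @transient val; DriverSideOperator, heavyState and TransientDemo are made-up names for illustration only:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical operator: heavyState is only needed where the object was constructed,
// so it is marked @transient and skipped when the instance is serialized.
class DriverSideOperator(val names: Seq[String]) extends Serializable {
  @transient private val heavyState: Map[String, Int] = names.zipWithIndex.toMap

  // Option(...) guards against the field coming back null after deserialization.
  def lookup(name: String): Option[Int] = Option(heavyState).flatMap(_.get(name))
}

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val op = new DriverSideOperator(Seq("a", "b"))

    // Round-trip through Java serialization, roughly what happens when Spark ships a closure.
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(op)
    out.close()
    val copy = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[DriverSideOperator]

    println(op.lookup("a"))   // Some(0): usable where the object was built
    println(copy.lookup("a")) // None: the @transient field was not serialized
  }
}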

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala

Lines changed: 15 additions & 16 deletions

@@ -48,16 +48,16 @@ case class InsertIntoHiveTable(
     overwrite: Boolean,
     ifNotExists: Boolean) extends UnaryNode with HiveInspectors {
 
-  val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
-  lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
-  private lazy val hiveContext = new Context(sc.hiveconf)
-  private lazy val catalog = sc.catalog
+  @transient val sc: HiveContext = sqlContext.asInstanceOf[HiveContext]
+  @transient lazy val outputClass = newSerializer(table.tableDesc).getSerializedClass
+  @transient private lazy val hiveContext = new Context(sc.hiveconf)
+  @transient private lazy val catalog = sc.catalog
 
-  private val newSerializer = (tableDesc: TableDesc) => {
+  private def newSerializer(tableDesc: TableDesc): Serializer = {
     val serializer = tableDesc.getDeserializerClass.newInstance().asInstanceOf[Serializer]
     serializer.initialize(null, tableDesc.getProperties)
     serializer
-  }: Serializer
+  }
 
   def output: Seq[Attribute] = child.output
 
@@ -79,10 +79,13 @@ case class InsertIntoHiveTable(
       SparkHiveWriterContainer.createPathFromString(fileSinkConf.getDirName, conf.value))
     log.debug("Saving as hadoop file of type " + valueClass.getSimpleName)
 
-    val newSer = newSerializer
-    val schema = table.schema
+    writerContainer.driverSideSetup()
+    sc.sparkContext.runJob(rdd, writeToFile _)
+    writerContainer.commitJob()
+
     // Note that this function is executed on executor side
-    val writeToFile = (context: TaskContext, iterator: Iterator[InternalRow]) => {
+    def writeToFile(context: TaskContext, iterator: Iterator[InternalRow]): Unit = {
+      val serializer = newSerializer(fileSinkConf.getTableInfo)
       val standardOI = ObjectInspectorUtils
         .getStandardObjectInspector(
           fileSinkConf.getTableInfo.getDeserializer.getObjectInspector,
@@ -103,16 +106,12 @@ case class InsertIntoHiveTable(
         }
 
         writerContainer
-          .getLocalFileWriter(row, schema)
-          .write(newSer(fileSinkConf.getTableInfo).serialize(outputData, standardOI))
+          .getLocalFileWriter(row, table.schema)
+          .write(serializer.serialize(outputData, standardOI))
       }
 
       writerContainer.close()
-  }: Unit
-
-    writerContainer.driverSideSetup()
-    sc.sparkContext.runJob(rdd, sc.sparkContext.clean(writeToFile))
-    writerContainer.commitJob()
+    }
   }
 
   /**
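
Aside, not part of the commit: the reverted code builds the Hive Serializer inside writeToFile, i.e. on the executor, so the function passed to runJob only needs to capture small, serializable values. A rough, hypothetical sketch of that pattern; Formatter, writePartition and prefix are made-up stand-ins, not Spark or Hive APIs:

import org.apache.spark.{SparkConf, SparkContext, TaskContext}

object PerTaskResourceDemo {
  // Stand-in for a non-serializable resource such as a Hive Serializer.
  class Formatter(prefix: String) {
    def format(x: Int): String = s"$prefix$x"
  }

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("per-task-resource"))
    val prefix = "row-" // small and serializable: safe for the closure to capture

    // Runs on the executors; the Formatter is created there instead of being shipped.
    def writePartition(context: TaskContext, it: Iterator[Int]): Unit = {
      val fmt = new Formatter(prefix)
      it.foreach(x => println(fmt.format(x)))
    }

    sc.runJob(sc.parallelize(1 to 10, numSlices = 2), writePartition _)
    sc.stop()
  }
}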
