
Commit 9e8a8f7

Anderson de Andrade authored and Marcelo Vanzin committed
[SPARK-12056][CORE] Create a TaskAttemptContext only after calling setConf.
TaskAttemptContext's constructor will clone the configuration instead of referencing it. Calling setConf after creating TaskAttemptContext makes any changes to the configuration made inside setConf unperceived by RecordReader instances. As an example, Titan's InputFormat will change conf when calling setConf. They wrap their InputFormat around Cassandra's ColumnFamilyInputFormat, and append Cassandra's configuration. This change fixes the following error when using Titan's CassandraInputFormat with Spark: *java.lang.RuntimeException: org.apache.thrift.protocol.TProtocolException: Required field 'keyspace' was not present! Struct: set_key space_args(keyspace:null)* There's a discussion of this error here: https://groups.google.com/forum/#!topic/aureliusgraphs/4zpwyrYbGAE Author: Anderson de Andrade <[email protected]> Closes #10046 from adeandrade/newhadooprdd-fix. (cherry picked from commit f434f36) Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 355bd72 commit 9e8a8f7
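The behavior the commit message describes can be reproduced directly against Hadoop's API. Below is a minimal sketch (assuming Hadoop 2.x's TaskAttemptContextImpl; the configuration key is hypothetical, chosen for illustration rather than taken from Titan's code): the context copies the configuration in its constructor, so a key set on the original conf afterwards never shows up in context.getConfiguration.

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.mapreduce.TaskAttemptID
    import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl

    object ConfSnapshotDemo {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()

        // The context's constructor copies `conf` rather than keeping a reference.
        val context = new TaskAttemptContextImpl(conf, new TaskAttemptID())

        // Mutate the original conf afterwards, the way a custom
        // InputFormat.setConf implementation might append settings.
        conf.set("example.keyspace", "titan") // hypothetical key, for illustration

        // The copy inside the context predates the mutation, so the key is absent.
        assert(context.getConfiguration.get("example.keyspace") == null)
      }
    }

This is exactly the trap NewHadoopRDD fell into: it built the context first and called setConf second, so the context's snapshot never contained the keys setConf added.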

File tree

1 file changed: +2 −2


core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 2 additions & 2 deletions
@@ -138,14 +138,14 @@ class NewHadoopRDD[K, V](
       }
       inputMetrics.setBytesReadCallback(bytesReadCallback)

-      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
-      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       val format = inputFormatClass.newInstance
       format match {
         case configurable: Configurable =>
           configurable.setConf(conf)
         case _ =>
       }
+      val attemptId = newTaskAttemptID(jobTrackerId, id, isMap = true, split.index, 0)
+      val hadoopAttemptContext = newTaskAttemptContext(conf, attemptId)
       private var reader = format.createRecordReader(
         split.serializableHadoopSplit.value, hadoopAttemptContext)
       reader.initialize(split.serializableHadoopSplit.value, hadoopAttemptContext)
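With the reordered calls, newTaskAttemptContext copies a conf that already contains whatever setConf appended, so the RecordReader created and initialized from hadoopAttemptContext sees the complete configuration (in the Titan case, Cassandra's keyspace settings).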
