@@ -216,10 +216,33 @@ class SparkContext(config: SparkConf) extends Logging {
   private[spark] val ui = new SparkUI(this)
   ui.bind()

+  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
+  val hadoopConfiguration: Configuration = {
+    val env = SparkEnv.get
+    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
+    // Explicitly check for S3 environment variables
+    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
+        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
+      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
+      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
+    }
+    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
+    conf.getAll.foreach { case (key, value) =>
+      if (key.startsWith("spark.hadoop.")) {
+        hadoopConf.set(key.substring("spark.hadoop.".length), value)
+      }
+    }
+    val bufferSize = conf.get("spark.buffer.size", "65536")
+    hadoopConf.set("io.file.buffer.size", bufferSize)
+    hadoopConf
+  }
+
   // Optionally log Spark events
   private[spark] val eventLogger: Option[EventLoggingListener] = {
     if (conf.getBoolean("spark.eventLog.enabled", false)) {
-      val logger = new EventLoggingListener(appName, conf)
+      val logger = new EventLoggingListener(appName, conf, hadoopConfiguration)
       logger.start()
       listenerBus.addListener(logger)
       Some(logger)
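
The hunk above wires a single shared Hadoop Configuration into SparkContext: AWS credentials taken from the environment, any spark.hadoop.* entries from SparkConf copied over with the prefix stripped, and io.file.buffer.size fed from spark.buffer.size. As a rough illustration of how a driver would observe this, here is a minimal sketch; it assumes a build containing this patch, and the object name and property values are made up:

import org.apache.spark.{SparkConf, SparkContext}

object HadoopConfExample {                     // illustrative name, not part of Spark
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("hadoop-conf-example")
      // "spark.hadoop.foo" entries are copied into the Hadoop conf as "foo"
      .set("spark.hadoop.fs.s3n.awsAccessKeyId", "PLACEHOLDER_KEY")
      // spark.buffer.size feeds io.file.buffer.size (default "65536")
      .set("spark.buffer.size", "131072")

    val sc = new SparkContext(conf)
    println(sc.hadoopConfiguration.get("fs.s3n.awsAccessKeyId"))  // PLACEHOLDER_KEY
    println(sc.hadoopConfiguration.get("io.file.buffer.size"))    // 131072
    sc.stop()
  }
}

Keeping this as one val on SparkContext means every Hadoop filesystem access in the driver sees the same configuration instead of each call site building its own.
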
@@ -294,29 +317,6 @@ class SparkContext(config: SparkConf) extends Logging {
   postEnvironmentUpdate()
   postApplicationStart()

-  /** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
-  val hadoopConfiguration: Configuration = {
-    val env = SparkEnv.get
-    val hadoopConf = SparkHadoopUtil.get.newConfiguration()
-    // Explicitly check for S3 environment variables
-    if (System.getenv("AWS_ACCESS_KEY_ID") != null &&
-        System.getenv("AWS_SECRET_ACCESS_KEY") != null) {
-      hadoopConf.set("fs.s3.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3n.awsAccessKeyId", System.getenv("AWS_ACCESS_KEY_ID"))
-      hadoopConf.set("fs.s3.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-      hadoopConf.set("fs.s3n.awsSecretAccessKey", System.getenv("AWS_SECRET_ACCESS_KEY"))
-    }
-    // Copy any "spark.hadoop.foo=bar" system properties into conf as "foo=bar"
-    conf.getAll.foreach { case (key, value) =>
-      if (key.startsWith("spark.hadoop.")) {
-        hadoopConf.set(key.substring("spark.hadoop.".length), value)
-      }
-    }
-    val bufferSize = conf.get("spark.buffer.size", "65536")
-    hadoopConf.set("io.file.buffer.size", bufferSize)
-    hadoopConf
-  }
-
   private[spark] var checkpointDir: Option[String] = None

   // Thread Local variable that can be used by users to pass information down the stack
@@ -381,16 +381,27 @@ class SparkContext(config: SparkConf) extends Logging {
    * // In a separate thread:
    * sc.cancelJobGroup("some_job_to_cancel")
    * }}}
+   *
+   * If interruptOnCancel is set to true for the job group, then job cancellation will result
+   * in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
+   * that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
+   * where HDFS may respond to Thread.interrupt() by marking nodes as dead.
    */
-  def setJobGroup(groupId: String, description: String) {
+  def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean = false) {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, description)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
+    // Note: Specifying interruptOnCancel in setJobGroup (rather than cancelJobGroup) avoids
+    // changing several public APIs and allows Spark cancellations outside of the cancelJobGroup
+    // APIs to also take advantage of this property (e.g., internal job failures or canceling from
+    // JobProgressTab UI) on a per-job basis.
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, interruptOnCancel.toString)
   }

   /** Clear the current thread's job group ID and its description. */
   def clearJobGroup() {
     setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, null)
     setLocalProperty(SparkContext.SPARK_JOB_GROUP_ID, null)
+    setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
   }

   // Post init
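
With the new interruptOnCancel flag, cancelling a job group can also interrupt the running task threads, subject to the HDFS-1208 caveat in the scaladoc above. A minimal driver sketch follows; it assumes a build containing this patch, and the object name, group ID, and timings are illustrative:

import org.apache.spark.{SparkConf, SparkContext}

object JobGroupExample {                       // illustrative name
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("job-group-example"))

    // Tag work from this thread with a group ID; interruptOnCancel = true asks Spark
    // to call Thread.interrupt() on the task threads if the group is cancelled.
    sc.setJobGroup("some_job_to_cancel", "slow job", interruptOnCancel = true)

    // Cancel the group from a separate thread after a short delay.
    new Thread(new Runnable {
      def run(): Unit = {
        Thread.sleep(1000)
        sc.cancelJobGroup("some_job_to_cancel")
      }
    }).start()

    try {
      // A deliberately slow job so the cancellation has something to interrupt.
      sc.parallelize(1 to 10, 2).map { i => Thread.sleep(10000); i }.count()
    } catch {
      case e: Exception => println("Job was cancelled: " + e.getMessage)
    } finally {
      sc.stop()
    }
  }
}
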
@@ -1244,6 +1255,8 @@ object SparkContext extends Logging {

   private[spark] val SPARK_JOB_GROUP_ID = "spark.jobGroup.id"

+  private[spark] val SPARK_JOB_INTERRUPT_ON_CANCEL = "spark.job.interruptOnCancel"
+
   private[spark] val SPARK_UNKNOWN_USER = "<unknown>"

   implicit object DoubleAccumulatorParam extends AccumulatorParam[Double] {
@@ -1268,8 +1281,10 @@ object SparkContext extends Logging {

   // TODO: Add AccumulatorParams for other types, e.g. lists and strings

-  implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
+  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
+      (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null) = {
     new PairRDDFunctions(rdd)
+  }

   implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)

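
The reworked implicit conversion keeps existing call sites compiling while additionally capturing an Ordering[K] when one is in scope (falling back to null otherwise), presumably so pair operations can take advantage of key ordering where it exists. A small usage sketch against the Spark 1.x API; the object name and data are illustrative:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._   // brings rddToPairRDDFunctions into scope (Spark 1.x)

object PairRDDExample {                  // illustrative name
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("pair-rdd-example"))

    val pairs = sc.parallelize(Seq(("b", 2), ("a", 1), ("a", 3)))

    // The conversion wraps `pairs` in PairRDDFunctions exactly as before; the
    // Ordering[String] for the key type is now also picked up implicitly.
    pairs.reduceByKey(_ + _).collect().foreach(println)

    sc.stop()
  }
}
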