@@ -114,15 +114,18 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           case ut: UninterruptibleThread =>
             // When using a local file system, "writeBatch" must be called on a
             // [[org.apache.spark.util.UninterruptibleThread]] so that interrupts can be disabled
-            // while writing the batch file. This is because there is a potential dead-lock in
-            // Hadoop "Shell.runCommand" before 2.5.0 (HADOOP-10622). If the thread running
-            // "Shell.runCommand" is interrupted, then the thread can get deadlocked. In our case,
-            // `writeBatch` creates a file using HDFS API and will call "Shell.runCommand" to set
-            // the file permission if using the local file system, and can get deadlocked if the
-            // stream execution thread is stopped by interrupt. Hence, we make sure that
-            // "writeBatch" is called on [[UninterruptibleThread]] which allows us to disable
-            // interrupts here. Also see SPARK-14131.
-            ut.runUninterruptibly { writeBatch(batchId, metadata, serialize) }
+            // while writing the batch file.
+            //
+            // This is because Hadoop "Shell.runCommand" swallows InterruptException (HADOOP-14084).
+            // If the user tries to stop a query, and the thread running "Shell.runCommand" is
+            // interrupted, then InterruptException will be dropped and the query will be still
+            // running. (Note: `writeBatch` creates a file using HDFS APIs and will call
+            // "Shell.runCommand" to set the file permission if using the local file system)
+            //
+            // Hence, we make sure that "writeBatch" is called on [[UninterruptibleThread]] which
+            // allows us to disable interrupts here, in order to propagate the interrupt state
+            // correctly. Also see SPARK-19599.
+            ut.runUninterruptibly { writeBatch(batchId, metadata) }
           case _ =>
             throw new IllegalStateException(
               "HDFSMetadataLog.add() on a local file system must be executed on " +
@@ -132,20 +135,19 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // For a distributed file system, such as HDFS or S3, if the network is broken, write
         // operations may just hang until timeout. We should enable interrupts to allow stopping
         // the query fast.
-        writeBatch(batchId, metadata, serialize)
+        writeBatch(batchId, metadata)
       }
       true
     }
   }
 
-  def writeTempBatch(metadata: T, writer: (T, OutputStream) => Unit = serialize): Option[Path] = {
-    var nextId = 0
+  def writeTempBatch(metadata: T): Option[Path] = {
     while (true) {
       val tempPath = new Path(metadataPath, s".${UUID.randomUUID.toString}.tmp")
       try {
         val output = fileManager.create(tempPath)
         try {
-          writer(metadata, output)
+          serialize(metadata, output)
           return Some(tempPath)
         } finally {
           IOUtils.closeQuietly(output)
@@ -164,7 +166,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
           // big problem because it requires the attacker must have the permission to write the
           // metadata path. In addition, the old Streaming also have this issue, people can create
           // malicious checkpoint files to crash a Streaming application too.
-          nextId += 1
       }
     }
     None
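
As the hunks above show, `writeTempBatch` serializes into a uniquely named ".<uuid>.tmp" file and simply retries with a fresh name if creation collides with another writer, so a half-written batch can never masquerade as a committed one. Below is a minimal sketch of the same pattern using only java.nio; `WriteTempBatchExample` and its parameters are hypothetical and stand in for Spark's FileManager-based implementation.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path, StandardOpenOption}
import java.util.UUID

// Hypothetical, JDK-only sketch of the writeTempBatch idea (not Spark's FileManager API):
// serialize into a uniquely named ".<uuid>.tmp" file so a half-written batch can never be
// mistaken for a committed one, and retry with a fresh name if creation races with another
// writer.
object WriteTempBatchExample {
  def writeTempBatch(dir: Path, bytes: Array[Byte], maxAttempts: Int = 3): Option[Path] = {
    var attempts = 0
    while (attempts < maxAttempts) {
      val tempPath = dir.resolve(s".${UUID.randomUUID()}.tmp")
      try {
        // CREATE_NEW fails if the name is already taken, mirroring fileManager.create(tempPath).
        Files.write(tempPath, bytes, StandardOpenOption.CREATE_NEW)
        return Some(tempPath)
      } catch {
        case _: FileAlreadyExistsException =>
          attempts += 1 // extremely unlikely with random UUIDs; just pick a new name
      }
    }
    None
  }
}
```
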
@@ -176,8 +177,8 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
    * There may be multiple [[HDFSMetadataLog]] using the same metadata path. Although it is not a
    * valid behavior, we still need to prevent it from destroying the files.
    */
-  private def writeBatch(batchId: Long, metadata: T, writer: (T, OutputStream) => Unit): Unit = {
-    val tempPath = writeTempBatch(metadata, writer).getOrElse(
+  private def writeBatch(batchId: Long, metadata: T): Unit = {
+    val tempPath = writeTempBatch(metadata).getOrElse(
       throw new IllegalStateException(s"Unable to create temp batch file $batchId"))
     try {
       // Try to commit the batch
@@ -195,12 +196,6 @@ class HDFSMetadataLog[T <: AnyRef : ClassTag](sparkSession: SparkSession, path:
         // So throw an exception to tell the user this is not a valid behavior.
         throw new ConcurrentModificationException(
           s"Multiple HDFSMetadataLog are using $path", e)
-      case e: FileNotFoundException =>
-        // Sometimes, "create" will succeed when multiple writers are calling it at the same
-        // time. However, only one writer can call "rename" successfully, others will get
-        // FileNotFoundException because the first writer has removed it.
-        throw new ConcurrentModificationException(
-          s"Multiple HDFSMetadataLog are using $path", e)
     } finally {
       fileManager.delete(tempPath)
     }
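
The surrounding `writeBatch` method commits by renaming the temp file to the final batch file, maps a lost rename race to `ConcurrentModificationException`, and always deletes the temp file. The sketch below illustrates that commit-by-rename step with plain java.nio, under the assumption that a failed move signals a concurrent writer; `CommitBatchExample` is a hypothetical name and its semantics only approximate `FileManager.rename` on HDFS.

```scala
import java.nio.file.{FileAlreadyExistsException, Files, Path}
import java.util.ConcurrentModificationException

// Hypothetical, JDK-only sketch of the commit step (not Spark's FileManager API): move the
// temp file onto the final batch file name; if the target already exists, another
// HDFSMetadataLog instance won the race, which is reported as ConcurrentModificationException.
// The temp file is removed in all cases, mirroring `finally { fileManager.delete(tempPath) }`.
object CommitBatchExample {
  def commit(tempPath: Path, batchFile: Path): Unit = {
    try {
      // Without REPLACE_EXISTING, move fails if batchFile already exists.
      Files.move(tempPath, batchFile)
    } catch {
      case e: FileAlreadyExistsException =>
        throw new ConcurrentModificationException(s"Multiple writers are using $batchFile", e)
    } finally {
      Files.deleteIfExists(tempPath)
    }
  }
}
```
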