
Commit c6ba2ea

chenghao-intel authored and marmbrus committed
[SPARK-7862] [SQL] Disable the error message redirect to stderr
This is a follow-up of #6404: ScriptTransformation prints error messages directly to stderr, which could be a disaster for the application log.

Author: Cheng Hao <[email protected]>

Closes #6882 from chenghao-intel/verbose and squashes the following commits:

bfedd77 [Cheng Hao] revert the write
76ff46b [Cheng Hao] update the CircularBuffer
692b19e [Cheng Hao] check the process exitValue for ScriptTransform
47e0970 [Cheng Hao] Use the RedirectThread instead
1de771d [Cheng Hao] naming the threads in ScriptTransformation
8536e81 [Cheng Hao] disable the error message redirection for stderr
1 parent 637b4ee commit c6ba2ea

File tree

5 files changed: +77 -46 lines


core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 33 additions & 0 deletions
@@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
     }
   }
 }
+
+/**
+ * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
+ * in a circular buffer. The current contents of the buffer can be accessed using
+ * the toString method.
+ */
+private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
+  var pos: Int = 0
+  var buffer = new Array[Int](sizeInBytes)
+
+  def write(i: Int): Unit = {
+    buffer(pos) = i
+    pos = (pos + 1) % buffer.length
+  }
+
+  override def toString: String = {
+    val (end, start) = buffer.splitAt(pos)
+    val input = new java.io.InputStream {
+      val iterator = (start ++ end).iterator
+
+      def read(): Int = if (iterator.hasNext) iterator.next() else -1
+    }
+    val reader = new BufferedReader(new InputStreamReader(input))
+    val stringBuilder = new StringBuilder
+    var line = reader.readLine()
+    while (line != null) {
+      stringBuilder.append(line)
+      stringBuilder.append("\n")
+      line = reader.readLine()
+    }
+    stringBuilder.toString()
+  }
+}
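A minimal sketch of the wrap-around behaviour of the new CircularBuffer (illustration only, not part of this commit; it sits in the org.apache.spark.util package because the class is private[spark], and CircularBufferDemo is a hypothetical name):

package org.apache.spark.util

// Illustration only: exercises the CircularBuffer added in this commit.
object CircularBufferDemo {
  def main(args: Array[String]): Unit = {
    val buf = new CircularBuffer(10)
    // Write 16 bytes into a 10-byte buffer; the oldest 6 bytes are overwritten.
    "abcdefghijklmnop".getBytes("UTF-8").foreach(b => buf.write(b))
    // toString splits the backing array at the write position and stitches the
    // two halves back into chronological order, terminating each line with "\n".
    assert(buf.toString == "ghijklmnop\n")
  }
}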

core/src/test/scala/org/apache/spark/util/UtilsSuite.scala

Lines changed: 8 additions & 0 deletions
@@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(!Utils.isInDirectory(nullFile, parentDir))
     assert(!Utils.isInDirectory(nullFile, childFile3))
   }
+
+  test("circular buffer") {
+    val buffer = new CircularBuffer(25)
+    val stream = new java.io.PrintStream(buffer, true, "UTF-8")
+
+    stream.println("test circular test circular test circular test circular test circular")
+    assert(buffer.toString === "t circular test circular\n")
+  }
 }
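A quick check of why the expected value holds (illustration only, not from the commit): println writes 70 bytes — the 69-character payload plus '\n' — so the 25-byte buffer retains exactly the last 25 of them:

// Illustration only: the expected string is just the last 25 bytes written.
val payload = "test circular test circular test circular test circular test circular\n"
assert(payload.length == 70) // 69 characters + the '\n' appended by println
assert(payload.takeRight(25) == "t circular test circular\n")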

sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala

Lines changed: 3 additions & 26 deletions
@@ -22,6 +22,8 @@ import java.net.URI
 import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 import javax.annotation.concurrent.GuardedBy
 
+import org.apache.spark.util.CircularBuffer
+
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls
 
@@ -66,32 +68,7 @@ private[hive] class ClientWrapper(
   with Logging {
 
   // Circular buffer to hold what hive prints to STDOUT and ERR. Only printed when failures occur.
-  private val outputBuffer = new java.io.OutputStream {
-    var pos: Int = 0
-    var buffer = new Array[Int](10240)
-    def write(i: Int): Unit = {
-      buffer(pos) = i
-      pos = (pos + 1) % buffer.size
-    }
-
-    override def toString: String = {
-      val (end, start) = buffer.splitAt(pos)
-      val input = new java.io.InputStream {
-        val iterator = (start ++ end).iterator
-
-        def read(): Int = if (iterator.hasNext) iterator.next() else -1
-      }
-      val reader = new BufferedReader(new InputStreamReader(input))
-      val stringBuilder = new StringBuilder
-      var line = reader.readLine()
-      while(line != null) {
-        stringBuilder.append(line)
-        stringBuilder.append("\n")
-        line = reader.readLine()
-      }
-      stringBuilder.toString()
-    }
-  }
+  private val outputBuffer = new CircularBuffer()
 
   private val shim = version match {
     case hive.v12 => new Shim_v0_12()
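Note on this refactor: the anonymous OutputStream removed here is the same ring-buffer logic the commit moves into Utils.CircularBuffer, and the no-argument new CircularBuffer() keeps the previous 10240-byte capacity via the default parameter.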

sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala

Lines changed: 32 additions & 19 deletions
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
 
 /**
  * Transforms the input by forking and running the specified script.
@@ -59,15 +59,13 @@ case class ScriptTransformation(
     child.execute().mapPartitions { iter =>
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd)
-      // redirectError(Redirect.INHERIT) would consume the error output from buffer and
-      // then print it to stderr (inherit the target from the current Scala process).
-      // If without this there would be 2 issues:
+      // We need to start threads connected to the process pipeline:
       // 1) The error msg generated by the script process would be hidden.
       // 2) If the error msg is too big to chock up the buffer, the input logic would be hung
-      builder.redirectError(Redirect.INHERIT)
       val proc = builder.start()
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
+      val errorStream = proc.getErrorStream
       val reader = new BufferedReader(new InputStreamReader(inputStream))
 
       val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
@@ -152,29 +150,43 @@
       val dataOutputStream = new DataOutputStream(outputStream)
       val outputProjection = new InterpretedProjection(input, child.output)
 
+      // TODO make the 2048 configurable?
+      val stderrBuffer = new CircularBuffer(2048)
+      // Consume the error stream from the pipeline, otherwise it will be blocked if
+      // the pipeline is full.
+      new RedirectThread(errorStream, // input stream from the pipeline
+        stderrBuffer, // output to a circular buffer
+        "Thread-ScriptTransformation-STDERR-Consumer").start()
+
       // Put the write(output to the pipeline) into a single thread
       // and keep the collector as remain in the main thread.
      // otherwise it will causes deadlock if the data size greater than
       // the pipeline / buffer capacity.
       new Thread(new Runnable() {
         override def run(): Unit = {
-          iter
-            .map(outputProjection)
-            .foreach { row =>
-              if (inputSerde == null) {
-                val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-                  ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
-                outputStream.write(data)
-              } else {
-                val writable = inputSerde.serialize(
-                  row.asInstanceOf[GenericInternalRow].values, inputSoi)
-                prepareWritable(writable).write(dataOutputStream)
+          Utils.tryWithSafeFinally {
+            iter
+              .map(outputProjection)
+              .foreach { row =>
+                if (inputSerde == null) {
+                  val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+                    ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+                  outputStream.write(data)
+                } else {
+                  val writable = inputSerde.serialize(
+                    row.asInstanceOf[GenericInternalRow].values, inputSoi)
+                  prepareWritable(writable).write(dataOutputStream)
+                }
+              }
+            outputStream.close()
+          } {
+            if (proc.waitFor() != 0) {
+              logError(stderrBuffer.toString) // log the stderr circular buffer
             }
           }
-          outputStream.close()
         }
-      }).start()
+      }, "Thread-ScriptTransformation-Feed").start()
 
       iterator
     }
@@ -278,3 +290,4 @@ case class HiveScriptIOSchema (
     }
   }
 }
+
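For context, a self-contained sketch of the pattern this diff adopts — drain the child's stderr on a background thread so a full pipe can never block the process, and surface the captured output only on failure. Illustration only: it uses plain JDK process APIs, a ByteArrayOutputStream standing in for CircularBuffer, and hypothetical names (DrainStderrDemo, "stderr-consumer"):

import java.io.ByteArrayOutputStream

object DrainStderrDemo {
  def main(args: Array[String]): Unit = {
    // Launch a script that writes to stderr and fails, like a bad transform script.
    val proc = new ProcessBuilder("/bin/bash", "-c", "echo oops >&2; exit 1").start()
    val stderrBuf = new ByteArrayOutputStream() // stand-in for CircularBuffer
    val drainer = new Thread("stderr-consumer") {
      override def run(): Unit = {
        val in = proc.getErrorStream
        // Drain until EOF so the child can never block on a full stderr pipe.
        Iterator.continually(in.read()).takeWhile(_ != -1).foreach(b => stderrBuf.write(b))
      }
    }
    drainer.setDaemon(true)
    drainer.start()
    // Only surface the captured stderr when the script fails, as the diff does.
    if (proc.waitFor() != 0) {
      drainer.join()
      System.err.println(s"script failed, stderr was:\n${stderrBuf.toString("UTF-8")}")
    }
  }
}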

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala

Lines changed: 1 addition & 1 deletion
@@ -653,7 +653,7 @@ class SQLQuerySuite extends QueryTest {
       .queryExecution.toRdd.count())
   }
 
-  ignore("test script transform for stderr") {
+  test("test script transform for stderr") {
     val data = (1 to 100000).map { i => (i, i, i) }
    data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
     assert(0 ===
