@@ -18,19 +18,17 @@

 package org.apache.spark.sql.hive.thriftserver

+import java.io._
+
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext.Implicits.global
 import scala.concurrent.duration._
-import scala.concurrent.{Await, Future, Promise}
+import scala.concurrent.{Await, Promise}
 import scala.sys.process.{Process, ProcessLogger}

-import java.io._
-import java.util.concurrent.atomic.AtomicInteger
-
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 import org.scalatest.{BeforeAndAfterAll, FunSuite}

-import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.util.getTempFilePath

 class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
@@ -53,23 +51,20 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
        """.stripMargin.split("\\s+").toSeq ++ extraArgs
     }

-    // AtomicInteger is needed because stderr and stdout of the forked process are handled in
-    // different threads.
-    val next = new AtomicInteger(0)
+    var next = 0
     val foundAllExpectedAnswers = Promise.apply[Unit]()
     val queryStream = new ByteArrayInputStream(queries.mkString("\n").getBytes)
     val buffer = new ArrayBuffer[String]()
+    val lock = new Object

-    def captureOutput(source: String)(line: String) {
+    def captureOutput(source: String)(line: String): Unit = lock.synchronized {
       buffer += s"$source> $line"
-      // If we haven't found all expected answers...
-      if (next.get() < expectedAnswers.size) {
-        // If another expected answer is found...
-        if (line.startsWith(expectedAnswers(next.get()))) {
-          // If all expected answers have been found...
-          if (next.incrementAndGet() == expectedAnswers.size) {
-            foundAllExpectedAnswers.trySuccess(())
-          }
+      // If we haven't found all expected answers and another expected answer comes up...
+      if (next < expectedAnswers.size && line.startsWith(expectedAnswers(next))) {
+        next += 1
+        // If all expected answers have been found...
+        if (next == expectedAnswers.size) {
+          foundAllExpectedAnswers.trySuccess(())
         }
       }
     }
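
Note on the synchronization change above: the forked spark-sql process has its stdout and stderr pumped by two different threads (which is what the deleted AtomicInteger comment referred to), and both streams funnel into captureOutput. Once the counter becomes a plain var, the whole callback body is serialized through lock.synchronized so that next and buffer stay consistent. Below is a minimal standalone sketch of that pattern, not taken from the PR (object name and line contents are made up):

```scala
import scala.collection.mutable.ArrayBuffer

object SynchronizedCaptureSketch {
  val buffer = new ArrayBuffer[String]()
  val lock = new Object
  var next = 0

  // Both pumping threads go through the same monitor, so the append to
  // `buffer` and the update of `next` can never interleave.
  def captureOutput(source: String)(line: String): Unit = lock.synchronized {
    buffer += s"$source> $line"
    next += 1
  }

  def main(args: Array[String]): Unit = {
    // Stand-ins for the stdout/stderr pumping threads of the forked process.
    val stdout = new Thread {
      override def run(): Unit = (1 to 1000).foreach(i => captureOutput("stdout")(s"line $i"))
    }
    val stderr = new Thread {
      override def run(): Unit = (1 to 1000).foreach(i => captureOutput("stderr")(s"line $i"))
    }
    stdout.start(); stderr.start()
    stdout.join(); stderr.join()
    // Always prints 2000 / 2000; without the lock, appends could be lost or throw.
    println(s"buffer.size = ${buffer.size}, next = $next")
  }
}
```

A single monitor around the whole body also keeps the buffer append and the counter update atomic with respect to each other, which an AtomicInteger alone does not guarantee.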
@@ -88,8 +83,8 @@ class CliSuite extends FunSuite with BeforeAndAfterAll with Logging {
            |=======================
            |Spark SQL CLI command line: ${command.mkString(" ")}
            |
-           |Executed query ${next.get()} "${queries(next.get())}",
-           |But failed to capture expected output "${expectedAnswers(next.get())}" within $timeout.
+           |Executed query $next "${queries(next)}",
+           |But failed to capture expected output "${expectedAnswers(next)}" within $timeout.
            |
            |${buffer.mkString("\n")}
            |===========================
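
For context on the failure message above: this diff does not show the waiting code, but judging from the imports (Await, Promise, duration) and the trySuccess(()) call in captureOutput, the suite presumably blocks on the promise with a timeout and builds this message when the wait fails. A rough, hypothetical sketch of that Promise/Await pattern (the real wiring in CliSuite may differ):

```scala
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise, TimeoutException}

object AwaitPatternSketch {
  def main(args: Array[String]): Unit = {
    // In CliSuite this promise would be completed by captureOutput via
    // foundAllExpectedAnswers.trySuccess(()) once every expected answer is seen.
    val foundAllExpectedAnswers = Promise[Unit]()
    val timeout = 1.second

    try {
      // Nothing completes the promise here, so the wait times out, which is
      // the path on which a diagnostic message like the one above gets built.
      Await.result(foundAllExpectedAnswers.future, timeout)
      println("All expected answers found.")
    } catch {
      case _: TimeoutException =>
        println(s"Failed to capture expected output within $timeout.")
    }
  }
}
```

On the timeout path, next still points at the first expected answer that was never seen, which is why the message can interpolate queries(next) and expectedAnswers(next) directly.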