@@ -19,9 +19,10 @@ package org.apache.spark.sql.hive.thriftserver

import java.io.File
import java.net.ServerSocket
import java.sql.{DriverManager, Statement}
import java.sql.{Date, DriverManager, Statement}
import java.util.concurrent.TimeoutException

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.concurrent.{Await, Promise}
@@ -51,6 +52,15 @@ import org.apache.spark.sql.hive.HiveShim
class HiveThriftServer2Suite extends FunSuite with Logging {
Class.forName(classOf[HiveDriver].getCanonicalName)

object TestData {
def getTestDataFilePath(name: String) = {
Thread.currentThread().getContextClassLoader.getResource(s"data/files/$name")
}

val smallKv = getTestDataFilePath("small_kv.txt")
val smallKvWithNull = getTestDataFilePath("small_kv_with_null.txt")
}

def randomListeningPort = {
// Let the system choose a random available port to avoid collision with other parallel
// builds.
@@ -145,12 +155,8 @@ class HiveThriftServer2Suite extends FunSuite with Logging {
}
}

val env = Seq(
// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
"SPARK_TESTING" -> "0",
// Prevents loading classes out of the assembly jar. Otherwise Utils.sparkVersion can't read
// proper version information from the jar manifest.
"SPARK_PREPEND_CLASSES" -> "")
// Resets SPARK_TESTING to avoid loading Log4J configurations in testing class paths
val env = Seq("SPARK_TESTING" -> "0")

Process(command, None, env: _*).run(ProcessLogger(
captureThriftServerOutput("stdout"),
@@ -194,15 +200,12 @@ class HiveThriftServer2Suite extends FunSuite with Logging {

test("Test JDBC query execution") {
withJdbcStatement() { statement =>
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")

val queries =
s"""SET spark.sql.shuffle.partitions=3;
|CREATE TABLE test(key INT, val STRING);
|LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test;
|CACHE TABLE test;
""".stripMargin.split(";").map(_.trim).filter(_.nonEmpty)
val queries = Seq(
"SET spark.sql.shuffle.partitions=3",
"DROP TABLE IF EXISTS test",
"CREATE TABLE test(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test",
"CACHE TABLE test")

queries.foreach(statement.execute)

@@ -216,14 +219,10 @@ class HiveThriftServer2Suite extends FunSuite with Logging {

test("SPARK-3004 regression: result set containing NULL") {
withJdbcStatement() { statement =>
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource(
"data/files/small_kv_with_null.txt")

val queries = Seq(
"DROP TABLE IF EXISTS test_null",
"CREATE TABLE test_null(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_null")
s"LOAD DATA LOCAL INPATH '${TestData.smallKvWithNull}' OVERWRITE INTO TABLE test_null")

queries.foreach(statement.execute)

@@ -270,24 +269,63 @@ class HiveThriftServer2Suite extends FunSuite with Logging {

test("SPARK-4292 regression: result set iterator issue") {
withJdbcStatement() { statement =>
val dataFilePath =
Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")

val queries = Seq(
"DROP TABLE IF EXISTS test_4292",
"CREATE TABLE test_4292(key INT, val STRING)",
s"LOAD DATA LOCAL INPATH '$dataFilePath' OVERWRITE INTO TABLE test_4292")
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_4292")

queries.foreach(statement.execute)

val resultSet = statement.executeQuery("SELECT key FROM test_4292")

Seq(238, 86, 311, 27, 165).foreach { key =>
resultSet.next()
assert(resultSet.getInt(1) == key)
assert(resultSet.getInt(1) === key)
}

statement.executeQuery("DROP TABLE IF EXISTS test_4292")
}
}

test("SPARK-4309 regression: Date type support") {
withJdbcStatement() { statement =>
val queries = Seq(
"DROP TABLE IF EXISTS test_date",
"CREATE TABLE test_date(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_date")

queries.foreach(statement.execute)

assertResult(Date.valueOf("2011-01-01")) {
val resultSet = statement.executeQuery(
"SELECT CAST('2011-01-01' as date) FROM test_date LIMIT 1")
resultSet.next()
resultSet.getDate(1)
}
}
}

test("SPARK-4407 regression: Complex type support") {
withJdbcStatement() { statement =>
val queries = Seq(
"DROP TABLE IF EXISTS test_map",
"CREATE TABLE test_map(key INT, value STRING)",
s"LOAD DATA LOCAL INPATH '${TestData.smallKv}' OVERWRITE INTO TABLE test_map")

queries.foreach(statement.execute)

assertResult("""{238:"val_238"}""") {
val resultSet = statement.executeQuery("SELECT MAP(key, value) FROM test_map LIMIT 1")
resultSet.next()
resultSet.getString(1)
}

assertResult("""["238","val_238"]""") {
val resultSet = statement.executeQuery(
"SELECT ARRAY(CAST(key AS STRING), value) FROM test_map LIMIT 1")
resultSet.next()
resultSet.getString(1)
}
}
}
}
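
For context, the sketch below shows how an external JDBC client could exercise the new DATE and complex-type behaviour once a HiveThriftServer2 instance is up, mirroring the SPARK-4309 and SPARK-4407 tests above. It is illustrative only: the connection URL, port, credentials, and the pre-loaded test_date table are assumptions, not part of this patch.

import java.sql.{Date, DriverManager}

// Illustrative client sketch; assumes a Thrift server on localhost:10000 and a table
// test_date(key INT, value STRING) loaded with small_kv.txt as in the tests above.
object ThriftServerClientSketch {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    val connection = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val statement = connection.createStatement()

    // DATE values now come back as java.sql.Date instead of tripping over the
    // previously missing DateType case in the shim layer.
    val dateResult = statement.executeQuery(
      "SELECT CAST('2011-01-01' AS DATE) FROM test_date LIMIT 1")
    dateResult.next()
    val date: Date = dateResult.getDate(1)

    // MAP/ARRAY/STRUCT results are rendered as Hive-style strings.
    val mapResult = statement.executeQuery("SELECT MAP(key, value) FROM test_date LIMIT 1")
    mapResult.next()
    val rendered: String = mapResult.getString(1)  // e.g. {238:"val_238"}

    println(s"date=$date, map=$rendered")
    statement.close()
    connection.close()
  }
}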
@@ -17,7 +17,7 @@

package org.apache.spark.sql.hive.thriftserver

import java.sql.Timestamp
import java.sql.{Date, Timestamp}
import java.util.{ArrayList => JArrayList, Map => JMap}

import scala.collection.JavaConversions._
@@ -131,14 +131,13 @@ private[hive] class SparkExecuteStatementOperation(
to.addColumnValue(ColumnValue.byteValue(from.getByte(ordinal)))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(from.getShort(ordinal)))
case DateType =>
to.addColumnValue(ColumnValue.dateValue(from(ordinal).asInstanceOf[Date]))
case TimestampType =>
to.addColumnValue(
ColumnValue.timestampValue(from.get(ordinal).asInstanceOf[Timestamp]))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
val hiveString = result
.queryExecution
.asInstanceOf[HiveContext#QueryExecution]
.toHiveString((from.get(ordinal), dataTypes(ordinal)))
val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to.addColumnValue(ColumnValue.stringValue(hiveString))
}
}
@@ -163,6 +162,8 @@ private[hive] class SparkExecuteStatementOperation(
to.addColumnValue(ColumnValue.byteValue(null))
case ShortType =>
to.addColumnValue(ColumnValue.shortValue(null))
case DateType =>
to.addColumnValue(ColumnValue.dateValue(null))
case TimestampType =>
to.addColumnValue(ColumnValue.timestampValue(null))
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
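
Both shims now obtain the Hive-compatible string form of complex values from HiveContext.toHiveString rather than reaching through a QueryExecution instance. As rough intuition for what that formatting produces, here is a minimal sketch — not the actual HiveContext.toHiveString implementation, which also covers structs, binary, dates, timestamps, nulls, and nested rows:

import org.apache.spark.sql.catalyst.types._  // 1.2-era package for DataType, MapType, ArrayType, StringType

// Minimal sketch only; the real formatter lives on the HiveContext companion object.
object HiveStringSketch {
  def toHiveString(value: Any, dataType: DataType): String = (value, dataType) match {
    case (m: Map[_, _], MapType(keyType, valueType, _)) =>
      m.map { case (k, v) => s"${inner(k, keyType)}:${inner(v, valueType)}" }
        .mkString("{", ",", "}")
    case (s: Seq[_], ArrayType(elementType, _)) =>
      s.map(inner(_, elementType)).mkString("[", ",", "]")
    case (other, _) => other.toString
  }

  // Nested string values are quoted, which is why the suite above expects
  // {238:"val_238"} and ["238","val_238"].
  private def inner(value: Any, dataType: DataType): String = dataType match {
    case StringType => "\"" + value + "\""
    case _          => toHiveString(value, dataType)
  }
}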
@@ -18,7 +18,7 @@
package org.apache.spark.sql.hive.thriftserver

import java.security.PrivilegedExceptionAction
import java.sql.Timestamp
import java.sql.{Date, Timestamp}
import java.util.concurrent.Future
import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}

@@ -113,7 +113,7 @@ private[hive] class SparkExecuteStatementOperation(
def addNonNullColumnValue(from: SparkRow, to: ArrayBuffer[Any], ordinal: Int) {
dataTypes(ordinal) match {
case StringType =>
to += from.get(ordinal).asInstanceOf[String]
to += from.getString(ordinal)
case IntegerType =>
to += from.getInt(ordinal)
case BooleanType =>
@@ -123,33 +123,30 @@ private[hive] class SparkExecuteStatementOperation(
case FloatType =>
to += from.getFloat(ordinal)
case DecimalType() =>
to += from.get(ordinal).asInstanceOf[BigDecimal].bigDecimal
to += from.getAs[BigDecimal](ordinal).bigDecimal
case LongType =>
to += from.getLong(ordinal)
case ByteType =>
to += from.getByte(ordinal)
case ShortType =>
to += from.getShort(ordinal)
case DateType =>
to += from.getAs[Date](ordinal)
case TimestampType =>
to += from.get(ordinal).asInstanceOf[Timestamp]
case BinaryType =>
to += from.get(ordinal).asInstanceOf[String]
case _: ArrayType =>
to += from.get(ordinal).asInstanceOf[String]
case _: StructType =>
to += from.get(ordinal).asInstanceOf[String]
case _: MapType =>
to += from.get(ordinal).asInstanceOf[String]
to += from.getAs[Timestamp](ordinal)
case BinaryType | _: ArrayType | _: StructType | _: MapType =>
val hiveString = HiveContext.toHiveString((from.get(ordinal), dataTypes(ordinal)))
to += hiveString
}
}

def getNextRowSet(order: FetchOrientation, maxRowsL: Long): RowSet = {
validateDefaultFetchOrientation(order)
assertState(OperationState.FINISHED)
setHasResultSet(true)
val reultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
val resultRowSet: RowSet = RowSetFactory.create(getResultSetSchema, getProtocolVersion)
if (!iter.hasNext) {
reultRowSet
resultRowSet
} else {
// maxRowsL here typically maps to java.sql.Statement.getFetchSize, which is an int
val maxRows = maxRowsL.toInt
@@ -166,10 +163,10 @@ private[hive] class SparkExecuteStatementOperation(
}
curCol += 1
}
reultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
resultRowSet.addRow(row.toArray.asInstanceOf[Array[Object]])
curRow += 1
}
reultRowSet
resultRowSet
}
}
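
The shim above also replaces raw get(ordinal).asInstanceOf[T] casts with Row's typed accessors (getString, getAs[Date], getAs[BigDecimal]). A small illustration of the difference, using a hypothetical two-column row; the object name and sample values are made up for this sketch:

import java.sql.Date
import org.apache.spark.sql.catalyst.expressions.Row  // assumed to be the type behind the shims' SparkRow alias

object TypedAccessSketch extends App {
  // Hypothetical row with schema (key INT, d DATE); for illustration only.
  val row: Row = Row(238, Date.valueOf("2011-01-01"))

  val before = row(1).asInstanceOf[Date]  // untyped access plus an explicit cast
  val after  = row.getAs[Date](1)         // typed accessor states the expected type at the call site
  val key    = row.getInt(0)

  assert(before == after)
  println(s"key=$key, date=$after")
}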
