File: JavaSQLContext.scala
@@ -40,19 +40,13 @@ class JavaSQLContext(val sqlContext: SQLContext) {
/**
* Executes a query expressed in SQL, returning the result as a JavaSchemaRDD
*/
- def sql(sqlQuery: String): JavaSchemaRDD = {
-   val result = new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))
-   // We force query optimization to happen right away instead of letting it happen lazily like
-   // when using the query DSL. This is so DDL commands behave as expected. This only
-   // generates the RDD lineage for DML queries, but does not perform any execution.
-   result.queryExecution.toRdd
-   result
- }
+ def sql(sqlQuery: String): JavaSchemaRDD =
+   new JavaSchemaRDD(sqlContext, sqlContext.parseSql(sqlQuery))

/**
* :: Experimental ::
* Creates an empty parquet file with the schema of class `beanClass`, which can be registered as
- * a table. This registered table can be used as the target of future insertInto` operations.
+ * a table. This registered table can be used as the target of future `insertInto` operations.
*
* {{{
* JavaSQLContext sqlCtx = new JavaSQLContext(...)
@@ -62,7 +56,7 @@ class JavaSQLContext(val sqlContext: SQLContext) {
* }}}
*
* @param beanClass A java bean class object that will be used to determine the schema of the
- *                  parquet file. s
+ *                  parquet file.
* @param path The path where the directory containing parquet metadata should be created.
* Data inserted into this table will also be stored at this location.
* @param allowExisting When false, an exception will be thrown if this directory already exists.
@@ -100,14 +94,12 @@ class JavaSQLContext(val sqlContext: SQLContext) {
new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
}

-
/**
* Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
*/
def parquetFile(path: String): JavaSchemaRDD =
new JavaSchemaRDD(sqlContext, ParquetRelation(path))

-
/**
* Registers the given RDD as a temporary table in the catalog. Temporary tables exist only
* during the lifetime of this instance of SQLContext.
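With the eager `queryExecution.toRdd` call gone, `sql` merely wraps the parsed logical plan in a JavaSchemaRDD: an ordinary query builds RDD lineage but runs no job until an action is invoked. A minimal sketch of the resulting behavior, in which the context setup and the "records" table are illustrative assumptions rather than part of this change:

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.api.java.JavaSQLContext

// Sketch only: assumes a live SparkContext `sc` and a table named "records"
// already registered in the catalog.
val sc: SparkContext = ???
val javaSqlCtx = new JavaSQLContext(new SQLContext(sc))

val result = javaSqlCtx.sql("SELECT key FROM records") // parses the query; no job runs yet
val n = result.count()                                 // the first action triggers execution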
File: JavaHiveContext.scala
@@ -31,12 +31,6 @@ class JavaHiveContext(sparkContext: JavaSparkContext) extends JavaSQLContext(sparkContext) {
/**
* Executes a query expressed in HiveQL, returning the result as a JavaSchemaRDD.
*/
- def hql(hqlQuery: String): JavaSchemaRDD = {
-   val result = new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
-   // We force query optimization to happen right away instead of letting it happen lazily like
-   // when using the query DSL. This is so DDL commands behave as expected. This only
-   // generates the RDD lineage for DML queries, but does not perform any execution.
-   result.queryExecution.toRdd
-   result
- }
+ def hql(hqlQuery: String): JavaSchemaRDD =
+   new JavaSchemaRDD(sqlContext, HiveQl.parseSql(hqlQuery))
}
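`hql` receives the same simplification. Note that dropping the eager call does not make DDL lazy: as the new suite below verifies, a command statement still takes effect exactly once, and re-evaluating its result does not re-execute it. A hedged sketch of that contract, with the context and table name assumed:

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.hive.api.java.JavaHiveContext

// Sketch only: assumes a live JavaSparkContext `javaSc`; the table name is illustrative.
val javaSc: JavaSparkContext = ???
val hiveCtx = new JavaHiveContext(javaSc)

val q = hiveCtx.hql("CREATE TABLE t(key INT, value STRING)") // the command takes effect once
q.count() // evaluating the result does not run CREATE TABLE again
q.count() // repeated evaluation reuses the same result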
File: JavaHiveQLSuite.scala (new file)
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.hive.api.java

import scala.util.Try

import org.scalatest.FunSuite

import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.sql.api.java.JavaSchemaRDD
import org.apache.spark.sql.execution.ExplainCommand
import org.apache.spark.sql.hive.test.TestHive
import org.apache.spark.sql.test.TestSQLContext

// Implicits
import scala.collection.JavaConversions._

class JavaHiveQLSuite extends FunSuite {
lazy val javaCtx = new JavaSparkContext(TestSQLContext.sparkContext)

// There is a little trickery here to avoid instantiating two HiveContexts in the same JVM
lazy val javaHiveCtx = new JavaHiveContext(javaCtx) {
override val sqlContext = TestHive
}

ignore("SELECT * FROM src") {
assert(
javaHiveCtx.hql("SELECT * FROM src").collect().map(_.getInt(0)) ===
TestHive.sql("SELECT * FROM src").collect().map(_.getInt(0)).toSeq)
}

private val explainCommandClassName =
classOf[ExplainCommand].getSimpleName.stripSuffix("$")

def isExplanation(result: JavaSchemaRDD) = {
val explanation = result.collect().map(_.getString(0))
explanation.size == 1 && explanation.head.startsWith(explainCommandClassName)
}

ignore("Query Hive native command execution result") {
val tableName = "test_native_commands"

assertResult(0) {
javaHiveCtx.hql(s"DROP TABLE IF EXISTS $tableName").count()
}

assertResult(0) {
javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
}

javaHiveCtx.hql("SHOW TABLES").registerAsTable("show_tables")

assert(
javaHiveCtx
.hql("SELECT result FROM show_tables")
.collect()
.map(_.getString(0))
.contains(tableName))

assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
javaHiveCtx.hql(s"DESCRIBE $tableName").registerAsTable("describe_table")

javaHiveCtx
.hql("SELECT result FROM describe_table")
.collect()
.map(_.getString(0).split("\t").map(_.trim))
.toArray
}

assert(isExplanation(javaHiveCtx.hql(
s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))

TestHive.reset()
}

ignore("Exactly once semantics for DDL and command statements") {
val tableName = "test_exactly_once"
val q0 = javaHiveCtx.hql(s"CREATE TABLE $tableName(key INT, value STRING)")

// If the table was not created, the following assertion would fail
assert(Try(TestHive.table(tableName)).isSuccess)

// If the CREATE TABLE command got executed again, the following assertion would fail
assert(Try(q0.count()).isSuccess)
}
}
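One pattern in the suite above is worth calling out: the output of a Hive native command is itself a JavaSchemaRDD, so it can be registered as a temporary table and queried with ordinary SQL. Condensed from the test, assuming a `hiveCtx` set up as in the suite:

// Implicitly converts the java.util.List returned by collect() to a Scala Seq.
import scala.collection.JavaConversions._

// "SHOW TABLES" yields rows with a single `result` column; register its output...
hiveCtx.hql("SHOW TABLES").registerAsTable("show_tables")

// ...then query it like any other table.
val tableNames = hiveCtx
  .hql("SELECT result FROM show_tables")
  .collect()
  .map(_.getString(0))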

(One file in this change was deleted; its name and contents are not shown in this view.)

File: HiveQuerySuite.scala
@@ -184,25 +184,29 @@ class HiveQuerySuite extends HiveComparisonTest {
test("Query Hive native command execution result") {
val tableName = "test_native_commands"

- val q0 = hql(s"DROP TABLE IF EXISTS $tableName")
- assert(q0.count() == 0)
+ assertResult(0) {
+   hql(s"DROP TABLE IF EXISTS $tableName").count()
+ }

- val q1 = hql(s"CREATE TABLE $tableName(key INT, value STRING)")
- assert(q1.count() == 0)
+ assertResult(0) {
+   hql(s"CREATE TABLE $tableName(key INT, value STRING)").count()
+ }

- val q2 = hql("SHOW TABLES")
- val tables = q2.select('result).collect().map { case Row(table: String) => table }
- assert(tables.contains(tableName))
+ assert(
+   hql("SHOW TABLES")
+     .select('result)
+     .collect()
+     .map(_.getString(0))
+     .contains(tableName))

- val q3 = hql(s"DESCRIBE $tableName")
assertResult(Array(Array("key", "int", "None"), Array("value", "string", "None"))) {
-   q3.select('result).collect().map { case Row(fieldDesc: String) =>
-     fieldDesc.split("\t").map(_.trim)
-   }
+   hql(s"DESCRIBE $tableName")
+     .select('result)
+     .collect()
+     .map(_.getString(0).split("\t").map(_.trim))
}

- val q4 = hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")
- assert(isExplanation(q4))
+ assert(isExplanation(hql(s"EXPLAIN SELECT key, COUNT(*) FROM $tableName GROUP BY key")))

TestHive.reset()
}
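Besides inlining the intermediate `q0` through `q4` vals, the rewrite switches from `assert(q.count() == 0)` to ScalaTest's `assertResult`, which evaluates the block and, on failure, reports the expected and actual values instead of a bare boolean mismatch. For instance:

// On failure, assertResult names both sides (along the lines of "Expected 0, but got 2"),
// which is more informative than a plain assert(...) that only reports a false condition.
assertResult(0) {
  hql(s"DROP TABLE IF EXISTS $tableName").count()
}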