@@ -34,6 +34,8 @@ trait CatalystConf {

def runSQLonFile: Boolean

def warehousePath: String

/**
* Returns the [[Resolver]] for the current configuration, which can be used to determine if two
* identifiers are equal.
@@ -52,5 +54,6 @@ case class SimpleCatalystConf(
optimizerMaxIterations: Int = 100,
optimizerInSetConversionThreshold: Int = 10,
maxCaseBranchesForCodegen: Int = 20,
runSQLonFile: Boolean = true)
runSQLonFile: Boolean = true,
warehousePath: String = "/user/hive/warehouse")
extends CatalystConf
@@ -125,7 +125,7 @@ class SessionCatalog(
}

def getDefaultDBPath(db: String): String = {
System.getProperty("java.io.tmpdir") + File.separator + db + ".db"
new Path(new Path(conf.warehousePath), db + ".db").toString
}

// ----------------------------------------------------------------------------
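For readers skimming the diff: a minimal standalone sketch of what the new getDefaultDBPath computes, using made-up values for the warehouse path and database name. Hadoop's Path(parent, child) constructor joins the two segments, so the default database directory now lands under the configured warehouse location instead of under java.io.tmpdir.

import org.apache.hadoop.fs.Path

// Hypothetical inputs, for illustration only.
val warehousePath = "/user/hive/warehouse"
val db = "sales"

// Same expression as getDefaultDBPath above; resolves to "/user/hive/warehouse/sales.db".
val defaultDbPath = new Path(new Path(warehousePath), db + ".db").toString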
@@ -52,6 +52,11 @@ object SQLConf {

}

val WAREHOUSE_PATH = SQLConfigBuilder("spark.sql.warehouse.dir")
.doc("The default location for managed databases and tables.")
.stringConf
.createWithDefault("${system:user.dir}/spark-warehouse")
Review comment (contributor):
@rxin I think the default for this should be something else. This will fail things like https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/58168/


val OPTIMIZER_MAX_ITERATIONS = SQLConfigBuilder("spark.sql.optimizer.maxIterations")
.internal()
.doc("The max number of iterations the optimizer and analyzer runs.")
@@ -645,6 +650,10 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {

def variableSubstituteDepth: Int = getConf(VARIABLE_SUBSTITUTE_DEPTH)

def warehousePath: String = {
getConf(WAREHOUSE_PATH).replace("${system:user.dir}", System.getProperty("user.dir"))
}

override def orderByOrdinal: Boolean = getConf(ORDER_BY_ORDINAL)

override def groupByOrdinal: Boolean = getConf(GROUP_BY_ORDINAL)
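As a side note, the `${system:user.dir}` placeholder in the default value is resolved by the warehousePath getter above via a plain string replacement. A small self-contained sketch of that substitution (the resolved directory is hypothetical):

// Sketch of the substitution performed by warehousePath; the raw default mirrors WAREHOUSE_PATH.
val raw = "${system:user.dir}/spark-warehouse"
val resolved = raw.replace("${system:user.dir}", System.getProperty("user.dir"))
// If the JVM was started from /home/alice, resolved == "/home/alice/spark-warehouse".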
@@ -17,11 +17,13 @@

package org.apache.spark.sql.internal

import java.io.File
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.internal.config.ConfigEntry
import org.apache.spark.sql._
@@ -65,9 +67,6 @@ private[sql] class SessionState(sparkSession: SparkSession) {
hadoopConf
}

// Automatically extract `spark.sql.*` entries and put it in our SQLConf
setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))

lazy val experimentalMethods = new ExperimentalMethods

/**
@@ -150,6 +149,12 @@ private[sql] class SessionState(sparkSession: SparkSession) {
new ContinuousQueryManager(sparkSession)
}

private val jarClassLoader: NonClosableMutableURLClassLoader =
sparkSession.sharedState.jarClassLoader

// Automatically extract `spark.sql.*` entries and put it in our SQLConf
// We need to call it after all of the vals have been initialized.
setConf(SQLContext.getSQLProperties(sparkSession.sparkContext.getConf))

// ------------------------------------------------------
// Helper methods, partially leftover from pre-2.0 days
@@ -180,6 +185,17 @@

def addJar(path: String): Unit = {
sparkSession.sparkContext.addJar(path)

val uri = new Path(path).toUri
val jarURL = if (uri.getScheme == null) {
// `path` is a local file path without a URL scheme
new File(path).toURI.toURL
} else {
// `path` is a URL with a scheme
uri.toURL
}
jarClassLoader.addURL(jarURL)
Thread.currentThread().setContextClassLoader(jarClassLoader)
}
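A quick illustration of the scheme check used in addJar above, with hypothetical jar locations: a bare filesystem path has no URI scheme and is routed through java.io.File, while a path that already carries a scheme is converted directly. Feeding the resulting URL to the session's shared jarClassLoader (and installing that loader as the thread context classloader) lets driver-side code resolve classes from jars added at runtime.

import java.io.File
import org.apache.hadoop.fs.Path

// Hypothetical inputs, for illustration only.
Seq("/tmp/udfs.jar", "file:///tmp/udfs.jar").foreach { p =>
  val uri = new Path(p).toUri
  val jarURL = if (uri.getScheme == null) new File(p).toURI.toURL else uri.toURL
  println(s"$p -> $jarURL")  // e.g. /tmp/udfs.jar -> file:/tmp/udfs.jar
}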

/**
@@ -22,6 +22,7 @@ import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.catalog.{ExternalCatalog, InMemoryCatalog}
import org.apache.spark.sql.execution.CacheManager
import org.apache.spark.sql.execution.ui.SQLListener
import org.apache.spark.util.MutableURLClassLoader


/**
@@ -44,4 +45,21 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
*/
lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog

/**
* A classloader used to load all user-added jars.
*/
val jarClassLoader = new NonClosableMutableURLClassLoader(
org.apache.spark.util.Utils.getContextOrSparkClassLoader)

}


/**
* URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader.
* This class loader cannot be closed (its `close` method is a no-op).
*/
private[sql] class NonClosableMutableURLClassLoader(parent: ClassLoader)
extends MutableURLClassLoader(Array.empty, parent) {

override def close(): Unit = {}
}
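A hedged usage sketch for the loader defined above (the jar path is made up): because close() is a no-op and the instance lives in SharedState, URLs added by any session stay resolvable for the lifetime of the application.

// Hypothetical usage; mirrors how SessionState.addJar feeds the shared loader.
val loader = new NonClosableMutableURLClassLoader(
  org.apache.spark.util.Utils.getContextOrSparkClassLoader)
loader.addURL(new java.io.File("/tmp/udfs.jar").toURI.toURL)
// close() is intentionally a no-op, so previously added jars remain loadable.
loader.close()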
@@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFor
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext

class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
@@ -83,91 +84,100 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
}

test("Create/Drop Database") {
val catalog = sqlContext.sessionState.catalog
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog

val databaseNames = Seq("db1", "`database`")
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)

sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
} finally {
catalog.reset()
databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)

sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
Map.empty))
sql(s"DROP DATABASE $dbName CASCADE")
assert(!catalog.databaseExists(dbNameWithoutBackTicks))
} finally {
catalog.reset()
}
}
}
}

test("Create Database - database already exists") {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
Map.empty))

val message = intercept[AnalysisException] {
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
sql(s"CREATE DATABASE $dbName")
}.getMessage
assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
} finally {
catalog.reset()
val db1 = catalog.getDatabaseMetadata(dbNameWithoutBackTicks)
assert(db1 == CatalogDatabase(
dbNameWithoutBackTicks,
"",
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db",
Map.empty))

val message = intercept[AnalysisException] {
sql(s"CREATE DATABASE $dbName")
}.getMessage
assert(message.contains(s"Database '$dbNameWithoutBackTicks' already exists."))
} finally {
catalog.reset()
}
}
}
}

test("Alter/Describe Database") {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location =
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
sql(s"CREATE DATABASE $dbName")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "") :: Nil)

sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)

sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")
withSQLConf(
SQLConf.WAREHOUSE_PATH.key -> (System.getProperty("java.io.tmpdir") + File.separator)) {
val catalog = sqlContext.sessionState.catalog
val databaseNames = Seq("db1", "`database`")

databaseNames.foreach { dbName =>
try {
val dbNameWithoutBackTicks = cleanIdentifier(dbName)
val location =
System.getProperty("java.io.tmpdir") + File.separator + s"$dbNameWithoutBackTicks.db"
sql(s"CREATE DATABASE $dbName")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
} finally {
catalog.reset()
checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "") :: Nil)

sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('a'='a', 'b'='b', 'c'='c')")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c))") :: Nil)

sql(s"ALTER DATABASE $dbName SET DBPROPERTIES ('d'='d')")

checkAnswer(
sql(s"DESCRIBE DATABASE EXTENDED $dbName"),
Row("Database Name", dbNameWithoutBackTicks) ::
Row("Description", "") ::
Row("Location", location) ::
Row("Properties", "((a,a), (b,b), (c,c), (d,d))") :: Nil)
} finally {
catalog.reset()
}
}
}
}
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hive.service.server;

import java.util.Properties;

import org.apache.commons.cli.*;

public class HiveServerServerOptionsProcessor {
private final Options options = new Options();
private org.apache.commons.cli.CommandLine commandLine;
private final String serverName;
private final StringBuilder debugMessage = new StringBuilder();

@SuppressWarnings("static-access")
public HiveServerServerOptionsProcessor(String serverName) {
this.serverName = serverName;
// -hiveconf x=y
options.addOption(OptionBuilder
.withValueSeparator()
.hasArgs(2)
.withArgName("property=value")
.withLongOpt("hiveconf")
.withDescription("Use value for given property")
.create());
// -deregister <versionNumber>
options.addOption(OptionBuilder
.hasArgs(1)
.withArgName("versionNumber")
.withLongOpt("deregister")
.withDescription("Deregister all instances of given version from dynamic service discovery")
.create());
options.addOption(new Option("H", "help", false, "Print help information"));
}

public HiveServer2.ServerOptionsProcessorResponse parse(String[] argv) {
try {
commandLine = new GnuParser().parse(options, argv);
// Process --hiveconf
// Get hiveconf param values and set the System property values
Properties confProps = commandLine.getOptionProperties("hiveconf");
for (String propKey : confProps.stringPropertyNames()) {
// save logging message for log4j output later, after log4j initializes properly
debugMessage.append("Setting " + propKey + "=" + confProps.getProperty(propKey) + ";\n");
// System.setProperty("hivecli." + propKey, confProps.getProperty(propKey));
System.setProperty(propKey, confProps.getProperty(propKey));
}

// Process --help
if (commandLine.hasOption('H')) {
return new HiveServer2.ServerOptionsProcessorResponse(
new HiveServer2.HelpOptionExecutor(serverName, options));
}

// Process --deregister
if (commandLine.hasOption("deregister")) {
return new HiveServer2.ServerOptionsProcessorResponse(
new HiveServer2.DeregisterOptionExecutor(
commandLine.getOptionValue("deregister")));
}
} catch (ParseException e) {
// Error out & exit - we were not able to parse the args successfully
System.err.println("Error starting HiveServer2 with given arguments: ");
System.err.println(e.getMessage());
System.exit(-1);
}
// Default executor, when no option is specified
return new HiveServer2.ServerOptionsProcessorResponse(new HiveServer2.StartOptionExecutor());
}
}
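For completeness, a rough sketch (in Scala, with invented argument values) of how this options processor might be exercised; parse() copies each --hiveconf pair into JVM system properties before returning the response that selects which executor to run.

// Hypothetical invocation; the port value is made up.
val processor =
  new org.apache.hive.service.server.HiveServerServerOptionsProcessor("HiveServer2")
val response = processor.parse(Array("--hiveconf", "hive.server2.thrift.port=10001"))
// After parse(), System.getProperty("hive.server2.thrift.port") == "10001".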