1 change: 1 addition & 0 deletions R/install-dev.sh
@@ -28,6 +28,7 @@

set -o pipefail
set -e
set -x

FWDIR="$(cd "`dirname "${BASH_SOURCE[0]}"`"; pwd)"
LIB_DIR="$FWDIR/lib"
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -464,7 +464,7 @@
</execution>
</executions>
<configuration>
<executable>..${file.separator}R${file.separator}install-dev${script.extension}</executable>
<executable>${project.basedir}${file.separator}..${file.separator}R${file.separator}install-dev${script.extension}</executable>
</configuration>
</plugin>
</plugins>
33 changes: 18 additions & 15 deletions core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -157,20 +157,25 @@ class HadoopRDD[K, V](
if (conf.isInstanceOf[JobConf]) {
logDebug("Re-using user-broadcasted JobConf")
conf.asInstanceOf[JobConf]
} else if (HadoopRDD.containsCachedMetadata(jobConfCacheKey)) {
logDebug("Re-using cached JobConf")
HadoopRDD.getCachedMetadata(jobConfCacheKey).asInstanceOf[JobConf]
} else {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
// local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
// Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Creating new JobConf and caching it for later re-use")
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
newJobConf
Option(HadoopRDD.getCachedMetadata(jobConfCacheKey))
.map { conf =>
logDebug("Re-using cached JobConf")
conf.asInstanceOf[JobConf]
}
.getOrElse {
// Create a JobConf that will be cached and used across this RDD's getJobConf() calls in
// the local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
// The caching helps minimize GC, since a JobConf can contain ~10KB of temporary
// objects. Synchronize to prevent ConcurrentModificationException (SPARK-1097,
// HADOOP-10456).
HadoopRDD.CONFIGURATION_INSTANTIATION_LOCK.synchronized {
logDebug("Creating new JobConf and caching it for later re-use")
val newJobConf = new JobConf(conf)
initLocalJobConfFuncOpt.foreach(f => f(newJobConf))
HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
newJobConf
}
}
}
}
@@ -360,8 +365,6 @@ private[spark] object HadoopRDD extends Logging {
*/
def getCachedMetadata(key: String): Any = SparkEnv.get.hadoopJobMetadata.get(key)

def containsCachedMetadata(key: String): Boolean = SparkEnv.get.hadoopJobMetadata.containsKey(key)

private def putCachedMetadata(key: String, value: Any): Unit =
SparkEnv.get.hadoopJobMetadata.put(key, value)

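Aside (not part of the diff): the refactor above replaces a containsCachedMetadata/getCachedMetadata pair with a single get wrapped in Option, so the metadata map is consulted once and an entry cannot disappear between the membership check and the read. Below is a minimal standalone sketch of that lookup pattern, assuming nothing from Spark; CachedLookupSketch, makeJobConf and the key string are illustrative stand-ins, not Spark APIs.

import java.util.concurrent.ConcurrentHashMap

object CachedLookupSketch {
  private val cache = new ConcurrentHashMap[String, AnyRef]()
  private val creationLock = new Object

  // Placeholder for `new JobConf(conf)` plus the local init-function hook.
  private def makeJobConf(key: String): AnyRef = new Object

  def getOrCreate(key: String): AnyRef = {
    // One lookup instead of containsKey(key) followed by get(key): the value we
    // observe is the value we return, so there is no window in which the entry
    // can be dropped between the two calls.
    Option(cache.get(key)).getOrElse {
      creationLock.synchronized {
        // Like the code above, this does not re-check the cache under the lock;
        // a racing thread may simply overwrite the entry with an equivalent value.
        val created = makeJobConf(key)
        cache.put(key, created)
        created
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val first = getOrCreate("job-conf-cache-key")
    val second = getOrCreate("job-conf-cache-key")
    assert(first eq second) // the second call re-uses the cached instance
    println("cached instance re-used")
  }
}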
@@ -462,13 +462,20 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
*/
override def refreshTable(tableName: String): Unit = {
val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName)
// Temp tables: refresh (or invalidate) any metadata/data cached in the plan recursively.
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)
val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdent)
val table = sparkSession.table(tableIdent)

if (tableMetadata.tableType == CatalogTableType.VIEW) {
// Temp or persistent views: refresh (or invalidate) any metadata/data cached
// in the plan recursively.
table.queryExecution.analyzed.foreach(_.refresh())
} else {
// Non-temp tables: refresh the metadata cache.
sessionCatalog.refreshTable(tableIdent)
}

// If this table is cached as an InMemoryRelation, drop the original
// cached version and make the new version cached lazily.
val table = sparkSession.table(tableIdent)
if (isCached(table)) {
// Uncache the logicalPlan.
sparkSession.sharedState.cacheManager.uncacheQuery(table, blocking = true)
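Aside (not part of the diff): a hedged usage sketch of what the new branch means for callers of Catalog.refreshTable. For a view, the analyzed plan is walked and each relation in it is refreshed; for a plain table, only the session catalog's metadata cache is invalidated. The SparkSession setup and the table/view names below are assumptions made for illustration.

import org.apache.spark.sql.SparkSession

object RefreshTableSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("refresh-table-sketch")
      .master("local[*]")
      .getOrCreate()

    spark.range(100).write.saveAsTable("base_table")
    spark.sql("CREATE TEMPORARY VIEW v AS SELECT * FROM base_table WHERE id > -1")

    // If files under base_table's location change on disk, refreshing the view
    // re-resolves the cached metadata/data in its analyzed plan ...
    spark.catalog.refreshTable("v")

    // ... whereas refreshing the table itself goes through the session catalog's
    // metadata-cache invalidation path.
    spark.catalog.refreshTable("base_table")

    spark.sql("SELECT count(*) FROM v").show()
    spark.stop()
  }
}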
@@ -193,12 +193,12 @@ case class RelationConversions(
private def convert(relation: HiveTableRelation): LogicalRelation = {
val serde = relation.tableMeta.storage.serde.getOrElse("").toLowerCase(Locale.ROOT)
if (serde.contains("parquet")) {
val options = Map(ParquetOptions.MERGE_SCHEMA ->
val options = relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
conf.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet")
} else {
val options = Map[String, String]()
val options = relation.tableMeta.storage.properties
sessionCatalog.metastoreCatalog
.convertToLogicalRelation(relation, options, classOf[OrcFileFormat], "orc")
}
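Aside (not part of the diff): the fix leans on Scala's Map concatenation semantics. Starting from the table's storage properties and adding the forced entry with `+` keeps every user-supplied option (compression, for instance) while the right-hand pair still wins on a key collision. A self-contained sketch with made-up property values; the literal "mergeSchema" key merely stands in for ParquetOptions.MERGE_SCHEMA.

object OptionMergeSketch {
  def main(args: Array[String]): Unit = {
    // Pretend these came from relation.tableMeta.storage.properties.
    val storageProperties = Map(
      "compression" -> "GZIP",  // user-supplied table property, previously dropped
      "mergeSchema" -> "true"   // will be overridden by the forced entry below
    )

    // `m + (k -> v)`: everything in m is carried over; the right-hand entry wins
    // when the key already exists.
    val options = storageProperties + ("mergeSchema" -> "false")

    assert(options("compression") == "GZIP")   // table property survives
    assert(options("mergeSchema") == "false")  // forced entry overrides
    println(options)
  }
}

The same reasoning applies to the ORC branch, where the previously empty Map meant every table property was discarded.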
@@ -31,14 +31,22 @@ import org.apache.spark.sql.test.SQLTestUtils
class HiveMetadataCacheSuite extends QueryTest with SQLTestUtils with TestHiveSingleton {

test("SPARK-16337 temporary view refresh") {
withTempView("view_refresh") {
checkRefreshView(isTemp = true)
}

test("view refresh") {
checkRefreshView(isTemp = false)
}

private def checkRefreshView(isTemp: Boolean) {
withView("view_refresh") {
withTable("view_table") {
// Create a Parquet directory
spark.range(start = 0, end = 100, step = 1, numPartitions = 3)
.write.saveAsTable("view_table")

// Read the table in
spark.table("view_table").filter("id > -1").createOrReplaceTempView("view_refresh")
val temp = if (isTemp) "TEMPORARY" else ""
spark.sql(s"CREATE $temp VIEW view_refresh AS SELECT * FROM view_table WHERE id > -1")
assert(sql("select count(*) from view_refresh").first().getLong(0) == 100)

// Delete a file using the Hadoop file system interface since the path returned by
@@ -21,6 +21,8 @@ import java.io.File
import java.net.URI

import org.apache.hadoop.fs.Path
import org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER
import org.apache.parquet.hadoop.ParquetFileReader
import org.scalatest.BeforeAndAfterEach

import org.apache.spark.SparkException
@@ -30,6 +32,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command.{DDLSuite, DDLUtils}
import org.apache.spark.sql.hive.HiveExternalCatalog
import org.apache.spark.sql.hive.HiveUtils.{CONVERT_METASTORE_ORC, CONVERT_METASTORE_PARQUET}
import org.apache.spark.sql.hive.orc.OrcFileOperator
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
@@ -1438,12 +1441,8 @@ class HiveDDLSuite
sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
// Check if this is compressed as ZLIB.
val maybeOrcFile = path.listFiles().find(!_.getName.endsWith(".crc"))
assert(maybeOrcFile.isDefined)
val orcFilePath = maybeOrcFile.get.toPath.toString
val expectedCompressionKind =
OrcFileOperator.getFileReader(orcFilePath).get.getCompression
assert("ZLIB" === expectedCompressionKind.name())
val maybeOrcFile = path.listFiles().find(_.getName.startsWith("part"))
assertCompression(maybeOrcFile, "orc", "ZLIB")

sql("CREATE TABLE t2 USING HIVE AS SELECT 1 AS c1, 'a' AS c2")
val table2 = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t2"))
@@ -1941,4 +1940,47 @@ class HiveDDLSuite
}
}
}

private def assertCompression(maybeFile: Option[File], format: String, compression: String) = {
assert(maybeFile.isDefined)

val actualCompression = format match {
case "orc" =>
OrcFileOperator.getFileReader(maybeFile.get.toPath.toString).get.getCompression.name

case "parquet" =>
val footer = ParquetFileReader.readFooter(
sparkContext.hadoopConfiguration, new Path(maybeFile.get.getPath), NO_FILTER)
footer.getBlocks.get(0).getColumns.get(0).getCodec.toString
}

assert(compression === actualCompression)
}

Seq(("orc", "ZLIB"), ("parquet", "GZIP")).foreach { case (fileFormat, compression) =>
test(s"SPARK-22158 convertMetastore should not ignore table property - $fileFormat") {
withSQLConf(CONVERT_METASTORE_ORC.key -> "true", CONVERT_METASTORE_PARQUET.key -> "true") {
withTable("t") {
withTempPath { path =>
sql(
s"""
|CREATE TABLE t(id int) USING hive
|OPTIONS(fileFormat '$fileFormat', compression '$compression')
|LOCATION '${path.toURI}'
""".stripMargin)
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier("t"))
assert(DDLUtils.isHiveTable(table))
assert(table.storage.serde.get.contains(fileFormat))
assert(table.storage.properties.get("compression") == Some(compression))
assert(spark.table("t").collect().isEmpty)

sql("INSERT INTO t SELECT 1")
checkAnswer(spark.table("t"), Row(1))
val maybeFile = path.listFiles().find(_.getName.startsWith("part"))
assertCompression(maybeFile, fileFormat, compression)
}
}
}
}
}
}
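Aside (not part of the diff): from the user's side, the RelationConversions change is what this test pins down. A table-level compression property is no longer replaced by the session default when the metastore table is converted to the native Parquet/ORC path. A hedged end-to-end sketch follows; the SparkSession setup is an assumption, it needs Hive support on the classpath, and convertMetastoreParquet is assumed to be at its default of true.

import org.apache.spark.sql.SparkSession

object CompressionPropertySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("compression-property-sketch")
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql(
      """
        |CREATE TABLE t(id INT) USING hive
        |OPTIONS(fileFormat 'parquet', compression 'GZIP')
      """.stripMargin)

    // With the fix, the converted relation carries the table's storage properties,
    // so the data files written here should come out GZIP-compressed instead of
    // falling back to the session's default codec.
    spark.sql("INSERT INTO t SELECT 1")
    spark.table("t").show()
    spark.stop()
  }
}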