Skip to content

Commit 778f3ca

Browse files
navisrxin
authored andcommitted
[SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
Just replaced mutable.HashMap to ConcurrentHashMap Author: navis.ryu <[email protected]> Closes apache#6699 from navis/SPARK-7792 and squashes the following commits: f03654a [navis.ryu] [SPARK-7792] [SQL] HiveContext registerTempTable not thread safe
1 parent 6e4fb0c commit 778f3ca

File tree

1 file changed

+17
-11
lines changed
  • sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis

1 file changed

+17
-11
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@
1717

1818
package org.apache.spark.sql.catalyst.analysis
1919

20+
import java.util.concurrent.ConcurrentHashMap
21+
22+
import scala.collection.JavaConversions._
2023
import scala.collection.mutable
24+
import scala.collection.mutable.ArrayBuffer
2125

2226
import org.apache.spark.sql.catalyst.CatalystConf
2327
import org.apache.spark.sql.catalyst.EmptyConf
@@ -81,18 +85,18 @@ trait Catalog {
8185
}
8286

8387
class SimpleCatalog(val conf: CatalystConf) extends Catalog {
84-
val tables = new mutable.HashMap[String, LogicalPlan]()
88+
val tables = new ConcurrentHashMap[String, LogicalPlan]
8589

8690
override def registerTable(
8791
tableIdentifier: Seq[String],
8892
plan: LogicalPlan): Unit = {
8993
val tableIdent = processTableIdentifier(tableIdentifier)
90-
tables += ((getDbTableName(tableIdent), plan))
94+
tables.put(getDbTableName(tableIdent), plan)
9195
}
9296

9397
override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
9498
val tableIdent = processTableIdentifier(tableIdentifier)
95-
tables -= getDbTableName(tableIdent)
99+
tables.remove(getDbTableName(tableIdent))
96100
}
97101

98102
override def unregisterAllTables(): Unit = {
@@ -101,18 +105,18 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
101105

102106
override def tableExists(tableIdentifier: Seq[String]): Boolean = {
103107
val tableIdent = processTableIdentifier(tableIdentifier)
104-
tables.get(getDbTableName(tableIdent)) match {
105-
case Some(_) => true
106-
case None => false
107-
}
108+
tables.containsKey(getDbTableName(tableIdent))
108109
}
109110

110111
override def lookupRelation(
111112
tableIdentifier: Seq[String],
112113
alias: Option[String] = None): LogicalPlan = {
113114
val tableIdent = processTableIdentifier(tableIdentifier)
114115
val tableFullName = getDbTableName(tableIdent)
115-
val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
116+
val table = tables.get(tableFullName)
117+
if (table == null) {
118+
sys.error(s"Table Not Found: $tableFullName")
119+
}
116120
val tableWithQualifiers = Subquery(tableIdent.last, table)
117121

118122
// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
@@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
121125
}
122126

123127
override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
124-
tables.map {
125-
case (name, _) => (name, true)
126-
}.toSeq
128+
val result = ArrayBuffer.empty[(String, Boolean)]
129+
for (name <- tables.keySet()) {
130+
result += ((name, true))
131+
}
132+
result
127133
}
128134

129135
override def refreshTable(databaseName: String, tableName: String): Unit = {

0 commit comments

Comments
 (0)