Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

package org.apache.spark.sql.catalyst.analysis

import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConversions._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.catalyst.CatalystConf
import org.apache.spark.sql.catalyst.EmptyConf
Expand Down Expand Up @@ -81,18 +85,18 @@ trait Catalog {
}

class SimpleCatalog(val conf: CatalystConf) extends Catalog {
val tables = new mutable.HashMap[String, LogicalPlan]()
val tables = new ConcurrentHashMap[String, LogicalPlan]

override def registerTable(
tableIdentifier: Seq[String],
plan: LogicalPlan): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables += ((getDbTableName(tableIdent), plan))
tables.put(getDbTableName(tableIdent), plan)
}

override def unregisterTable(tableIdentifier: Seq[String]): Unit = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables -= getDbTableName(tableIdent)
tables.remove(getDbTableName(tableIdent))
}

override def unregisterAllTables(): Unit = {
Expand All @@ -101,18 +105,18 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {

override def tableExists(tableIdentifier: Seq[String]): Boolean = {
val tableIdent = processTableIdentifier(tableIdentifier)
tables.get(getDbTableName(tableIdent)) match {
case Some(_) => true
case None => false
}
tables.containsKey(getDbTableName(tableIdent))
}

override def lookupRelation(
tableIdentifier: Seq[String],
alias: Option[String] = None): LogicalPlan = {
val tableIdent = processTableIdentifier(tableIdentifier)
val tableFullName = getDbTableName(tableIdent)
val table = tables.getOrElse(tableFullName, sys.error(s"Table Not Found: $tableFullName"))
val table = tables.get(tableFullName)
if (table == null) {
sys.error(s"Table Not Found: $tableFullName")
}
val tableWithQualifiers = Subquery(tableIdent.last, table)

// If an alias was specified by the lookup, wrap the plan in a subquery so that attributes are
Expand All @@ -121,9 +125,11 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog {
}

override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = {
tables.map {
case (name, _) => (name, true)
}.toSeq
val result = ArrayBuffer.empty[(String, Boolean)]
for (name <- tables.keySet()) {
result += ((name, true))
}
result
}

override def refreshTable(databaseName: String, tableName: String): Unit = {
Expand Down