Commit 09a0051

lianhuiwang authored and rxin committed
[SPARK-15335][SQL] Implement TRUNCATE TABLE Command
## What changes were proposed in this pull request?

Hive supports a TRUNCATE TABLE command (see the Hive language manual: https://cwiki.apache.org/confluence/display/Hive/LanguageManual+DDL and the related Hive JIRA: https://issues.apache.org/jira/browse/HIVE-446). This PR implements the same command for Spark SQL, excluding column-level truncation (HIVE-4005).

## How was this patch tested?

Added a test case.

Author: Lianhui Wang <[email protected]>

Closes apache#13170 from lianhuiwang/truncate.
1 parent d5e1c5a commit 09a0051

File tree

3 files changed: +151 -0 lines changed
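For orientation before the diffs, a minimal usage sketch of the new command. The table name `logs`, partition column `dt`, and the `spark` SparkSession are illustrative assumptions, not part of the patch:

// Hypothetical usage: truncate a whole table, then a single partition.
spark.sql("TRUNCATE TABLE logs")
spark.sql("TRUNCATE TABLE logs PARTITION (dt = '2016-05-19')")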

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala

Lines changed: 19 additions & 0 deletions
@@ -350,6 +350,25 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
     )
   }
 
+  /**
+   * Create a [[TruncateTable]] command.
+   *
+   * For example:
+   * {{{
+   *   TRUNCATE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
+   *   [COLUMNS (col1, col2)]
+   * }}}
+   */
+  override def visitTruncateTable(ctx: TruncateTableContext): LogicalPlan = withOrigin(ctx) {
+    if (ctx.identifierList != null) {
+      throw operationNotAllowed("TRUNCATE TABLE ... COLUMNS", ctx)
+    }
+    TruncateTable(
+      visitTableIdentifier(ctx.tableIdentifier),
+      Option(ctx.partitionSpec).map(visitNonOptionalPartitionSpec)
+    )
+  }
+
   /**
    * Convert a table property list into a key-value map.
    */
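As a sketch of what the builder above yields (illustrative, not part of the patch; TablePartitionSpec is Spark's Map[String, String] alias for partition specs):

import org.apache.spark.sql.catalyst.TableIdentifier

// "TRUNCATE TABLE t PARTITION (p = '1')" parses to roughly:
val plan = TruncateTable(TableIdentifier("t"), Some(Map("p" -> "1")))

// "TRUNCATE TABLE t COLUMNS (c)" instead fails in operationNotAllowed,
// since column-level truncation (HIVE-4005) is intentionally unsupported.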

sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala

Lines changed: 53 additions & 0 deletions
@@ -22,6 +22,9 @@ import java.net.URI
 import java.util.Date
 
 import scala.collection.mutable.ArrayBuffer
+import scala.util.control.NonFatal
+
+import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier

@@ -270,6 +273,56 @@ case class LoadData(
   }
 }
 
+/**
+ * A command to truncate a table.
+ *
+ * The syntax of this command is:
+ * {{{
+ *   TRUNCATE TABLE tablename [PARTITION (partcol1=val1, partcol2=val2 ...)]
+ * }}}
+ */
+case class TruncateTable(
+    tableName: TableIdentifier,
+    partitionSpec: Option[TablePartitionSpec]) extends RunnableCommand {
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    if (!catalog.tableExists(tableName)) {
+      logError(s"table '$tableName' in TRUNCATE TABLE does not exist.")
+    } else if (catalog.isTemporaryTable(tableName)) {
+      logError(s"table '$tableName' in TRUNCATE TABLE is a temporary table.")
+    } else {
+      val locations = if (partitionSpec.isDefined) {
+        catalog.listPartitions(tableName, partitionSpec).map(_.storage.locationUri)
+      } else {
+        val table = catalog.getTableMetadata(tableName)
+        if (table.partitionColumnNames.nonEmpty) {
+          catalog.listPartitions(tableName).map(_.storage.locationUri)
+        } else {
+          Seq(table.storage.locationUri)
+        }
+      }
+      val hadoopConf = sparkSession.sessionState.newHadoopConf()
+      locations.foreach { location =>
+        if (location.isDefined) {
+          val path = new Path(location.get)
+          try {
+            val fs = path.getFileSystem(hadoopConf)
+            fs.delete(path, true)
+            fs.mkdirs(path)
+          } catch {
+            case NonFatal(e) =>
+              throw new AnalysisException(
+                s"Failed to truncate table '$tableName' when removing data of the path: $path " +
+                  s"because of ${e.toString}")
+          }
+        }
+      }
+    }
+    Seq.empty[Row]
+  }
+}
+
 /**
  * Command that looks like
  * {{{
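The heart of run() above is a filesystem-level truncate: resolve each affected location, delete it recursively, and recreate the empty directory so the table or partition still has a valid location. A standalone sketch of that primitive; the helper name and signature are assumptions, not Spark API:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Hypothetical helper mirroring the delete-then-recreate step in run().
def truncateLocation(location: String, hadoopConf: Configuration): Unit = {
  val path = new Path(location)
  val fs = path.getFileSystem(hadoopConf)  // pick the FS from the path's scheme
  fs.delete(path, true)                    // recursively remove all data files
  fs.mkdirs(path)                          // recreate the now-empty directory
}

Recreating the directory matters: leaving the location missing would break later reads and writes that expect it to exist, even when empty.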

sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveCommandSuite.scala

Lines changed: 79 additions & 0 deletions
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive.execution
 
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row, SaveMode}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.test.SQLTestUtils
 

@@ -269,6 +270,84 @@ class HiveCommandSuite extends QueryTest with SQLTestUtils with TestHiveSingleton
     }
   }
 
+  test("Truncate Table") {
+    withTable("non_part_table", "part_table") {
+      sql(
+        """
+          |CREATE TABLE non_part_table (employeeID INT, employeeName STRING)
+          |ROW FORMAT DELIMITED
+          |FIELDS TERMINATED BY '|'
+          |LINES TERMINATED BY '\n'
+        """.stripMargin)
+
+      val testData = hiveContext.getHiveFile("data/files/employee.dat").getCanonicalPath
+
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE non_part_table""")
+      checkAnswer(
+        sql("SELECT * FROM non_part_table WHERE employeeID = 16"),
+        Row(16, "john") :: Nil)
+
+      val testResults = sql("SELECT * FROM non_part_table").collect()
+
+      intercept[ParseException] {
+        sql("TRUNCATE TABLE non_part_table COLUMNS (employeeID)")
+      }
+
+      sql("TRUNCATE TABLE non_part_table")
+      checkAnswer(sql("SELECT * FROM non_part_table"), Seq.empty[Row])
+
+      sql(
+        """
+          |CREATE TABLE part_table (employeeID INT, employeeName STRING)
+          |PARTITIONED BY (c STRING, d STRING)
+          |ROW FORMAT DELIMITED
+          |FIELDS TERMINATED BY '|'
+          |LINES TERMINATED BY '\n'
+        """.stripMargin)
+
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="1")""")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '1'"),
+        testResults)
+
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="1", d="2")""")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
+        testResults)
+
+      sql(s"""LOAD DATA LOCAL INPATH "$testData" INTO TABLE part_table PARTITION(c="2", d="2")""")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '2' AND d = '2'"),
+        testResults)
+
+      intercept[ParseException] {
+        sql("TRUNCATE TABLE part_table PARTITION(c='1', d='1') COLUMNS (employeeID)")
+      }
+
+      sql("TRUNCATE TABLE part_table PARTITION(c='1', d='1')")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '1'"),
+        Seq.empty[Row])
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1' AND d = '2'"),
+        testResults)
+
+      intercept[ParseException] {
+        sql("TRUNCATE TABLE part_table PARTITION(c='1') COLUMNS (employeeID)")
+      }
+
+      sql("TRUNCATE TABLE part_table PARTITION(c='1')")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table WHERE c = '1'"),
+        Seq.empty[Row])
+
+      sql("TRUNCATE TABLE part_table")
+      checkAnswer(
+        sql("SELECT employeeID, employeeName FROM part_table"),
+        Seq.empty[Row])
+    }
+  }
+
   test("show columns") {
     checkAnswer(
       sql("SHOW COLUMNS IN parquet_tab3"),
