From 3b44c5978bd44db986621d3e8511e9165b66926b Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Wed, 20 Apr 2016 11:06:30 -0700
Subject: [PATCH 1/3] adding testcase

---
 .../org/apache/spark/sql/DataFrameSuite.scala | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index e953a6e8ef0c2..009c101e746d5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1429,4 +1429,23 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       getMessage()
     assert(e1.startsWith("Path does not exist"))
   }
+
+  test("SPARK-12987: drop column ") {
+    val df = Seq((1, 2)).toDF("a_b", "a.c")
+    val df1 = df.drop("a_b")
+    checkAnswer(df1, Row(2))
+    assert(df1.schema.map(_.name) === Seq("a.c"))
+  }
+
+  test("SPARK-14759: drop column ") {
+    val df1 = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("any", "hour")
+    val df2 = sqlContext.createDataFrame(Seq((1, 3))).toDF("any").withColumn("hour", lit(10))
+    val j = df1.join(df2, $"df1.hour" === $"df2.hour", "left")
+    assert(j.schema.map(_.name) === Seq("any","hour","any","hour"))
+    print("Columns after join:{0}".format(j.columns))
+    val jj = j.drop($"df2.hour")
+    assert(jj.schema.map(_.name) === Seq("any"))
+    print("Columns after drop 'hour':{0}".format(jj.columns))
+  }
+
 }

From 6dd6ca9aedcad9b024cbe092b2ee7540c90c0136 Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Fri, 3 Jun 2016 16:33:12 -0700
Subject: [PATCH 2/3] fix7

---
 .../scala/org/apache/spark/SparkContext.scala     | 26 +++++++++++++++++++
 .../scala/org/apache/spark/rpc/RpcEnv.scala       |  8 ++++++
 .../spark/rpc/netty/NettyStreamManager.scala      |  5 ++++
 .../org/apache/spark/SparkContextSuite.scala      |  6 ++++-
 .../spark/sql/catalyst/parser/SqlBase.g4          |  2 +-
 .../spark/sql/execution/SparkSqlParser.scala      |  5 ++++
 .../sql/execution/command/resources.scala         | 10 +++++++
 .../org/apache/spark/sql/DataFrameSuite.scala     | 19 --------------
 .../sql/hive/thriftserver/CliSuite.scala          | 11 ++++++++
 .../sql/hive/execution/HiveQuerySuite.scala       |  5 ++++
 10 files changed, 76 insertions(+), 21 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5aba2a8c94691..e254b2f42016c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1440,6 +1440,32 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     postEnvironmentUpdate()
   }
 
+  /**
+   * Delete a file to be downloaded with this Spark job on every node.
+   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
+   * use `SparkFiles.get(fileName)` to find its download location.
+   *
+   */
+  def deleteFile(path: String): Unit = {
+    val uri = new URI(path)
+    val schemeCorrectedPath = uri.getScheme match {
+      case null | "local" => new File(path).getCanonicalFile.toURI.toString
+      case _ => path
+    }
+    val scheme = new URI(schemeCorrectedPath).getScheme
+    val fileName = new File(uri.getPath)
+    val key = if (!isLocal && scheme == "file") {
+      env.rpcEnv.fileServer.deleteFile(fileName.getName())
+    } else {
+      schemeCorrectedPath
+    }
+    addedFiles.remove(key)
+    val timestamp = System.currentTimeMillis
+    logInfo("Deleted file " + path + " at " + key + " with timestamp " + timestamp)
+    postEnvironmentUpdate()
+  }
+
   /**
    * :: DeveloperApi ::
    * Register a listener to receive up-calls from events that happen during execution.
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
index 56683771335a6..49d090659559c 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala
@@ -154,6 +154,14 @@ private[spark] trait RpcEnvFileServer {
    */
  def addFile(file: File): String
 
+  /**
+   * Removes a file previously served by this RpcEnv.
+   *
+   * @param file Name of the file to remove.
+   * @return The URI at which the file was served.
+   */
+  def deleteFile(file: String): String
+
   /**
    * Adds a jar to be served by this RpcEnv. Similar to `addFile` but for jars added using
    * `SparkContext.addJar`.
diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
index afcb023a99daa..d3a2bdd36cca2 100644
--- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyStreamManager.scala
@@ -71,6 +71,11 @@ private[netty] class NettyStreamManager(rpcEnv: NettyRpcEnv)
     s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file.getName())}"
   }
 
+  override def deleteFile(file: String): String = {
+    files.remove(file)
+    s"${rpcEnv.address.toSparkURL}/files/${Utils.encodeFileNameToURIRawPath(file)}"
+  }
+
   override def addJar(file: File): String = {
     require(jars.putIfAbsent(file.getName(), file) == null,
       s"JAR ${file.getName()} already registered.")
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index ae665138b98d0..c2a62faa6e183 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -108,7 +108,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
     assert(byteArray2.length === 0)
   }
 
-  test("basic case for addFile and listFiles") {
+  test("basic case for addFile, deleteFile and listFiles") {
     val dir = Utils.createTempDir()
 
     val file1 = File.createTempFile("someprefix1", "somesuffix1", dir)
@@ -157,6 +157,10 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext {
        x
      }).count()
      assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 1)
+      sc.deleteFile(file1.getAbsolutePath)
+      assert(sc.listFiles().filter(_.contains("somesuffix1")).size == 0)
+      sc.deleteFile(relativePath)
+      assert(sc.listFiles().filter(_.contains("somesuffix2")).size == 0)
     } finally {
       sc.stop()
     }
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index b0e71c7e7c7d1..74293ef521871 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -116,7 +116,7 @@ statement
     | LOAD DATA LOCAL? INPATH path=STRING OVERWRITE? INTO TABLE
         tableIdentifier partitionSpec?                                #loadData
     | TRUNCATE TABLE tableIdentifier partitionSpec?                   #truncateTable
-    | op=(ADD | LIST) identifier .*?                                  #manageResource
+    | op=(ADD | DELETE | LIST) identifier .*?                         #manageResource
     | SET ROLE .*?                                                    #failNativeCommand
     | SET .*?                                                         #setConfiguration
     | RESET                                                           #resetConfiguration
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 8ffc55668ae90..9cb87650ed043 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -819,6 +819,11 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
           case "jar" => AddJarCommand(mayebePaths)
           case other => throw operationNotAllowed(s"ADD with resource type '$other'", ctx)
         }
+      case SqlBaseParser.DELETE =>
+        ctx.identifier.getText.toLowerCase match {
+          case "file" => DeleteFileCommand(mayebePaths)
+          case other => throw operationNotAllowed(s"DELETE with resource type '$other'", ctx)
+        }
       case SqlBaseParser.LIST =>
         ctx.identifier.getText.toLowerCase match {
           case "files" | "file" =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
index 20b08946675d0..cfdd13306d5a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/resources.scala
@@ -99,3 +99,13 @@ case class ListJarsCommand(jars: Seq[String] = Seq.empty[String]) extends Runnab
     }
   }
 }
+
+/**
+ * Deletes a file from the current session.
+ */
+case class DeleteFileCommand(path: String) extends RunnableCommand {
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    sparkSession.sparkContext.deleteFile(path)
+    Seq.empty[Row]
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index b20d2f5f6b086..a02e48d849ebf 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1505,23 +1505,4 @@ class DataFrameSuite extends QueryTest with SharedSQLContext {
       getMessage()
     assert(e1.startsWith("Path does not exist"))
   }
-
-  test("SPARK-12987: drop column ") {
-    val df = Seq((1, 2)).toDF("a_b", "a.c")
-    val df1 = df.drop("a_b")
-    checkAnswer(df1, Row(2))
-    assert(df1.schema.map(_.name) === Seq("a.c"))
-  }
-
-  test("SPARK-14759: drop column ") {
-    val df1 = sqlContext.createDataFrame(Seq((1, 2), (3, 4))).toDF("any", "hour")
-    val df2 = sqlContext.createDataFrame(Seq((1, 3))).toDF("any").withColumn("hour", lit(10))
-    val j = df1.join(df2, $"df1.hour" === $"df2.hour", "left")
-    assert(j.schema.map(_.name) === Seq("any","hour","any","hour"))
-    print("Columns after join:{0}".format(j.columns))
-    val jj = j.drop($"df2.hour")
-    assert(jj.schema.map(_.name) === Seq("any"))
-    print("Columns after drop 'hour':{0}".format(jj.columns))
-  }
-
 }
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 75535cad1b18e..a2884ff8a8cea 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -272,4 +272,15 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
       s"LIST FILE $dataFilePath;" -> "small_kv.txt"
     )
   }
+
+  test("delete files") {
+    val dataFilePath = Thread.currentThread().
+      getContextClassLoader.getResource("data/files/small_kv.txt")
+    runCliWithin(2.minute)(
+      s"ADD FILE $dataFilePath;" -> "",
+      s"LIST FILES;" -> "small_kv.txt",
+      s"DELETE FILE $dataFilePath;" -> "",
+      s"LIST FILES;" -> ""
+    )
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index e0f6ccf04dd33..42f28558c2202 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -911,6 +911,11 @@ class HiveQuerySuite extends HiveComparisonTest with BeforeAndAfter {
     assert(sql("list file").
       filter(_.getString(0).contains("data/files/v1.txt")).count() > 0)
     assert(sql(s"list file $testFile").count() == 1)
+
+    sql(s"DELETE FILE $testFile")
+    assert(sql("list files").filter(_.getString(0).contains("data/files/v1.txt")).count() == 0)
+    assert(sql("list file").filter(_.getString(0).contains("data/files/v1.txt")).count() == 0)
+    assert(sql(s"list file $testFile").count() == 0)
   }
 
   createQueryTest("dynamic_partition",

From 876757092e3b8685b4e55236698c22f4d42f9780 Mon Sep 17 00:00:00 2001
From: Kevin Yu
Date: Mon, 6 Jun 2016 15:30:24 -0700
Subject: [PATCH 3/3] fix comments for deleteFile

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 7 ++-----
 1 file changed, 2 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index ab388b5879735..977ea1a0d855e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1441,11 +1441,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   }
 
   /**
-   * Delete a file to be downloaded with this Spark job on every node.
-   * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
-   * filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs,
-   * use `SparkFiles.get(fileName)` to find its download location.
-   *
+   * Delete a file based on the given path. The file should have been added by a prior
+   * call to the 'ADD FILE' command.
    */
   def deleteFile(path: String): Unit = {
     val uri = new URI(path)
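
For context, a minimal usage sketch of the API introduced above, assuming the three patches are applied to a Spark 2.0-era build. The object name, master setting, and temporary-file setup are illustrative only; `addFile`, `listFiles`, and the new `deleteFile` follow the calls exercised in SparkContextSuite.

import java.nio.file.Files

import org.apache.spark.{SparkConf, SparkContext}

object DeleteFileExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("delete-file-demo"))
    try {
      // Register a throwaway file with the job, as addFile already allows.
      val tmp = Files.createTempFile("demo", ".txt").toFile
      sc.addFile(tmp.getAbsolutePath)
      assert(sc.listFiles().exists(_.contains(tmp.getName)))

      // New in this series: unregister the file so it is no longer shipped to executors.
      sc.deleteFile(tmp.getAbsolutePath)
      assert(!sc.listFiles().exists(_.contains(tmp.getName)))
    } finally {
      sc.stop()
    }
  }
}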