From 5a42d832e1a523f2b9048ee580f3845689e1e620 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 16 Feb 2015 17:59:42 +0800 Subject: [PATCH 1/7] JSONRelation CTAS should check if delete is successful --- .../org/apache/spark/sql/json/JSONRelation.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 24848634de9cf..624e26720d3e1 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -66,9 +66,17 @@ private[sql] class DefaultSource mode match { case SaveMode.Append => sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") - case SaveMode.Overwrite => - fs.delete(filesystemPath, true) + case SaveMode.Overwrite => { + try { + fs.delete(filesystemPath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to CREATE a JSON table AS SELECT:\n${e.toString}") + } true + } case SaveMode.ErrorIfExists => sys.error(s"path $path already exists.") case SaveMode.Ignore => false From e4bc2290e3a0c5dc1fe0073993cbba1f56e7e050 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 16 Mar 2015 18:14:22 +0800 Subject: [PATCH 2/7] Update JSONRelation.scala --- .../apache/spark/sql/json/JSONRelation.scala | 22 +++++++------------ 1 file changed, 8 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 624e26720d3e1..9dd251ef4f79f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -67,13 +67,10 @@ private[sql] class DefaultSource case SaveMode.Append => 
sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") case SaveMode.Overwrite => { - try { - fs.delete(filesystemPath, true) - } catch { - case e: IOException => - throw new IOException( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to CREATE a JSON table AS SELECT:\n${e.toString}") + if (!fs.delete(filesystemPath, true)) { + sys.error( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to CREATE a JSON table AS SELECT") } true } @@ -117,13 +114,10 @@ private[sql] case class JSONRelation( val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) if (overwrite) { - try { - fs.delete(filesystemPath, true) - } catch { - case e: IOException => - throw new IOException( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") + if (!fs.delete(filesystemPath, true)) { + sys.error( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to CREATE a JSON table AS SELECT") } // Write the data. 
data.toJSON.saveAsTextFile(path) From 79f70407d78db3df93641f82c85df14ce00c2205 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 16 Mar 2015 19:04:10 +0800 Subject: [PATCH 3/7] Update JSONRelation.scala --- .../apache/spark/sql/json/JSONRelation.scala | 22 ++++++++++++------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 9dd251ef4f79f..b342b4ac3ae02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -67,10 +67,13 @@ private[sql] class DefaultSource case SaveMode.Append => sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") case SaveMode.Overwrite => { - if (!fs.delete(filesystemPath, true)) { - sys.error( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to CREATE a JSON table AS SELECT") + try { + fs.delete(filesystemPath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") } true } @@ -114,10 +117,13 @@ private[sql] case class JSONRelation( val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) if (overwrite) { - if (!fs.delete(filesystemPath, true)) { - sys.error( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to CREATE a JSON table AS SELECT") + try { + fs.delete(filesystemPath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") } // Write the data. 
data.toJSON.saveAsTextFile(path) From e2df8d5438dfe0b945bed76ec8ac8e632ac7a9a5 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Mon, 16 Mar 2015 19:12:21 +0800 Subject: [PATCH 4/7] check if path exists when writing --- .../main/scala/org/apache/spark/sql/json/JSONRelation.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index b342b4ac3ae02..eadac64a98c94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -86,6 +86,9 @@ private[sql] class DefaultSource } if (doSave) { // Only save data when the save mode is not ignore. + if (fs.exists(filesystemPath)) { + sys.error(s"Unable to INSERT OVERWRITE a JSON table, because table path $path exists.") + } data.toJSON.saveAsTextFile(path) } @@ -126,6 +129,9 @@ private[sql] case class JSONRelation( + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") } // Write the data. + if (fs.exists(filesystemPath)) { + sys.error(s"Unable to INSERT OVERWRITE a JSON table, because table path $path exists.") + } data.toJSON.saveAsTextFile(path) // Right now, we assume that the schema is not changed. We will not update the schema.
// schema = data.schema From 46f0d9d7e58cbc8f118e2d528d9241f8497bc73a Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Wed, 18 Mar 2015 15:05:19 +0800 Subject: [PATCH 5/7] Update JSONRelation.scala --- .../apache/spark/sql/json/JSONRelation.scala | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index eadac64a98c94..c086cb4615f94 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -68,7 +68,11 @@ private[sql] class DefaultSource sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") case SaveMode.Overwrite => { try { - fs.delete(filesystemPath, true) + if (!fs.delete(filesystemPath, true)) { + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to INSERT OVERWRITE a JSON table:\n") + } } catch { case e: IOException => throw new IOException( @@ -86,9 +90,6 @@ private[sql] class DefaultSource } if (doSave) { // Only save data when the save mode is not ignore. - if (fs.exists(filesystemPath)) { - sys.error(s"Unable to INSERT OVERWRITE a JSON table, because table path $path exists.") - } data.toJSON.saveAsTextFile(path) } @@ -121,7 +122,11 @@ private[sql] case class JSONRelation( if (overwrite) { try { - fs.delete(filesystemPath, true) + if (fs.exists(filesystemPath) && !fs.delete(filesystemPath, true)) { + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to INSERT OVERWRITE a JSON table:\n") + } } catch { case e: IOException => throw new IOException( @@ -129,9 +134,6 @@ private[sql] case class JSONRelation( + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") } // Write the data. 
- if (fs.exists(filesystemPath)) { - sys.error(s"Unable to INSERT OVERWRITE a JSON table, because table path $path exists.") - } data.toJSON.saveAsTextFile(path) // Right now, we assume that the schema is not changed. We will not update the schema. // schema = data.schema From 42d7fb6ec005d94aaece32883adcac269c739786 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 19 Mar 2015 15:37:13 +0800 Subject: [PATCH 6/7] add unittest & fix output format --- .../apache/spark/sql/json/JSONRelation.scala | 32 +++++++++++-------- .../sources/CreateTableAsSelectSuite.scala | 25 ++++++++++++++- 2 files changed, 42 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index c086cb4615f94..b5c6c0d3b79df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -67,18 +67,19 @@ private[sql] class DefaultSource case SaveMode.Append => sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") case SaveMode.Overwrite => { + var success: Boolean = false try { - if (!fs.delete(filesystemPath, true)) { - throw new IOException( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to INSERT OVERWRITE a JSON table:\n") - } + success = fs.delete(filesystemPath, true) } catch { case e: IOException => throw new IOException( s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") + + s" to writing to JSON table:\n${e.toString}") } + if (!success) + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to writing to JSON table.") true } case SaveMode.ErrorIfExists => @@ -121,17 +122,20 @@ private[sql] case class JSONRelation( val fs = 
filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) if (overwrite) { - try { - if (fs.exists(filesystemPath) && !fs.delete(filesystemPath, true)) { - throw new IOException( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to INSERT OVERWRITE a JSON table:\n") + if (fs.exists(filesystemPath)) { + var success: Boolean = false + try { + success = fs.delete(filesystemPath, true) + } catch { + case e: IOException => + throw new IOException( + s"Unable to clear output directory ${filesystemPath.toString} prior" + + s" to writing to JSON table:\n${e.toString}") } - } catch { - case e: IOException => + if (!success) throw new IOException( s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") + + s" to writing to JSON table.") } // Write the data. data.toJSON.saveAsTextFile(path) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 29caed9337ff6..b65fae31b761f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.sources -import java.io.File +import java.io.{IOException, File} import org.scalatest.BeforeAndAfterAll @@ -62,6 +62,29 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { dropTempTable("jsonTable") } + test("CREATE TEMPORARY TABLE AS SELECT based on the file without write permission") { + val childPath = new File(path.toString, "child") + path.mkdir() + childPath.createNewFile() + path.setWritable(false) + + val e = intercept[IOException] { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${path.toString}' + |) AS + |SELECT a, b 
FROM jt + """.stripMargin) + sql("SELECT a, b FROM jsonTable").collect() + } + assert(e.getMessage().contains("Unable to clear output directory")) + + path.setWritable(true) + } + test("create a table, drop it and create another one with the same name") { sql( s""" From c387fcef43ed45bc6469902216c57b9937ae5a1d Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Thu, 19 Mar 2015 15:42:40 +0800 Subject: [PATCH 7/7] fix typos --- .../main/scala/org/apache/spark/sql/json/JSONRelation.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index b5c6c0d3b79df..62c99d6559964 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -76,10 +76,11 @@ private[sql] class DefaultSource s"Unable to clear output directory ${filesystemPath.toString} prior" + s" to writing to JSON table:\n${e.toString}") } - if (!success) + if (!success) { throw new IOException( s"Unable to clear output directory ${filesystemPath.toString} prior" + s" to writing to JSON table.") + } true } case SaveMode.ErrorIfExists => @@ -132,10 +133,11 @@ private[sql] case class JSONRelation( s"Unable to clear output directory ${filesystemPath.toString} prior" + s" to writing to JSON table:\n${e.toString}") } - if (!success) + if (!success) { throw new IOException( s"Unable to clear output directory ${filesystemPath.toString} prior" + s" to writing to JSON table.") + } } // Write the data. data.toJSON.saveAsTextFile(path)