From 8683a483c074f692152159d63a101f78c3c3fe58 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 15 Feb 2015 01:37:05 +0800 Subject: [PATCH 1/8] JSON data source refactor initial draft --- .../apache/spark/sql/json/JSONRelation.scala | 44 ++++++++++++++----- .../sources/CreateTableAsSelectSuite.scala | 31 +++++++++++++ 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 24848634de9c..f06dc234439f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -67,7 +67,7 @@ private[sql] class DefaultSource case SaveMode.Append => sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") case SaveMode.Overwrite => - fs.delete(filesystemPath, true) + //fs.delete(filesystemPath, true) true case SaveMode.ErrorIfExists => sys.error(s"path $path already exists.") @@ -76,12 +76,18 @@ private[sql] class DefaultSource } else { true } - if (doSave) { + val relation = if (doSave) { // Only save data when the save mode is not ignore. - data.toJSON.saveAsTextFile(path) + //data.toJSON.saveAsTextFile(path) + val createdRelation = createRelation(sqlContext,parameters, data.schema) + createdRelation.asInstanceOf[JSONRelation].insert(data, true) + + createdRelation + } else { + createRelation(sqlContext, parameters, data.schema) } - createRelation(sqlContext, parameters, data.schema) + relation } } @@ -92,7 +98,8 @@ private[sql] case class JSONRelation( @transient val sqlContext: SQLContext) extends TableScan with InsertableRelation { // TODO: Support partitioned JSON relation. - private def baseRDD = sqlContext.sparkContext.textFile(path) + val filePath = new Path(path,"*").toUri.toString + private def baseRDD = sqlContext.sparkContext.textFile(filePath) override val schema = userSpecifiedSchema.getOrElse( JsonRDD.nullTypeToStringType( @@ -104,21 +111,38 @@ private[sql] case class JSONRelation( override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord) + private def isTemporaryFile(file: Path): Boolean = { + file.getName == "_temporary" + } + override def insert(data: DataFrame, overwrite: Boolean) = { + val filesystemPath = new Path(path) val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + // If the path exists, it must be a directory. + // Otherwise we create a directory with the path name. + if (fs.exists(filesystemPath) && !fs.getFileStatus(filesystemPath).isDirectory) { + sys.error("a CREATE [TEMPORARY] TABLE AS SELECT statement need the path must be directory") + } + if (overwrite) { + val temporaryPath = new Path(path, "_temporary") + val dataPath = new Path(path, "data") + // Write the data. + data.toJSON.saveAsTextFile(temporaryPath.toUri.toString) + val pathsToDelete = fs.listStatus(filesystemPath).filter( + f => !isTemporaryFile(f.getPath)).map(_.getPath) + try { - fs.delete(filesystemPath, true) + pathsToDelete.foreach(fs.delete(_,true)) } catch { case e: IOException => throw new IOException( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") + s"Unable to delete original data in directory ${filesystemPath.toString} when" + + s" run INSERT OVERWRITE a JSON table:\n${e.toString}") } - // Write the data. 
- data.toJSON.saveAsTextFile(path) + fs.rename(temporaryPath,dataPath) // Right now, we assume that the schema is not changed. We will not update the schema. // schema = data.schema } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 29caed9337ff..6e58d8155a35 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -29,10 +29,13 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { import caseInsensisitiveContext._ var path: File = null + var existPath: File = null override def beforeAll(): Unit = { path = util.getTempFilePath("jsonCTAS").getCanonicalFile + existPath = util.getTempFilePath("existJsonCTAS").getCanonicalFile val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + rdd.saveAsTextFile(existPath.toURI.toString) jsonRDD(rdd).registerTempTable("jt") } @@ -62,6 +65,34 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { dropTempTable("jsonTable") } + test("INSERT OVERWRITE with the source and destination point to the same table") { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable1 + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${existPath.toString}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable2 + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${existPath.toString}' + |) AS + |SELECT a, b FROM jsonTable1 + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable2"), + sql("SELECT a, b FROM jt").collect()) + + dropTempTable("jsonTable1") + dropTempTable("jsonTable2") + } + test("create a table, drop it and create another one with the same name") { sql( s""" From 0812dd1c269e2d2c57cce817a1c3ecee46e59c5b Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 15 Feb 2015 02:04:06 +0800 Subject: [PATCH 2/8] Remove useless annotation --- .../src/main/scala/org/apache/spark/sql/json/JSONRelation.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index f06dc234439f..a823640059d0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -67,7 +67,6 @@ private[sql] class DefaultSource case SaveMode.Append => sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") case SaveMode.Overwrite => - //fs.delete(filesystemPath, true) true case SaveMode.ErrorIfExists => sys.error(s"path $path already exists.") From 29e138a311208e75a6b94c109a0ba7df408181d1 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 15 Feb 2015 02:16:35 +0800 Subject: [PATCH 3/8] Remove useless annotation --- .../src/main/scala/org/apache/spark/sql/json/JSONRelation.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index a823640059d0..17169e110f39 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -77,7 +77,6 @@ private[sql] class DefaultSource } val relation = 
if (doSave) {
       // Only save data when the save mode is not ignore.
-      //data.toJSON.saveAsTextFile(path)
       val createdRelation = createRelation(sqlContext,parameters, data.schema)
       createdRelation.asInstanceOf[JSONRelation].insert(data, true)

From f80e2629bb74bc62960c61ff313f7e7802d61319 Mon Sep 17 00:00:00 2001
From: gasparms
Date: Sat, 14 Feb 2015 20:10:29 +0000
Subject: [PATCH 4/8] [SPARK-5800] Streaming Docs. Change linked files according to the selected language

Currently, after the updateStateByKey explanation, the Spark Streaming
Programming Guide links to the file stateful_network_wordcount.py and notes
"For the complete Scala code ..." no matter which language tab is selected.
This is inconsistent. I've changed the guide so that each tab links to its
pertinent example file. The JavaStatefulNetworkWordCount.java example did not
exist yet, so it is added in this commit.

Author: gasparms

Closes #4589 from gasparms/feature/streaming-guide and squashes the following commits:

7f37f89 [gasparms] More style changes
ec202b0 [gasparms] Follow spark style guide
f527328 [gasparms] Improve example to look like scala example
4d8785c [gasparms] Remove throw exception
e92e6b8 [gasparms] Fix incoherence
92db405 [gasparms] Fix Streaming Programming Guide. Change files according the selected language
---
 docs/streaming-programming-guide.md      |  21 +++-
 .../JavaStatefulNetworkWordCount.java     | 115 ++++++++++++++++++
 2 files changed, 132 insertions(+), 4 deletions(-)
 create mode 100644 examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java

diff --git a/docs/streaming-programming-guide.md b/docs/streaming-programming-guide.md
index 96fb12ce5e0b..997de9511ca3 100644
--- a/docs/streaming-programming-guide.md
+++ b/docs/streaming-programming-guide.md
@@ -878,6 +878,12 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi
 val runningCounts = pairs.updateStateByKey[Int](updateFunction _)
 {% endhighlight %}
 
+The update function will be called for each word, with `newValues` having a sequence of 1's (from
+the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
+Scala code, take a look at the example
+[StatefulNetworkWordCount.scala]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala).
+
@@ -899,6 +905,13 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi
 JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);
 {% endhighlight %}
 
+The update function will be called for each word, with `newValues` having a sequence of 1's (from
+the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete
+Java code, take a look at the example
+[JavaStatefulNetworkWordCount.java]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java).
+
@@ -916,14 +929,14 @@ This is applied on a DStream containing words (say, the `pairs` DStream containi runningCounts = pairs.updateStateByKey(updateFunction) {% endhighlight %} -
- - The update function will be called for each word, with `newValues` having a sequence of 1's (from the `(word, 1)` pairs) and the `runningCount` having the previous count. For the complete -Scala code, take a look at the example +Python code, take a look at the example [stateful_network_wordcount.py]({{site.SPARK_GITHUB_URL}}/blob/master/examples/src/main/python/streaming/stateful_network_wordcount.py). + + + Note that using `updateStateByKey` requires the checkpoint directory to be configured, which is discussed in detail in the [checkpointing](#checkpointing) section. diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java new file mode 100644 index 000000000000..09491fe30082 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.streaming; + +import java.util.Arrays; +import java.util.List; +import java.util.regex.Pattern; + +import scala.Tuple2; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; + +import org.apache.spark.HashPartitioner; +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaPairRDD; +import org.apache.spark.api.java.StorageLevels; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function2; +import org.apache.spark.api.java.function.PairFunction; +import org.apache.spark.streaming.Durations; +import org.apache.spark.streaming.api.java.JavaDStream; +import org.apache.spark.streaming.api.java.JavaPairDStream; +import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; +import org.apache.spark.streaming.api.java.JavaStreamingContext; + + +/** + * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every + * second starting with initial value of word count. + * Usage: JavaStatefulNetworkWordCount + * and describe the TCP server that Spark Streaming would connect to receive + * data. + *

+ * To run this on your local machine, you need to first run a Netcat server + * `$ nc -lk 9999` + * and then run the example + * `$ bin/run-example + * org.apache.spark.examples.streaming.JavaStatefulNetworkWordCount localhost 9999` + */ +public class JavaStatefulNetworkWordCount { + private static final Pattern SPACE = Pattern.compile(" "); + + public static void main(String[] args) { + if (args.length < 2) { + System.err.println("Usage: JavaStatefulNetworkWordCount "); + System.exit(1); + } + + StreamingExamples.setStreamingLogLevels(); + + // Update the cumulative count function + final Function2, Optional, Optional> updateFunction = new + Function2, Optional, Optional>() { + @Override + public Optional call(List values, Optional state) { + Integer newSum = state.or(0); + for (Integer value : values) { + newSum += value; + } + return Optional.of(newSum); + } + }; + + // Create the context with a 1 second batch size + SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); + JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1)); + ssc.checkpoint("."); + + // Initial RDD input to updateStateByKey + List> tuples = Arrays.asList(new Tuple2("hello", 1), + new Tuple2("world", 1)); + JavaPairRDD initialRDD = ssc.sc().parallelizePairs(tuples); + + JavaReceiverInputDStream lines = ssc.socketTextStream( + args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER_2); + + JavaDStream words = lines.flatMap(new FlatMapFunction() { + @Override + public Iterable call(String x) { + return Lists.newArrayList(SPACE.split(x)); + } + }); + + JavaPairDStream wordsDstream = words.mapToPair(new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }); + + // This will give a Dstream made of state (which is the cumulative count of the words) + JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, + new HashPartitioner(ssc.sc().defaultParallelism()), initialRDD); + + stateDstream.print(); + ssc.start(); + ssc.awaitTermination(); + } +} From 15a2ab5f89d56e67c84e7163d28d93e72583393c Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Sat, 14 Feb 2015 20:12:29 +0000 Subject: [PATCH 5/8] Revise formatting of previous commit f80e2629bb74bc62960c61ff313f7e7802d61319 --- .../JavaStatefulNetworkWordCount.java | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java index 09491fe30082..d46c7107c7a2 100644 --- a/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java +++ b/examples/src/main/java/org/apache/spark/examples/streaming/JavaStatefulNetworkWordCount.java @@ -39,7 +39,6 @@ import org.apache.spark.streaming.api.java.JavaReceiverInputDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; - /** * Counts words cumulatively in UTF8 encoded, '\n' delimited text received from the network every * second starting with initial value of word count. 
@@ -65,17 +64,17 @@ public static void main(String[] args) { StreamingExamples.setStreamingLogLevels(); // Update the cumulative count function - final Function2, Optional, Optional> updateFunction = new - Function2, Optional, Optional>() { - @Override - public Optional call(List values, Optional state) { - Integer newSum = state.or(0); - for (Integer value : values) { - newSum += value; - } - return Optional.of(newSum); - } - }; + final Function2, Optional, Optional> updateFunction = + new Function2, Optional, Optional>() { + @Override + public Optional call(List values, Optional state) { + Integer newSum = state.or(0); + for (Integer value : values) { + newSum += value; + } + return Optional.of(newSum); + } + }; // Create the context with a 1 second batch size SparkConf sparkConf = new SparkConf().setAppName("JavaStatefulNetworkWordCount"); @@ -97,12 +96,13 @@ public Iterable call(String x) { } }); - JavaPairDStream wordsDstream = words.mapToPair(new PairFunction() { - @Override - public Tuple2 call(String s) { - return new Tuple2(s, 1); - } - }); + JavaPairDStream wordsDstream = words.mapToPair( + new PairFunction() { + @Override + public Tuple2 call(String s) { + return new Tuple2(s, 1); + } + }); // This will give a Dstream made of state (which is the cumulative count of the words) JavaPairDStream stateDstream = wordsDstream.updateStateByKey(updateFunction, From ed5f4bb7cb2c934b818d1e8b8b4e6a0056119c80 Mon Sep 17 00:00:00 2001 From: gli Date: Sat, 14 Feb 2015 20:43:27 +0000 Subject: [PATCH 6/8] SPARK-5822 [BUILD] cannot import src/main/scala & src/test/scala into eclipse as source folder When import the whole project into eclipse as maven project, found that the src/main/scala & src/test/scala can not be set as source folder as default behavior, so add a "add-source" goal in scala-maven-plugin to let this work. Author: gli Closes #4531 from ligangty/addsource and squashes the following commits: 4e4db4c [gli] [IDE] cannot import src/main/scala & src/test/scala into eclipse as source folder --- pom.xml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/pom.xml b/pom.xml index 53372d5cfc62..6810d71be423 100644 --- a/pom.xml +++ b/pom.xml @@ -1083,6 +1083,12 @@ scala-maven-plugin 3.2.0 + + eclipse-add-source + + add-source + + scala-compile-first process-resources From d1d4ed13eda20655836bcadd116a2553ae121d58 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 15 Feb 2015 01:37:05 +0800 Subject: [PATCH 7/8] JSON data source refactor initial draft --- .../apache/spark/sql/json/JSONRelation.scala | 44 ++++++++++++++----- .../sources/CreateTableAsSelectSuite.scala | 31 +++++++++++++ 2 files changed, 65 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index 24848634de9c..f06dc234439f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -67,7 +67,7 @@ private[sql] class DefaultSource case SaveMode.Append => sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") case SaveMode.Overwrite => - fs.delete(filesystemPath, true) + //fs.delete(filesystemPath, true) true case SaveMode.ErrorIfExists => sys.error(s"path $path already exists.") @@ -76,12 +76,18 @@ private[sql] class DefaultSource } else { true } - if (doSave) { + val relation = if (doSave) { // Only save data when the save mode is not ignore. 
- data.toJSON.saveAsTextFile(path) + //data.toJSON.saveAsTextFile(path) + val createdRelation = createRelation(sqlContext,parameters, data.schema) + createdRelation.asInstanceOf[JSONRelation].insert(data, true) + + createdRelation + } else { + createRelation(sqlContext, parameters, data.schema) } - createRelation(sqlContext, parameters, data.schema) + relation } } @@ -92,7 +98,8 @@ private[sql] case class JSONRelation( @transient val sqlContext: SQLContext) extends TableScan with InsertableRelation { // TODO: Support partitioned JSON relation. - private def baseRDD = sqlContext.sparkContext.textFile(path) + val filePath = new Path(path,"*").toUri.toString + private def baseRDD = sqlContext.sparkContext.textFile(filePath) override val schema = userSpecifiedSchema.getOrElse( JsonRDD.nullTypeToStringType( @@ -104,21 +111,38 @@ private[sql] case class JSONRelation( override def buildScan() = JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord) + private def isTemporaryFile(file: Path): Boolean = { + file.getName == "_temporary" + } + override def insert(data: DataFrame, overwrite: Boolean) = { + val filesystemPath = new Path(path) val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + // If the path exists, it must be a directory. + // Otherwise we create a directory with the path name. + if (fs.exists(filesystemPath) && !fs.getFileStatus(filesystemPath).isDirectory) { + sys.error("a CREATE [TEMPORARY] TABLE AS SELECT statement need the path must be directory") + } + if (overwrite) { + val temporaryPath = new Path(path, "_temporary") + val dataPath = new Path(path, "data") + // Write the data. + data.toJSON.saveAsTextFile(temporaryPath.toUri.toString) + val pathsToDelete = fs.listStatus(filesystemPath).filter( + f => !isTemporaryFile(f.getPath)).map(_.getPath) + try { - fs.delete(filesystemPath, true) + pathsToDelete.foreach(fs.delete(_,true)) } catch { case e: IOException => throw new IOException( - s"Unable to clear output directory ${filesystemPath.toString} prior" - + s" to INSERT OVERWRITE a JSON table:\n${e.toString}") + s"Unable to delete original data in directory ${filesystemPath.toString} when" + + s" run INSERT OVERWRITE a JSON table:\n${e.toString}") } - // Write the data. - data.toJSON.saveAsTextFile(path) + fs.rename(temporaryPath,dataPath) // Right now, we assume that the schema is not changed. We will not update the schema. 
// schema = data.schema } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala index 29caed9337ff..6e58d8155a35 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/CreateTableAsSelectSuite.scala @@ -29,10 +29,13 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { import caseInsensisitiveContext._ var path: File = null + var existPath: File = null override def beforeAll(): Unit = { path = util.getTempFilePath("jsonCTAS").getCanonicalFile + existPath = util.getTempFilePath("existJsonCTAS").getCanonicalFile val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}""")) + rdd.saveAsTextFile(existPath.toURI.toString) jsonRDD(rdd).registerTempTable("jt") } @@ -62,6 +65,34 @@ class CreateTableAsSelectSuite extends DataSourceTest with BeforeAndAfterAll { dropTempTable("jsonTable") } + test("INSERT OVERWRITE with the source and destination point to the same table") { + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable1 + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${existPath.toString}' + |) + """.stripMargin) + + sql( + s""" + |CREATE TEMPORARY TABLE jsonTable2 + |USING org.apache.spark.sql.json.DefaultSource + |OPTIONS ( + | path '${existPath.toString}' + |) AS + |SELECT a, b FROM jsonTable1 + """.stripMargin) + + checkAnswer( + sql("SELECT a, b FROM jsonTable2"), + sql("SELECT a, b FROM jt").collect()) + + dropTempTable("jsonTable1") + dropTempTable("jsonTable2") + } + test("create a table, drop it and create another one with the same name") { sql( s""" From 41307cd743f5cf27e13b73389ae54d9c1d761718 Mon Sep 17 00:00:00 2001 From: Yanbo Liang Date: Sun, 15 Feb 2015 11:50:37 +0800 Subject: [PATCH 8/8] baseRDD based file should be considered separately for scan and insert --- .../apache/spark/sql/json/JSONRelation.scala | 165 ++++++++++++++++++ 1 file changed, 165 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala index e69de29bb2d1..e63042ccac7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.json + +import java.io.IOException + +import org.apache.hadoop.fs.Path + +import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext} +import org.apache.spark.sql.sources._ +import org.apache.spark.sql.types.StructType + + +private[sql] class DefaultSource + extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider { + + private def checkPath(parameters: Map[String, String]): String = { + parameters.getOrElse("path", sys.error("'path' must be specified for json data.")) + } + + /** Returns a new base relation with the parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String]): BaseRelation = { + val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) + val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) + + JSONRelation(path, samplingRatio, None)(sqlContext) + } + + /** Returns a new base relation with the given schema and parameters. */ + override def createRelation( + sqlContext: SQLContext, + parameters: Map[String, String], + schema: StructType): BaseRelation = { + val path = parameters.getOrElse("path", sys.error("Option 'path' not specified")) + val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0) + + JSONRelation(path, samplingRatio, Some(schema))(sqlContext) + } + + override def createRelation( + sqlContext: SQLContext, + mode: SaveMode, + parameters: Map[String, String], + data: DataFrame): BaseRelation = { + val path = checkPath(parameters) + val filesystemPath = new Path(path) + val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + val doSave = if (fs.exists(filesystemPath)) { + mode match { + case SaveMode.Append => + sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}") + case SaveMode.Overwrite => + //fs.delete(filesystemPath, true) + true + case SaveMode.ErrorIfExists => + sys.error(s"path $path already exists.") + case SaveMode.Ignore => false + } + } else { + true + } + val relation = if (doSave) { + // Only save data when the save mode is not ignore. + //data.toJSON.saveAsTextFile(path) + val createdRelation = createRelation(sqlContext,parameters, data.schema) + createdRelation.asInstanceOf[JSONRelation].insert(data, true) + + createdRelation + } else { + createRelation(sqlContext, parameters, data.schema) + } + + relation + } +} + +private[sql] case class JSONRelation( + path: String, + samplingRatio: Double, + userSpecifiedSchema: Option[StructType])( + @transient val sqlContext: SQLContext) + extends TableScan with InsertableRelation { + // TODO: Support partitioned JSON relation. + val filesystemPath = new Path(path) + val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration) + // TableScan can support base on ordinary file, but InsertableRelation only base on directory. 
+  val newPath = if (fs.exists(filesystemPath) && fs.getFileStatus(filesystemPath).isFile()) {
+    filesystemPath
+  } else {
+    new Path(filesystemPath.toUri.toString, "*")
+  }
+  private def baseRDD = sqlContext.sparkContext.textFile(newPath.toUri.toString)
+
+  override val schema = userSpecifiedSchema.getOrElse(
+    JsonRDD.nullTypeToStringType(
+      JsonRDD.inferSchema(
+        baseRDD,
+        samplingRatio,
+        sqlContext.conf.columnNameOfCorruptRecord)))
+
+  override def buildScan() =
+    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
+
+  private def isTemporaryFile(file: Path): Boolean = {
+    file.getName == "_temporary"
+  }
+
+  override def insert(data: DataFrame, overwrite: Boolean) = {
+
+    // If the path exists, it must be a directory; otherwise raise an error.
+    // If it does not exist, a directory with the path name will be created.
+    if (fs.exists(filesystemPath) && !fs.getFileStatus(filesystemPath).isDirectory) {
+      sys.error("the path of a CREATE [TEMPORARY] TABLE AS SELECT statement must be a directory")
+    }
+
+    if (overwrite) {
+      val temporaryPath = new Path(path, "_temporary")
+      val dataPath = new Path(path, "data")
+      // Stage the new data under _temporary before touching the existing files.
+      data.toJSON.saveAsTextFile(temporaryPath.toUri.toString)
+      val pathsToDelete = fs.listStatus(filesystemPath).filter(
+        f => !isTemporaryFile(f.getPath)).map(_.getPath)
+
+      try {
+        pathsToDelete.foreach(fs.delete(_, true))
+      } catch {
+        case e: IOException =>
+          throw new IOException(
+            s"Unable to delete original data in directory ${filesystemPath.toString} when"
+              + s" running INSERT OVERWRITE on a JSON table:\n${e.toString}")
+      }
+      fs.rename(temporaryPath, dataPath)
+      // Right now, we assume that the schema is not changed. We will not update the schema.
+      // schema = data.schema
+    } else {
+      // TODO: Support INSERT INTO
+      sys.error("JSON tables only support INSERT OVERWRITE for now.")
+    }
+  }
+
+  override def hashCode(): Int = 41 * (41 + path.hashCode) + schema.hashCode()
+
+  override def equals(other: Any): Boolean = other match {
+    case that: JSONRelation =>
+      (this.path == that.path) && (this.schema == that.schema)
+    case _ => false
+  }
+}
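
For context, the end-to-end behaviour these patches target (CTAS / INSERT OVERWRITE where the source and destination share one path, made safe by staging to _temporary and renaming it to data) can be sketched as a small driver program. This is only an illustrative sketch mirroring CreateTableAsSelectSuite above, not part of the patch: the object name, local master, and the /tmp/json_ctas_sketch scratch path are placeholders, and it assumes the 1.3-era SQLContext API used elsewhere in this series.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Illustrative driver only; object name and paths are placeholders.
object JsonCtasOverwriteSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("json-ctas-sketch"))
    val sqlContext = new SQLContext(sc)

    // Seed a JSON directory, mirroring the existPath setup in CreateTableAsSelectSuite.
    val path = "/tmp/json_ctas_sketch"
    sc.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str$i"}""")).saveAsTextFile(path)

    // Register the directory as a JSON table backed by the refactored JSONRelation.
    sqlContext.sql(
      s"""
         |CREATE TEMPORARY TABLE jsonTable
         |USING org.apache.spark.sql.json.DefaultSource
         |OPTIONS (path '$path')
       """.stripMargin)

    // CTAS over the same path: the new rows are first written to <path>/_temporary,
    // the old files are then deleted, and _temporary is renamed to <path>/data,
    // so the source table can be read while its directory is being overwritten.
    sqlContext.sql(
      s"""
         |CREATE TEMPORARY TABLE jsonTableCopy
         |USING org.apache.spark.sql.json.DefaultSource
         |OPTIONS (path '$path')
         |AS SELECT a, b FROM jsonTable
       """.stripMargin)

    sqlContext.sql("SELECT a, b FROM jsonTableCopy").collect().foreach(println)
    sc.stop()
  }
}

The second statement reads jsonTable and overwrites the same directory; because the write is staged, the scan of the old files completes during the save job before they are deleted and replaced by <path>/data.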