Skip to content

Commit 41307cd

Browse files
author
Yanbo Liang
committed
baseRDD based file should be considered separately for scan and insert
1 parent c46d08c commit 41307cd

File tree

1 file changed

+165
-0
lines changed

1 file changed

+165
-0
lines changed
Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,165 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.json
19+
20+
import java.io.IOException
21+
22+
import org.apache.hadoop.fs.Path
23+
24+
import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
25+
import org.apache.spark.sql.sources._
26+
import org.apache.spark.sql.types.StructType
27+
28+
29+
private[sql] class DefaultSource
30+
extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {
31+
32+
private def checkPath(parameters: Map[String, String]): String = {
33+
parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
34+
}
35+
36+
/** Returns a new base relation with the parameters. */
37+
override def createRelation(
38+
sqlContext: SQLContext,
39+
parameters: Map[String, String]): BaseRelation = {
40+
val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
41+
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
42+
43+
JSONRelation(path, samplingRatio, None)(sqlContext)
44+
}
45+
46+
/** Returns a new base relation with the given schema and parameters. */
47+
override def createRelation(
48+
sqlContext: SQLContext,
49+
parameters: Map[String, String],
50+
schema: StructType): BaseRelation = {
51+
val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
52+
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
53+
54+
JSONRelation(path, samplingRatio, Some(schema))(sqlContext)
55+
}
56+
57+
override def createRelation(
58+
sqlContext: SQLContext,
59+
mode: SaveMode,
60+
parameters: Map[String, String],
61+
data: DataFrame): BaseRelation = {
62+
val path = checkPath(parameters)
63+
val filesystemPath = new Path(path)
64+
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
65+
val doSave = if (fs.exists(filesystemPath)) {
66+
mode match {
67+
case SaveMode.Append =>
68+
sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
69+
case SaveMode.Overwrite =>
70+
//fs.delete(filesystemPath, true)
71+
true
72+
case SaveMode.ErrorIfExists =>
73+
sys.error(s"path $path already exists.")
74+
case SaveMode.Ignore => false
75+
}
76+
} else {
77+
true
78+
}
79+
val relation = if (doSave) {
80+
// Only save data when the save mode is not ignore.
81+
//data.toJSON.saveAsTextFile(path)
82+
val createdRelation = createRelation(sqlContext,parameters, data.schema)
83+
createdRelation.asInstanceOf[JSONRelation].insert(data, true)
84+
85+
createdRelation
86+
} else {
87+
createRelation(sqlContext, parameters, data.schema)
88+
}
89+
90+
relation
91+
}
92+
}
93+
94+
private[sql] case class JSONRelation(
95+
path: String,
96+
samplingRatio: Double,
97+
userSpecifiedSchema: Option[StructType])(
98+
@transient val sqlContext: SQLContext)
99+
extends TableScan with InsertableRelation {
100+
// TODO: Support partitioned JSON relation.
101+
val filesystemPath = new Path(path)
102+
val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
103+
// TableScan can support base on ordinary file, but InsertableRelation only base on directory.
104+
val newPath = if (fs.exists(filesystemPath) && fs.getFileStatus(filesystemPath).isFile()) {
105+
filesystemPath
106+
} else {
107+
new Path(filesystemPath.toUri.toString,"*")
108+
}
109+
private def baseRDD = sqlContext.sparkContext.textFile(newPath.toUri.toString)
110+
111+
override val schema = userSpecifiedSchema.getOrElse(
112+
JsonRDD.nullTypeToStringType(
113+
JsonRDD.inferSchema(
114+
baseRDD,
115+
samplingRatio,
116+
sqlContext.conf.columnNameOfCorruptRecord)))
117+
118+
override def buildScan() =
119+
JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)
120+
121+
private def isTemporaryFile(file: Path): Boolean = {
122+
file.getName == "_temporary"
123+
}
124+
125+
override def insert(data: DataFrame, overwrite: Boolean) = {
126+
127+
// If the path exists, it must be a directory, error for not.
128+
// Otherwise we create a directory with the path name.
129+
if (fs.exists(filesystemPath) && !fs.getFileStatus(filesystemPath).isDirectory) {
130+
sys.error("a CREATE [TEMPORARY] TABLE AS SELECT statement need the path must be directory")
131+
}
132+
133+
if (overwrite) {
134+
val temporaryPath = new Path(path, "_temporary")
135+
val dataPath = new Path(path, "data")
136+
// Write the data.
137+
data.toJSON.saveAsTextFile(temporaryPath.toUri.toString)
138+
val pathsToDelete = fs.listStatus(filesystemPath).filter(
139+
f => !isTemporaryFile(f.getPath)).map(_.getPath)
140+
141+
try {
142+
pathsToDelete.foreach(fs.delete(_,true))
143+
} catch {
144+
case e: IOException =>
145+
throw new IOException(
146+
s"Unable to delete original data in directory ${filesystemPath.toString} when"
147+
+ s" run INSERT OVERWRITE a JSON table:\n${e.toString}")
148+
}
149+
fs.rename(temporaryPath,dataPath)
150+
// Right now, we assume that the schema is not changed. We will not update the schema.
151+
// schema = data.schema
152+
} else {
153+
// TODO: Support INSERT INTO
154+
sys.error("JSON table only support INSERT OVERWRITE for now.")
155+
}
156+
}
157+
158+
override def hashCode(): Int = 41 * (41 + path.hashCode) + schema.hashCode()
159+
160+
override def equals(other: Any): Boolean = other match {
161+
case that: JSONRelation =>
162+
(this.path == that.path) && (this.schema == that.schema)
163+
case _ => false
164+
}
165+
}

0 commit comments

Comments
 (0)