Commit c46d08c (2 parents: d1d4ed1 + 29e138a)
Author: Yanbo Liang

Merge branch 'JSONDataSourceRefactor' of github.com:yanbohappy/spark into JSONDataSourceRefactor

Conflicts:
    sql/core/src/main/scala/org/apache/spark/sql/json/JSONRelation.scala

1 file changed: 0 additions, 161 deletions
@@ -1,161 +0,0 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.json

import java.io.IOException

import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{SaveMode, DataFrame, SQLContext}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types.StructType


private[sql] class DefaultSource
  extends RelationProvider with SchemaRelationProvider with CreatableRelationProvider {

  private def checkPath(parameters: Map[String, String]): String = {
    parameters.getOrElse("path", sys.error("'path' must be specified for json data."))
  }

  /** Returns a new base relation with the parameters. */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
    val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

    JSONRelation(path, samplingRatio, None)(sqlContext)
  }

  /** Returns a new base relation with the given schema and parameters. */
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String],
      schema: StructType): BaseRelation = {
    val path = parameters.getOrElse("path", sys.error("Option 'path' not specified"))
    val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)

    JSONRelation(path, samplingRatio, Some(schema))(sqlContext)
  }

  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val path = checkPath(parameters)
    val filesystemPath = new Path(path)
    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
    val doSave = if (fs.exists(filesystemPath)) {
      mode match {
        case SaveMode.Append =>
          sys.error(s"Append mode is not supported by ${this.getClass.getCanonicalName}")
        case SaveMode.Overwrite =>
          // fs.delete(filesystemPath, true)
          true
        case SaveMode.ErrorIfExists =>
          sys.error(s"path $path already exists.")
        case SaveMode.Ignore => false
      }
    } else {
      true
    }
    val relation = if (doSave) {
      // Only save data when the save mode is not ignore.
      // data.toJSON.saveAsTextFile(path)
      val createdRelation = createRelation(sqlContext, parameters, data.schema)
      createdRelation.asInstanceOf[JSONRelation].insert(data, true)

      createdRelation
    } else {
      createRelation(sqlContext, parameters, data.schema)
    }

    relation
  }
}

private[sql] case class JSONRelation(
    path: String,
    samplingRatio: Double,
    userSpecifiedSchema: Option[StructType])(
    @transient val sqlContext: SQLContext)
  extends TableScan with InsertableRelation {
  // TODO: Support partitioned JSON relation.
  val filePath = new Path(path, "*").toUri.toString
  private def baseRDD = sqlContext.sparkContext.textFile(filePath)

  override val schema = userSpecifiedSchema.getOrElse(
    JsonRDD.nullTypeToStringType(
      JsonRDD.inferSchema(
        baseRDD,
        samplingRatio,
        sqlContext.conf.columnNameOfCorruptRecord)))

  override def buildScan() =
    JsonRDD.jsonStringToRow(baseRDD, schema, sqlContext.conf.columnNameOfCorruptRecord)

  private def isTemporaryFile(file: Path): Boolean = {
    file.getName == "_temporary"
  }

  override def insert(data: DataFrame, overwrite: Boolean) = {
    val filesystemPath = new Path(path)
    val fs = filesystemPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)

    // If the path exists, it must be a directory.
    // Otherwise we create a directory with the path name.
    if (fs.exists(filesystemPath) && !fs.getFileStatus(filesystemPath).isDirectory) {
      sys.error("a CREATE [TEMPORARY] TABLE AS SELECT statement requires the path to be a directory")
    }

    if (overwrite) {
      val temporaryPath = new Path(path, "_temporary")
      val dataPath = new Path(path, "data")
      // Write the data.
      data.toJSON.saveAsTextFile(temporaryPath.toUri.toString)
      val pathsToDelete = fs.listStatus(filesystemPath).filter(
        f => !isTemporaryFile(f.getPath)).map(_.getPath)

      try {
        pathsToDelete.foreach(fs.delete(_, true))
      } catch {
        case e: IOException =>
          throw new IOException(
            s"Unable to delete original data in directory ${filesystemPath.toString} when"
            + s" running INSERT OVERWRITE on a JSON table:\n${e.toString}")
      }
      fs.rename(temporaryPath, dataPath)
      // Right now, we assume that the schema is not changed. We will not update the schema.
      // schema = data.schema
    } else {
      // TODO: Support INSERT INTO
      sys.error("JSON tables only support INSERT OVERWRITE for now.")
    }
  }

  override def hashCode(): Int = 41 * (41 + path.hashCode) + schema.hashCode()

  override def equals(other: Any): Boolean = other match {
    case that: JSONRelation =>
      (this.path == that.path) && (this.schema == that.schema)
    case _ => false
  }
}
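
For reference, the DefaultSource and JSONRelation deleted above were wired into Spark SQL through the external data sources API. The sketch below is a hypothetical usage example, assuming a Spark 1.3-era SQLContext running locally; the table name, paths, sampling ratio, and app name are illustrative placeholders, not values taken from this commit.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{SQLContext, SaveMode}

object JsonRelationUsageSketch {
  def main(args: Array[String]): Unit = {
    // Local Spark 1.x setup; master and app name are illustrative.
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("json-relation-sketch"))
    val sqlContext = new SQLContext(sc)

    // DDL path: USING org.apache.spark.sql.json resolves that package's
    // DefaultSource, and the OPTIONS map reaches the RelationProvider
    // createRelation overload ("path" is required, "samplingRatio" optional).
    sqlContext.sql(
      """CREATE TEMPORARY TABLE people
        |USING org.apache.spark.sql.json
        |OPTIONS (path '/tmp/people_json', samplingRatio '0.5')
      """.stripMargin)

    // Programmatic path: DataFrame.save with a SaveMode goes through the
    // CreatableRelationProvider overload, which for Overwrite ends up calling
    // JSONRelation.insert(data, overwrite = true) as shown above.
    val df = sqlContext.table("people")
    df.save("/tmp/people_json_copy", "org.apache.spark.sql.json", SaveMode.Overwrite)

    sc.stop()
  }
}

Note that a DataFrame written this way ends up under a data/ subdirectory of the target path, since the overwrite branch of insert first writes to _temporary and then renames it.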

0 commit comments