 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce._
-import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl
-import org.apache.parquet.hadoop.{ParquetOutputFormat, ParquetRecordWriter}
-import org.apache.parquet.hadoop.codec.CodecConfig
-import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.datasources.{OutputWriter, OutputWriterFactory}
-import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.SerializableConfiguration
-
-
-/**
- * A factory for generating OutputWriters for writing parquet files. This implementation is
- * different from [[ParquetOutputWriter]] in that it does not use any [[OutputCommitter]]. It
- * simply writes the data to the path used to generate the output writer. Callers of this factory
- * have to track which files are to be considered committed.
- */
-private[parquet] class ParquetOutputWriterFactory(
-    sqlConf: SQLConf,
-    dataSchema: StructType,
-    hadoopConf: Configuration,
-    options: Map[String, String])
-  extends OutputWriterFactory {
-
-  private val serializableConf: SerializableConfiguration = {
-    val job = Job.getInstance(hadoopConf)
-    val conf = ContextUtil.getConfiguration(job)
-    val parquetOptions = new ParquetOptions(options, sqlConf)
-
-    // We're not really using `ParquetOutputFormat[Row]` for writing data here, because we override
-    // it in `ParquetOutputWriter` to support appending and dynamic partitioning. The reason why
-    // we set it here is to set up the output committer class to `ParquetOutputCommitter`, which is
-    // bundled with `ParquetOutputFormat[Row]`.
-    job.setOutputFormatClass(classOf[ParquetOutputFormat[Row]])
-
-    ParquetOutputFormat.setWriteSupportClass(job, classOf[ParquetWriteSupport])
-
-    // Clear this temporary metadata so that it is not saved into the Parquet file.
-    // This metadata is only useful for detecting optional columns when pushing down filters.
-    val dataSchemaToWrite = StructType.removeMetadata(
-      StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
-    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
-
-    // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
-    // and `CatalystWriteSupport` (writing actual rows to Parquet files).
-    conf.set(
-      SQLConf.PARQUET_BINARY_AS_STRING.key,
-      sqlConf.isParquetBinaryAsString.toString)
-
-    conf.set(
-      SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-      sqlConf.isParquetINT96AsTimestamp.toString)
-
-    conf.set(
-      SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key,
-      sqlConf.writeLegacyParquetFormat.toString)
-
-    // Sets compression scheme
-    conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodecClassName)
-    new SerializableConfiguration(conf)
-  }
-
-  /**
-   * Returns an [[OutputWriter]] that writes data to the given path without using an
-   * [[OutputCommitter]].
-   */
-  override def newWriter(path: String): OutputWriter = new OutputWriter {
-
-    // Create TaskAttemptContext that is used to pass on Configuration to the ParquetRecordWriter
-    private val hadoopTaskAttemptId = new TaskAttemptID(new TaskID(new JobID, TaskType.MAP, 0), 0)
-    private val hadoopAttemptContext = new TaskAttemptContextImpl(
-      serializableConf.value, hadoopTaskAttemptId)
-
-    // Instance of ParquetRecordWriter that does not use OutputCommitter
-    private val recordWriter = createNoCommitterRecordWriter(path, hadoopAttemptContext)
-
-    override def write(row: Row): Unit = {
-      throw new UnsupportedOperationException("call writeInternal")
-    }
-
-    protected[sql] override def writeInternal(row: InternalRow): Unit = {
-      recordWriter.write(null, row)
-    }
-
-    override def close(): Unit = recordWriter.close(hadoopAttemptContext)
-  }
-
-  /** Creates a [[ParquetRecordWriter]] that writes to the given path without using an OutputCommitter. */
-  private def createNoCommitterRecordWriter(
-      path: String,
-      hadoopAttemptContext: TaskAttemptContext): RecordWriter[Void, InternalRow] = {
-    // Custom ParquetOutputFormat that disables the committer and writes to the given path
-    val outputFormat = new ParquetOutputFormat[InternalRow]() {
-      override def getOutputCommitter(c: TaskAttemptContext): OutputCommitter = { null }
-      override def getDefaultWorkFile(c: TaskAttemptContext, ext: String): Path = { new Path(path) }
-    }
-    outputFormat.getRecordWriter(hadoopAttemptContext)
-  }
-
-  /** Disable the use of the older API. */
-  override def newInstance(
-      path: String,
-      dataSchema: StructType,
-      context: TaskAttemptContext): OutputWriter = {
-    throw new UnsupportedOperationException("this version of newInstance not supported for " +
-      "ParquetOutputWriterFactory")
-  }
-
-  override def getFileExtension(context: TaskAttemptContext): String = {
-    CodecConfig.from(context).getCodec.getExtension + ".parquet"
-  }
-}
-
+import org.apache.spark.sql.execution.datasources.OutputWriter
 
 // NOTE: This class is instantiated and used on executor side only, no need to be serializable.
 private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
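
The hunk ends at the header of the surviving ParquetOutputWriter. Purely as a sketch (reconstructed from the imports kept above, not from anything shown in this diff, and assuming ParquetWriteSupport and the write schema have already been registered in the TaskAttemptContext's Configuration), the rest of such a writer typically looks like this:

// Sketch only: a plausible body for the surviving class, not part of this hunk.
private[parquet] class ParquetOutputWriter(path: String, context: TaskAttemptContext)
  extends OutputWriter {

  // Ask ParquetOutputFormat for a RecordWriter, but point its work file at the
  // exact path handed to this OutputWriter instead of a committer-managed location.
  private val recordWriter: RecordWriter[Void, InternalRow] = {
    new ParquetOutputFormat[InternalRow]() {
      override def getDefaultWorkFile(attemptContext: TaskAttemptContext, extension: String): Path = {
        new Path(path)
      }
    }.getRecordWriter(context)
  }

  override def write(row: Row): Unit =
    throw new UnsupportedOperationException("call writeInternal")

  override protected[sql] def writeInternal(row: InternalRow): Unit = recordWriter.write(null, row)

  override def close(): Unit = recordWriter.close(context)
}

The getDefaultWorkFile override is what lets the writer target the exact file name chosen by the caller rather than a work path derived from an output committer.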
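For reference, the contract of the removed ParquetOutputWriterFactory.newWriter(path) was that data went straight to the target path with no OutputCommitter, leaving the caller to track which files count as committed. A hypothetical caller is sketched below; writePartition is an illustrative name, not a Spark API, and calling writeInternal assumes the code lives under org.apache.spark.sql:

import org.apache.spark.sql.catalyst.InternalRow

// Hypothetical usage of the removed factory; not part of this patch.
def writePartition(
    factory: ParquetOutputWriterFactory,
    path: String,
    rows: Iterator[InternalRow]): Unit = {
  // Opens a ParquetRecordWriter directly on `path`: no _temporary directory,
  // no commit/abort protocol.
  val writer = factory.newWriter(path)
  try {
    rows.foreach(row => writer.writeInternal(row))  // write(Row) deliberately throws
  } finally {
    writer.close()
  }
  // Because no OutputCommitter ran, the caller itself must record `path`
  // (e.g. in a streaming sink's file log) to mark the file as committed.
}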