Closed

Commits (37)
b2e92b4
Test for reading json in UTF-16 with BOM
MaxGekk Feb 11, 2018
cb2f27b
Use user's charset or autodetect it if the charset is not specified
MaxGekk Feb 11, 2018
0d45fd3
Added a type and a comment for charset
MaxGekk Feb 13, 2018
1fb9b32
Replacing monadic chaining with pattern matching because it is more readable
MaxGekk Feb 13, 2018
c3b04ee
Keeping the old method for backward compatibility
MaxGekk Feb 13, 2018
93d3879
testFile is moved into the test to make it more local because it is used…
MaxGekk Feb 13, 2018
15798a1
Adding the charset as third parameter to the text method
MaxGekk Feb 13, 2018
cc05ce9
Removing whitespaces at the end of the line
MaxGekk Feb 13, 2018
74f2026
Fix the comment in javadoc style
MaxGekk Feb 13, 2018
4856b8e
Simplifying the UTF-16 test
MaxGekk Feb 13, 2018
084f41f
Add a hint to the exception on how to set the charset explicitly
MaxGekk Feb 15, 2018
31cd793
Fix for scala style checks
MaxGekk Feb 15, 2018
6eacd18
Run tests again
MaxGekk Feb 15, 2018
3b4a509
Improving the exception message
MaxGekk Feb 15, 2018
cd1124e
Appended the original message to the exception
MaxGekk Feb 15, 2018
ebf5390
Multi-line reading of json file in utf-32
MaxGekk Feb 17, 2018
c5b6a35
Autodetect charset of jsons in the multiline mode
MaxGekk Feb 17, 2018
ef5e6c6
Test for reading a json in UTF-16LE in the multiline mode by using us…
MaxGekk Feb 17, 2018
f9b6ad1
Fix test: rename the test file - utf32be -> utf32BE
MaxGekk Feb 18, 2018
3b7714c
Fix code style
MaxGekk Feb 18, 2018
edb9167
Appending the create verb to the method name for readability
MaxGekk Feb 18, 2018
5ba2881
Making createParser a separate private method
MaxGekk Feb 18, 2018
1509e10
Fix code style
MaxGekk Feb 18, 2018
e3184b3
Checks the charset option is supported
MaxGekk Feb 19, 2018
87d259c
Support charset as a parameter of the json method
MaxGekk Feb 19, 2018
76c1d08
Test for charset different from utf-8
MaxGekk Feb 19, 2018
88395b5
Description of the charset option of the json method
MaxGekk Feb 20, 2018
f2f8ae7
Minor changes in comments: added . at the end of a sentence
MaxGekk Feb 21, 2018
b451a03
Added a test for wrong charset name
MaxGekk Feb 21, 2018
c13c159
Testing that the charset is accepted in any letter case
MaxGekk Feb 21, 2018
1cb3ac0
Test: user specified wrong (but supported) charset
MaxGekk Feb 21, 2018
108e8e7
Set charset as an option
MaxGekk Feb 25, 2018
0d20cc6
Test: saving to json in UTF-32BE
MaxGekk Feb 23, 2018
54baf9f
Taking user's charset for saved json
MaxGekk Feb 23, 2018
1d50d94
Test: output charset is UTF-8 by default
MaxGekk Feb 23, 2018
bb53798
Changing the readJsonFiles method for readability
MaxGekk Mar 4, 2018
961b482
The test checks that json written by Spark can be read back
MaxGekk Mar 4, 2018
6 changes: 4 additions & 2 deletions python/pyspark/sql/readwriter.py
@@ -176,7 +176,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowComments=None, allowUnquotedFieldNames=None, allowSingleQuotes=None,
allowNumericLeadingZero=None, allowBackslashEscapingAnyCharacter=None,
mode=None, columnNameOfCorruptRecord=None, dateFormat=None, timestampFormat=None,
multiLine=None, allowUnquotedControlChars=None):
multiLine=None, allowUnquotedControlChars=None, charset=None):
Member:

Shall we use encoding to be consistent with CSV? charset had an alias encoding to look after Pandas and R.

"""
Loads JSON files and returns the results as a :class:`DataFrame`.

@@ -237,6 +237,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param allowUnquotedControlChars: allows JSON Strings to contain unquoted control
characters (ASCII characters with value less than 32,
including tab and line feed characters) or not.
:param charset: standard charset name, for example UTF-8, UTF-16 and UTF-32. If None is
set, the charset of input json will be detected automatically.
Member (@HyukjinKwon), Mar 17, 2018:

Can we have another test case with an encoding Jackson doesn't automatically detect too?


>>> df1 = spark.read.json('python/test_support/sql/people.json')
>>> df1.dtypes
@@ -254,7 +256,7 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
allowBackslashEscapingAnyCharacter=allowBackslashEscapingAnyCharacter,
mode=mode, columnNameOfCorruptRecord=columnNameOfCorruptRecord, dateFormat=dateFormat,
timestampFormat=timestampFormat, multiLine=multiLine,
allowUnquotedControlChars=allowUnquotedControlChars)
allowUnquotedControlChars=allowUnquotedControlChars, charset=charset)
if isinstance(path, basestring):
path = [path]
if type(path) == list:
7 changes: 7 additions & 0 deletions python/pyspark/sql/tests.py
@@ -654,6 +654,13 @@ def test_multiLine_json(self):
multiLine=True)
self.assertEqual(people1.collect(), people_array.collect())

def test_charset_json(self):
people_array = self.spark.read\
.json("python/test_support/sql/people_array_utf16le.json",
multiLine=True, charset="UTF-16LE")
expected = [Row(age=30, name=u'Andy'), Row(age=19, name=u'Justin')]
self.assertEqual(people_array.collect(), expected)

def test_multiline_csv(self):
ages_newlines = self.spark.read.csv(
"python/test_support/sql/ages_newlines.csv", multiLine=True)
Binary file added python/test_support/sql/people_array_utf16le.json
CreateJacksonParser.scala
@@ -39,11 +39,25 @@ private[sql] object CreateJacksonParser extends Serializable {
jsonFactory.createParser(new InputStreamReader(bain, "UTF-8"))
}

def text(jsonFactory: JsonFactory, record: Text): JsonParser = {
jsonFactory.createParser(record.getBytes, 0, record.getLength)
def text(jsonFactory: JsonFactory, record: Text, charset: Option[String] = None): JsonParser = {
charset match {
case Some(cs) =>
val bain = new ByteArrayInputStream(record.getBytes, 0, record.getLength)
jsonFactory.createParser(new InputStreamReader(bain, cs))
case _ =>
jsonFactory.createParser(record.getBytes, 0, record.getLength)
}
}

def inputStream(jsonFactory: JsonFactory, record: InputStream): JsonParser = {
jsonFactory.createParser(record)
def inputStream(
jsonFactory: JsonFactory,
is: InputStream,
charset: Option[String] = None): JsonParser = {
charset match {
case Some(cs) =>
jsonFactory.createParser(new InputStreamReader(is, cs))
case _ =>
jsonFactory.createParser(is)
}
}
}
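For reference, a minimal sketch of how the new charset-aware overload behaves (illustrative only, not part of the diff: CreateJacksonParser is an internal private[sql] helper, and the sample record below is made up):

import com.fasterxml.jackson.core.JsonFactory
import org.apache.hadoop.io.Text
import org.apache.spark.sql.catalyst.json.CreateJacksonParser

val factory = new JsonFactory()

// A Hadoop Text record whose raw bytes are UTF-16LE encoded JSON.
val record = new Text()
record.set("""{"name": "Andy", "age": 30}""".getBytes("UTF-16LE"))

// Explicit charset: the bytes are wrapped in an InputStreamReader with that charset.
val withCharset = CreateJacksonParser.text(factory, record, Some("UTF-16LE"))

// No charset: the raw bytes go straight to Jackson, which autodetects
// UTF-8/UTF-16/UTF-32 from the BOM or byte pattern.
val autodetected = CreateJacksonParser.text(factory, record)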
JSONOptions.scala
@@ -85,6 +85,12 @@ private[sql] class JSONOptions(

val multiLine = parameters.get("multiLine").map(_.toBoolean).getOrElse(false)

/**
* Standard charset name. For example UTF-8, UTF-16 and UTF-32.
* If charset is not specified (None), it will be detected automatically.
Member:

Does this detect the encoding for newlines too?

Member Author:

Do you mean the encoding of the records/lines delimiter? It depends on the mode. In multiline mode, Jackson is able to do that. In per-line mode, Hadoop's LineRecordReader could accept delimiters in any charset, but by default it splits the input by '\r', '\n', and '\r\n' in UTF-8. This will be fixed in separate PRs for the issues: https://issues.apache.org/jira/browse/SPARK-23724 and https://issues.apache.org/jira/browse/SPARK-23725

Member:

Shall we fix that first in the text datasource, since schema inference in JSON depends on the Text datasource? You are exposing an incomplete option now.

Member Author:

schema inference in JSON depends on the Text datasource

Could you clarify this, please? It is not completely clear to me what you mean.

Member:

See 8fb2a02. It uses the Text datasource to load lines when we infer the schema.

If we want to fix encodings for newlines, I believe the fix needs to go into the Text datasource first.

Member:

JSON's schema inference uses the text datasource to separate the lines before we go through the Jackson parser, where the charset for newlines should be respected. Shouldn't we rather fix the text datasource with Hadoop's line reader first?

Member Author:

A fix in the Hadoop line reader and this PR solve two different problems. Any fix in the Hadoop line reader will not fix the problem of wrong encoding detection. I don't understand why this PR must depend on a fix in the line reader. I would say a custom record separator would solve the newline problem too (https://issues.apache.org/jira/browse/SPARK-23724).

Shouldn't we rather fix the text datasource with Hadoop's line reader first?

Could you tell me how this PR blocks solving the problem in Hadoop's LineReader?

Member (@HyukjinKwon), Mar 18, 2018:

Could you tell me how this PR blocks solving the problem in Hadoop's LineReader?

Because the exposed charset option is incomplete here: the encodings are not respected in newlines. Also, I want to see how we can solve that problem in SPARK-23724 first too. I am actually not too worried about the changes proposed here as a whole for now.

Why don't we just fix that problem first if you plan to fix both eventually anyway?

Member Author:

I would like to introduce changes step-by-step to avoid creating a "patch bomb". Regarding SPARK-23724, I am going to propose this PR: MaxGekk#1 /cc @hvanhovell

Member:

Yea, step-by-step. I am not suggesting fixing everything here. I meant SPARK-23724 should be fixed first with the text datasource.

*/
val charset: Option[String] = parameters.get("charset")

/** Sets config options on a Jackson [[JsonFactory]]. */
def setJacksonOptions(factory: JsonFactory): Unit = {
factory.configure(JsonParser.Feature.ALLOW_COMMENTS, allowComments)
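To make the discussion above concrete, a usage sketch (assuming a SparkSession named spark and an illustrative file path; not taken from the diff):

// In multiline mode the charset option is handed directly to Jackson, so a
// UTF-16LE file parses correctly. In per-line mode, record splitting still
// assumes UTF-8 newlines until SPARK-23724/SPARK-23725 are addressed.
val people = spark.read
  .option("multiLine", true)
  .option("charset", "UTF-16LE")
  .json("/tmp/people_array_utf16le.json")

// Without the option, the charset of the input is detected automatically.
val detected = spark.read
  .option("multiLine", true)
  .json("/tmp/people_array_utf16le.json")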
JacksonParser.scala
@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.json

import java.io.ByteArrayOutputStream
import java.io.{ByteArrayOutputStream, CharConversionException}

import scala.collection.mutable.ArrayBuffer
import scala.util.Try
@@ -361,6 +361,15 @@ class JacksonParser(
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => None, e)
case e: CharConversionException if options.charset.isEmpty =>
val msg =
"""Failed to parse a character. Charset was detected automatically.
|You might want to set it explicitly via the charset option like:
| .option("charset", "UTF-8")
|Example of supported charsets:
| UTF-8, UTF-16, UTF-16BE, UTF-16LE, UTF-32, UTF-32BE, UTF-32LE
|""".stripMargin + e.getMessage
throw new CharConversionException(msg)
}
}
}
DataFrameReader.scala
@@ -366,6 +366,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* `java.text.SimpleDateFormat`. This applies to timestamp type.</li>
* <li>`multiLine` (default `false`): parse one record, which may span multiple lines,
* per file</li>
* <li>`charset` (by default it is not set): allows to forcibly set one of standard basic
* or extended charsets for input jsons. For example UTF-8, UTF-16BE, UTF-32. If the charset
* is not specified (by default), the charset is detected automatically.</li>
Member:

Should we document it on the write side too?

* </ul>
*
* @since 2.0.0
JsonDataSource.scala
@@ -122,8 +122,10 @@ object TextInputJsonDataSource extends JsonDataSource {
schema: StructType): Iterator[InternalRow] = {
val linesReader = new HadoopFileLinesReader(file, conf)
Option(TaskContext.get()).foreach(_.addTaskCompletionListener(_ => linesReader.close()))
val charset = parser.options.charset

val safeParser = new FailureSafeParser[Text](
input => parser.parse(input, CreateJacksonParser.text, textToUTF8String),
input => parser.parse[Text](input, CreateJacksonParser.text(_, _, charset), textToUTF8String),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)
@@ -146,7 +148,12 @@ object MultiLineJsonDataSource extends JsonDataSource {
parsedOptions: JSONOptions): StructType = {
val json: RDD[PortableDataStream] = createBaseRdd(sparkSession, inputPaths)
val sampled: RDD[PortableDataStream] = JsonUtils.sample(json, parsedOptions)
JsonInferSchema.infer(sampled, parsedOptions, createParser)

JsonInferSchema.infer[PortableDataStream](
sampled,
parsedOptions,
createParser(_, _, parsedOptions.charset)
)
}

private def createBaseRdd(
@@ -168,33 +175,43 @@ object MultiLineJsonDataSource extends JsonDataSource {
.values
}

private def createParser(jsonFactory: JsonFactory, record: PortableDataStream): JsonParser = {
private def createParser(
jsonFactory: JsonFactory,
record: PortableDataStream,
charset: Option[String] = None): JsonParser = {
val path = new Path(record.getPath())
CreateJacksonParser.inputStream(
jsonFactory,
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path))
CodecStreams.createInputStreamWithCloseResource(record.getConfiguration, path),
charset
)
}

override def readFile(
conf: Configuration,
file: PartitionedFile,
parser: JacksonParser,
schema: StructType): Iterator[InternalRow] = {
def createInputStream() = {
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
}
def partitionedFileString(ignored: Any): UTF8String = {
Utils.tryWithResource {
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath)))
} { inputStream =>
UTF8String.fromBytes(ByteStreams.toByteArray(inputStream))
Utils.tryWithResource(createInputStream()) { is =>
UTF8String.fromBytes(ByteStreams.toByteArray(is))
}
}
val charset = parser.options.charset

val safeParser = new FailureSafeParser[InputStream](
input => parser.parse(input, CreateJacksonParser.inputStream, partitionedFileString),
input => parser.parse[InputStream](
input,
CreateJacksonParser.inputStream(_, _, charset),
partitionedFileString
),
parser.options.parseMode,
schema,
parser.options.columnNameOfCorruptRecord)

safeParser.parse(
CodecStreams.createInputStreamWithCloseResource(conf, new Path(new URI(file.filePath))))
safeParser.parse(createInputStream())
}
}
JsonFileFormat.scala
@@ -17,6 +17,8 @@

package org.apache.spark.sql.execution.datasources.json

import java.nio.charset.{Charset, StandardCharsets}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.mapreduce.{Job, TaskAttemptContext}
@@ -151,7 +153,16 @@ private[json] class JsonOutputWriter(
context: TaskAttemptContext)
extends OutputWriter with Logging {

private val writer = CodecStreams.createOutputStreamWriter(context, new Path(path))
private val charset = options.charset match {
case Some(charsetName) => Charset.forName(charsetName)
case _ => StandardCharsets.UTF_8
}

private val writer = CodecStreams.createOutputStreamWriter(
context,
new Path(path),
charset
)

// create the Generator without separator inserted between 2 records
private[this] val gen = new JacksonGenerator(dataSchema, writer, options)
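A write-side sketch of the new behavior, addressing the earlier question about documenting the write side (assuming a SparkSession named spark with its implicits imported and an illustrative output path; not taken from the PR's tests):

import spark.implicits._

val df = Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age")

// JsonOutputWriter picks the charset from the option; the output files are
// written in UTF-32BE here, and stay UTF-8 when the option is not set.
df.write
  .option("charset", "UTF-32BE")
  .json("/tmp/people_json_utf32be")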
Binary file added sql/core/src/test/resources/json-tests/utf16LE.json
Binary file not shown.
Binary file not shown.
Binary file not shown.