Conversation

@NathanHowell NathanHowell commented Dec 23, 2016

What changes were proposed in this pull request?

If the new option wholeFile is set to true, the JSON reader will parse each file (instead of each line) as a single value. This is done with Jackson streaming, so it should be capable of parsing very large documents, assuming the resulting row fits in memory.
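
For illustration, a minimal usage sketch (the option name is as proposed here; paths are placeholders):

val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()

// With wholeFile enabled, each file under the path is parsed as one JSON value.
val wholeFileDF = spark.read
  .option("wholeFile", "true")
  .json("/path/to/json/files")

// Leaving the option unset (or false) keeps the existing line-per-record behaviour.
val lineDF = spark.read.json("/path/to/json-lines")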

Because the file is not buffered in memory, the corrupt record handling is also slightly different when wholeFile is enabled: the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure. It would be easy to extend this to add the parser location (line, column and byte offsets) to the output if desired.

These changes also allow types other than String to be parsed. Support for UTF8String and Text has been added (alongside String and InputFormat), and these no longer require a conversion to String just for parsing.

I've also included a few other changes that generate slightly better bytecode and (IMO) make it more obvious when and where boxing is occurring in the parser. These are included as separate commits; let me know if they should be flattened into this PR or moved to a new one.

How was this patch tested?

New and existing unit tests. No performance or load tests have been run.

@NathanHowell
Author

Hello recent JacksonGenerator.scala committers, please take a look.

cc/ @rxin @hvanhovell @clockfly @HyukjinKwon @cloud-fan

SparkQA commented Dec 23, 2016

Test build #70531 has finished for PR 16386 at commit 7ad5d5b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

we need to add this to the end; otherwise it breaks compatibility for positional arguments.

Member

HyukjinKwon commented Dec 23, 2016

the corrupt column will contain the filename instead of the literal JSON if there is a parsing failure

I am worried about changing the behaviour. I understand why it had to be done this way here, as described in the description, but we have the input_file_name function for this. Also, I would not expect, at least, to find file names in _corrupt_record.

If this is acceptable, we need to document it around spark.sql.columnNameOfCorruptRecord in SQLConf and around columnNameOfCorruptRecord in the Python and Scala readers/writers.

Member

It seems this changes the existing behaviour (not allowing an empty schema).

Author

Yes, it was a regression that caused a test failure.

Member

+1

Contributor

what does >: Null mean?

Author

It states that R must be a nullable type. This enables null: R to compile and is preferable to the runtime cast null.asInstanceOf[R] because it is verified at compile time.
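
For illustration, a minimal sketch (not the PR's code) of the lower bound in action:

// R >: Null means R must be a supertype of Null, i.e. a nullable reference type,
// so `null: R` type-checks instead of needing the runtime cast null.asInstanceOf[R].
def firstOrNull[R >: Null](values: Seq[R]): R =
  values.headOption.getOrElse(null: R)

firstOrNull(Seq("a", "b"))       // "a"
firstOrNull(Seq.empty[String])   // null
// firstOrNull[Int](Seq(1, 2))   // does not compile: Int does not conform to R >: Null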

Member

@HyukjinKwon HyukjinKwon Feb 9, 2017

Yes, I said +1 because it explicitly expresses that the type should be nullable, and I assumed (I did not check the bytecode myself, so I might be wrong) that it gives a hint to the compiler because Null is nullable (I remember googling and playing with some references for several days when I was investigating another null-related issue).

I am not confident enough to propose such changes in my own PRs, but I am confident enough to say +1, at least as an expression of support for this idea.

Member

I think I disagree with passing the whole SparkSession, because apparently we only need SQLConf or the value of spark.sql.columnNameOfCorruptRecord.

Author

I just removed the method entirely since all it did was fetch the value of columnNameOfCorruptRecord.

Member

srowen commented Dec 23, 2016

Why do we need this at all? Just use wholeTextFiles and parse the files as JSON.

@NathanHowell
Author

@srowen It is functionally the same as what you're suggesting. The question is how (or if) it should be first class in the DataFrameReader API. If we agree that it should be exposed, either via a new FileFormat or an option to JsonFileFormat, some abstraction is necessary to support reading from different RDD classes.

This PR just pushes that boundary a little further and lets the inference and parser code work over more types, not just String. This may make parsing more efficient in the line-oriented codepath by avoiding a conversion from Text and UTF8String (in JsonToStruct) to String, and it also lets us parse an InputStream without requiring all of the data to be in memory. For small files it's not likely to have a benefit (if the file is smaller than 4k it will be read entirely anyway), but as the file size increases this reduces the amount of memory required for parsing, is friendlier (in theory) on the GC and lets us consume files larger than 2GB.
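
To sketch the input-type point (illustrative only, not this PR's code): Jackson's JsonFactory can build a streaming parser directly from a String or an InputStream, so the whole-file path can stream from something like a PortableDataStream instead of materializing the file as a String first.

import java.io.InputStream
import com.fasterxml.jackson.core.{JsonFactory, JsonParser}

val factory = new JsonFactory()

// The line-oriented path parses an in-memory String; the whole-file path can hand
// Jackson an InputStream and let it stream through the document.
def parserFor(input: Either[String, InputStream]): JsonParser = input match {
  case Left(text)    => factory.createParser(text)
  case Right(stream) => factory.createParser(stream)
}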

SparkQA commented Dec 27, 2016

Test build #70644 has finished for PR 16386 at commit aa5a6db.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@NathanHowell
Author

@HyukjinKwon I agree that overloading the corrupt record column is undesirable and F.input_file_name is a better way to fetch the filename. It would be nice to extend this concept further and provide new functions (like F.json_exception) to retrieve exceptions and their locations, and this would work for the base case (parsing a string) as well as wholeFile. Plumbing this type of change through appears to require thread-local storage (unfortunately) but otherwise doesn't look too bad.

The question then is what to put in the corrupt record column, if one is defined, when in wholeFile mode. To retain consistency with the string paths we should really put the entire file in the column. This is problematic for large files (>2GB) since Spark SQL doesn't have blob support... so the allocations will fail (along with the task) and there is no way for the end user to work around this limitation. Functions like substr are applied to byte arrays and not file streams. Perhaps it's good enough to issue a warning (along the lines of "don't define a corrupt record column in wholeFile mode") and hope for the best?
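
For example (a sketch assuming the default _corrupt_record column name and the wholeFile option from this PR), the filename can already be recovered without overloading the corrupt record column:

import org.apache.spark.sql.functions.{col, input_file_name}

val df = spark.read
  .option("wholeFile", "true")
  .json("/path/to/json/files")

// Files whose records failed to parse, without storing the filename in _corrupt_record.
val corruptFiles = df
  .filter(col("_corrupt_record").isNotNull)
  .select(input_file_name().as("file"))
  .distinct()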

@NathanHowell
Author

The tests failed for an unrelated reason, looks to be running out of heap space in SBT somewhere.

@HyukjinKwon
Member

Regarding only the comment #16386 (comment), I have a similar (rather combined) idea: provide another option, such as an optional corrupt file name column (meaning the column appears only when the user explicitly asks for it, for backwards compatibility), do not add a column via columnNameOfCorruptRecord in wholeFile mode (with proper documentation), and issue a warning message if columnNameOfCorruptRecord is set by the user in wholeFile mode. This is a bit of a complicated idea that might confuse users, though. I am not sure it is the best idea.

Author

NathanHowell commented Dec 29, 2016

@HyukjinKwon I just pushed a change that makes the corrupt record handling consistent: if a corrupt record column is defined it will always get the JSON text for failed records. If wholeFile is enabled a warning is emitted.

I think more discussion is needed to figure out the best way to handle corrupt records and exceptions, perhaps it can be shelved for now and we can pick it up later under another issue?

SparkQA commented Dec 30, 2016

Test build #70730 has finished for PR 16386 at commit 9dc084d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@NathanHowell
Author

Jenkins, retest this please

SparkQA commented Jan 10, 2017

Test build #71147 has finished for PR 16386 at commit 9dc084d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@NathanHowell
Author

Any other comments?

Member

sameeragarwal commented Feb 1, 2017

cc @gatorsmile can you please take a look too?

Contributor

There is no since tag on the other methods of this class.

Contributor

can we just make lazy val conf not private?

Author

This is a public class so I thought adding a since tag would benefit the documentation. If it's not desired I can certainly remove it.

As for making the lazy val public vs private: I'm following the style used already in the class. There are public get methods for each private field. I'm not partial to either approach but prefer to keep it consistent.

Contributor

SGTM, can you take a look at the other public methods in this class and add since tags for them? Otherwise it looks weird that only one method has a since tag...

Author

Done, pushed in f71a465cf07fb9c043b2ccd86fa57e8e8ea9dc00

Contributor

Why do we need this?

Author

Previously the JSONOptions instance was always passed around with a columnNameOfCorruptRecord value. This just makes it a field in JSONOptions instead, to put all options in one place. Since it's a required option, it made more sense to use a field instead of making an entry in the CaseInsensitiveMap.
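
To illustrate the shape of the change (an illustrative sketch, not the actual Spark class): a required setting becomes a constructor field, while optional settings still come out of the options map.

// Illustrative only.
class JsonOptionsSketch(
    parameters: Map[String, String],
    val columnNameOfCorruptRecord: String) {   // required, so it is a plain field

  val wholeFile: Boolean = parameters.get("wholeFile").exists(_.toBoolean)
}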

@cloud-fan
Contributor

Can we focus on supporting multi-line JSON in this PR? We can leave the improvements to new PRs; otherwise this PR is kind of hard to review.

@gatorsmile
Member

Sorry, I missed the ping. Will review it tonight.

@cloud-fan
Contributor

Hi @NathanHowell , do you have time to work on it? thanks!

Contributor

will it be more consistent if we return ByteBuffer.wrap(getBytes) here?

Author

It will allocate an extra object but would simplify the calling code... since it would be a short-lived allocation it's probably fine to do this.

Contributor

I don't think this makes the code more readable...

Author

This is needed to satisfy the type checker. The other approach is to explicitly specify the type in two locations: Try[java.lang.Long](...).getOrElse[java.lang.Long](...). I found explicit boxing to be more readable than the alternative.
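
For example (with hypothetical stand-ins for the real parsing expressions), the two alternatives look like this:

import scala.util.Try

def primaryParse(s: String): Long = s.toLong
def fallbackParse(s: String): Long = s.trim.toLong

// Spelling out the boxed type in both positions:
def parsedA(s: String): java.lang.Long =
  Try[java.lang.Long](primaryParse(s)).getOrElse[java.lang.Long](fallbackParse(s))

// Boxing explicitly and letting inference handle the rest:
def parsedB(s: String): java.lang.Long =
  Try(java.lang.Long.valueOf(primaryParse(s)))
    .getOrElse(java.lang.Long.valueOf(fallbackParse(s)))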

Contributor

why call it createBaseRddConf instead of createBaseRdd?

Author

Habit from working with languages that don't support overloading; I'll change this.

Contributor

looks like you need withTempPath

Contributor

Can we write the JSON string literal to a text file? It's hard to understand what's going on here...

Author

sure

SparkQA commented Feb 16, 2017

Test build #72975 has finished for PR 16386 at commit 7296f7e.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

:param path: string represents path to the JSON dataset,
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
:param wholeFile: parse one record, which may span multiple lines, per file. If None is
Contributor

The parameter docs come in the same order as the parameter list; let's move the wholeFile doc to the end.

:param path: string represents path to the JSON dataset,
or RDD of Strings storing JSON objects.
:param schema: an optional :class:`pyspark.sql.types.StructType` for the input schema.
:param wholeFile: parse one record, which may span multiple lines, per file. If None is
Contributor

same here

*
* You can set the following JSON-specific options to deal with non-standard JSON files:
* <ul>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines,
Contributor

please move it to the end

val columnNameOfCorruptRecord =
parsedOptions.columnNameOfCorruptRecord
.getOrElse(sparkSession.sessionState.conf.columnNameOfCorruptRecord)
val parsedOptions = new JSONOptions(extraOptions.toMap,
Contributor

nit: the style should be

new XXX(
  para1,
  para2,
  para3)

* <ul>
* <li>`maxFilesPerTrigger` (default: no max limit): sets the maximum number of new files to be
* considered in every trigger.</li>
* <li>`wholeFile` (default `false`): parse one record, which may span multiple lines,
Contributor

same here

@cloud-fan
Contributor

LGTM if the tests pass. It would be good if you could also address #16386 (comment).


assert(jsonCopy.count === jsonDF.count)
val jsonCopySome = jsonCopy.selectExpr("string", "long", "boolean")
val jsonDFSome = jsonDF.selectExpr("string", "long", "boolean")
Member

Actually, it only covers three columns.

root
 |-- bigInteger: decimal(20,0) (nullable = true)
 |-- boolean: boolean (nullable = true)
 |-- double: double (nullable = true)
 |-- integer: long (nullable = true)
 |-- long: long (nullable = true)
 |-- null: string (nullable = true)
 |-- string: string (nullable = true)

root
 |-- bigInteger: decimal(20,0) (nullable = true)
 |-- boolean: boolean (nullable = true)
 |-- double: double (nullable = true)
 |-- integer: long (nullable = true)
 |-- long: long (nullable = true)
 |-- string: string (nullable = true)

@NathanHowell
Author

@cloud-fan When implementing tests for the other modes I've uncovered an existing bug in schema inference in DROPMALFORMED mode: https://issues.apache.org/jira/browse/SPARK-19641. Since it is not introduced in this set of patches I will open a new pull request once this one is merged. You can inspect the fix here: NathanHowell@e233fd0

SparkQA commented Feb 17, 2017

Test build #73029 has finished for PR 16386 at commit e323317.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 17, 2017

Test build #73030 has finished for PR 16386 at commit 58118f2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

SparkQA commented Feb 17, 2017

Test build #73032 has finished for PR 16386 at commit b801ab0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@cloud-fan cloud-fan left a comment

I left some more minor comments; please address them in your next PR.

def getPath(): String = path

@Since("2.2.0")
def getConfiguration: Configuration = conf
Contributor

nit: we should rename it to getConf, getConfiguration is too verbose.

logWarning(
s"""Enabling wholeFile mode and defining columnNameOfCorruptRecord may result
|in very large allocations or OutOfMemoryExceptions being raised.
|
Contributor

nit: unnecessary line

def parse[T](
record: T,
createParser: (JsonFactory, T) => JsonParser,
recordLiteral: T => UTF8String): Seq[InternalRow] = {
Contributor

nit: recordToString?

val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
.write
Contributor

shall we call .coalesce(1) to make sure we only write to a single file?

val path = dir.getCanonicalPath
primitiveFieldAndType
.toDF("value")
.write
Contributor

same here

sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
JsonDataSource(parsedOptions).infer(
sparkSession, files, parsedOptions)
Contributor

we can merge it into the previous line

classOf[PortableDataStream],
conf,
sparkSession.sparkContext.defaultMinPartitions)
.setName(s"JsonFile: $name")
Contributor

nit:

new XXX(
  ...,
  ...).setName....

classOf[TextInputFormat],
classOf[LongWritable],
classOf[Text])
.setName(s"JsonLines: $name")
Contributor

nit:

newAPIHadoopRDD(
  ...,
  ...).setName....

F.count($"dummy").as("valid"),
F.count($"_corrupt_record").as("corrupt"),
F.count("*").as("count"))
checkAnswer(counts, Row(1, 4, 6))
Contributor

Why is count(*) 6?

test("SPARK-18352: Handle multi-line corrupt documents (PERMISSIVE)") {
withTempPath { dir =>
val path = dir.getCanonicalPath
val corruptRecordCount = additionalCorruptRecords.count().toInt
Contributor

The name is misleading; we do have a good record in this dataset, don't we?

@cloud-fan
Contributor

thanks, merging to master!

@asfgit asfgit closed this in 21fde57 Feb 17, 2017
@NathanHowell NathanHowell deleted the SPARK-18352 branch February 17, 2017 05:23
Member

gatorsmile commented Jun 5, 2017

@NathanHowell It sounds like we can also provide multi-line support for JSON. For example, in a single JSON file:

{"a": 1,
"b": 1.1}
{"a": 2, "b": 1.1}
{"a": 3, "b": 1.1}

When using wholeFile mode, we only parse the first JSON record {"a": 1, "b": 1.1} and ignore the following records. It sounds like we should parse them too and rename wholeFile to multiLine?
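
For reference, a minimal sketch (not this PR's code) of how Jackson streaming can consume every top-level value in such a file instead of stopping after the first one:

import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

val parser = new JsonFactory().createParser(
  """{"a": 1,
    |"b": 1.1}
    |{"a": 2, "b": 1.1}
    |{"a": 3, "b": 1.1}""".stripMargin)

// nextToken() returns null at end of input; each START_OBJECT begins a new record.
var token = parser.nextToken()
var records = 0
while (token != null) {
  if (token == JsonToken.START_OBJECT) {
    parser.skipChildren()
    records += 1
  }
  token = parser.nextToken()
}
println(s"top-level records: $records") // 3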

Author

NathanHowell commented Jun 7, 2017 via email
