30 changes: 16 additions & 14 deletions python/pyspark/sql/readwriter.py
@@ -209,13 +209,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record, and puts the malformed string into a field configured by \
``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
schema. If a schema does not have the field, it drops corrupt records during \
parsing. When inferring a schema, it implicitly adds a \
``columnNameOfCorruptRecord`` field in an output schema.
* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
into a field configured by ``columnNameOfCorruptRecord``, and sets other \
fields to ``null``. To keep corrupt records, an user can set a string type \
field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
Member:
I think we should say it implicitly adds ... if a corrupted record is found while we are here? I think it only adds `columnNameOfCorruptRecord` when it meets a corrupted record during schema inference.

Member Author:
When users set a string type field named columnNameOfCorruptRecord in a user-defined schema, I think the field is still added even if there is no corrupted record. Or did I misread this sentence?

Member:
Ah I thought this:

When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` field in an output schema.

describes schema inference because it adds the columnNameOfCorruptRecord column only if a malformed record was found during schema inference. I mean:

scala> spark.read.json(Seq("""{"a": 1}""", """{"a":""").toDS).printSchema()
root
 |-- _corrupt_record: string (nullable = true)
 |-- a: long (nullable = true)


scala> spark.read.json(Seq("""{"a": 1}""").toDS).printSchema()
root
 |-- a: long (nullable = true)

but yes, I think I misread it. Here we are already describing things mainly about malformed records.

field in an output schema.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
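
A minimal PySpark sketch of the ``PERMISSIVE`` behaviour described in the json() docstring above, assuming a local SparkSession (the inline records mirror the reviewer's spark-shell example above; exact output can vary by Spark version):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType

spark = SparkSession.builder.getOrCreate()

# One well-formed and one malformed JSON record.
records = spark.sparkContext.parallelize(['{"a": 1}', '{"a":'])

# Schema inference: with a malformed record present, an implicit
# `_corrupt_record` column (the default `columnNameOfCorruptRecord`) appears.
spark.read.json(records).printSchema()

# To keep corrupt records under an explicit schema, declare the string field yourself.
schema = StructType([
    StructField("a", LongType(), True),
    StructField("_corrupt_record", StringType(), True),
])
(spark.read
    .schema(schema)
    .option("mode", "PERMISSIVE")
    .json(records)
    .show(truncate=False))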

@@ -393,13 +393,15 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record, and puts the malformed string into a field configured by \
``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
a string type field named ``columnNameOfCorruptRecord`` in an \
user-defined schema. If a schema does not have the field, it drops corrupt \
records during parsing. When a length of parsed CSV tokens is shorter than \
an expected length of a schema, it sets `null` for extra fields.
* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
into a field configured by ``columnNameOfCorruptRecord``, and sets other \
fields to ``null``. To keep corrupt records, an user can set a string type \
Contributor:
we can't say "and sets other fields to null", as it's not the case for CSV

Member Author:
I think this is talking about a corrupted record, not a record with less/more tokens. If the CSV parser fails to parse a record, all other fields are set to null.

Contributor:
ah, I think we need to explain that, for CSV a record with less/more tokens is not a malformed record.

Member Author:
Ok. Added.

field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
A record with less/more tokens than schema is not a corrupted record to CSV. \
When it meets a record having fewer tokens than the length of the schema, \
sets ``null`` to extra fields. When the record has more tokens than the \
length of the schema, it drops extra tokens.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.
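
Along the same lines, a small PySpark sketch of the CSV token handling described above (the column names and data are made up; reading an RDD of CSV lines with spark.read.csv is assumed to be available, as in Spark 2.2+):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
])

# Per the docstring above, records with fewer or more tokens than the schema
# are not corrupted records:
#   "alice,30"          -> too few tokens, `city` is set to null
#   "bob,25,NYC,extra"  -> too many tokens, the extra token is dropped
rows = spark.sparkContext.parallelize(["alice,30", "bob,25,NYC,extra"])
spark.read.schema(schema).option("mode", "PERMISSIVE").csv(rows).show()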

30 changes: 16 additions & 14 deletions python/pyspark/sql/streaming.py
@@ -442,13 +442,13 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record, and puts the malformed string into a field configured by \
``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
a string type field named ``columnNameOfCorruptRecord`` in an user-defined \
schema. If a schema does not have the field, it drops corrupt records during \
parsing. When inferring a schema, it implicitly adds a \
``columnNameOfCorruptRecord`` field in an output schema.
* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
into a field configured by ``columnNameOfCorruptRecord``, and sets other \
fields to ``null``. To keep corrupt records, an user can set a string type \
field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
field in an output schema.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.

@@ -621,13 +621,15 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
:param mode: allows a mode for dealing with corrupt records during parsing. If None is
set, it uses the default value, ``PERMISSIVE``.

* ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \
record, and puts the malformed string into a field configured by \
``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \
a string type field named ``columnNameOfCorruptRecord`` in an \
user-defined schema. If a schema does not have the field, it drops corrupt \
records during parsing. When a length of parsed CSV tokens is shorter than \
an expected length of a schema, it sets `null` for extra fields.
* ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
into a field configured by ``columnNameOfCorruptRecord``, and sets other \
fields to ``null``. To keep corrupt records, an user can set a string type \
field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
schema does not have the field, it drops corrupt records during parsing. \
A record with less/more tokens than schema is not a corrupted record to CSV. \
When it meets a record having fewer tokens than the length of the schema, \
sets ``null`` to extra fields. When the record has more tokens than the \
length of the schema, it drops extra tokens.
* ``DROPMALFORMED`` : ignores the whole corrupted records.
* ``FAILFAST`` : throws an exception when it meets corrupted records.

@@ -357,6 +357,9 @@ class JacksonParser(
}
} catch {
case e @ (_: RuntimeException | _: JsonProcessingException) =>
// JSON parser currently doesn't support partial results for corrupted records.
// For such records, all fields other than the field configured by
// `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => recordLiteral(record), () => None, e)
}
}
22 changes: 12 additions & 10 deletions sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -345,12 +345,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
* in an user-defined schema. If a schema does not have the field, it drops corrupt records
* during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
* field in an output schema.</li>
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
* keep corrupt records, an user can set a string type field named
* `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
* field, it drops corrupt records during parsing. When inferring a schema, it implicitly
* adds a `columnNameOfCorruptRecord` field in an output schema.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
@@ -550,12 +550,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
* in an user-defined schema. If a schema does not have the field, it drops corrupt records
* during parsing. When a length of parsed CSV tokens is shorter than an expected length
* of a schema, it sets `null` for extra fields.</li>
* during parsing. A record with less/more tokens than schema is not a corrupted record to
* CSV. When it meets a record having fewer tokens than the length of the schema, sets
* `null` to extra fields. When the record has more tokens than the length of the schema,
* it drops extra tokens.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
@@ -203,6 +203,8 @@ class UnivocityParser(
case _: BadRecordException => None
}
}
// For records with less or more tokens than the schema, tries to return partial results
// if possible.
throw BadRecordException(
() => getCurrentInput,
() => getPartialResult(),
Expand All @@ -218,6 +220,9 @@ class UnivocityParser(
row
} catch {
case NonFatal(e) =>
// For corrupted records with the number of tokens same as the schema,
// CSV reader doesn't support partial results. All fields other than the field
// configured by `columnNameOfCorruptRecord` are set to `null`.
throw BadRecordException(() => getCurrentInput, () => None, e)
}
}
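
To make the two branches above concrete, a hedged PySpark sketch (the column names and the failing value are made up; whether a partial result is produced in each case follows the comments in this hunk and may vary by Spark version):

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.getOrCreate()

schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("_corrupt_record", StringType(), True),
])

rows = spark.sparkContext.parallelize([
    "alice",            # fewer tokens than the data schema: a partial result is attempted,
                        # so the missing `age` is set to null
    "bob,not_a_number", # token count matches but `age` fails to convert: treated as a
                        # corrupted record, so the raw text goes into `_corrupt_record`
])
(spark.read
    .schema(schema)
    .option("mode", "PERMISSIVE")
    .option("columnNameOfCorruptRecord", "_corrupt_record")
    .csv(rows)
    .show(truncate=False))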
@@ -236,12 +236,12 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
* in an user-defined schema. If a schema does not have the field, it drops corrupt records
* during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
* field in an output schema.</li>
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
* keep corrupt records, an user can set a string type field named
* `columnNameOfCorruptRecord` in an user-defined schema. If a schema does not have the
* field, it drops corrupt records during parsing. When inferring a schema, it implicitly
* adds a `columnNameOfCorruptRecord` field in an output schema.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>
@@ -316,12 +316,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
* <li>`mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records
* during parsing. It supports the following case-insensitive modes.
* <ul>
* <li>`PERMISSIVE` : sets other fields to `null` when it meets a corrupted record, and puts
* the malformed string into a field configured by `columnNameOfCorruptRecord`. To keep
* <li>`PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
* field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
* corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
* in an user-defined schema. If a schema does not have the field, it drops corrupt records
* during parsing. When a length of parsed CSV tokens is shorter than an expected length
* of a schema, it sets `null` for extra fields.</li>
* during parsing. A record with less/more tokens than schema is not a corrupted record to
* CSV. When it meets a record having fewer tokens than the length of the schema, sets
* `null` to extra fields. When the record has more tokens than the length of the schema,
* it drops extra tokens.</li>
* <li>`DROPMALFORMED` : ignores the whole corrupted records.</li>
* <li>`FAILFAST` : throws an exception when it meets corrupted records.</li>
* </ul>