From 4ad330b1def558e17dfb693d428e1bd69248e5a3 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 24 Feb 2018 07:15:11 +0000 Subject: [PATCH 1/6] Clarify JSON and CSV parser behavior. --- .../sql/catalyst/json/JacksonParser.scala | 3 +++ .../org/apache/spark/sql/DataFrameReader.scala | 18 +++++++++++------- .../datasources/csv/UnivocityParser.scala | 5 +++++ 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala index bd144c9575c72..7f6956994f31f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala @@ -357,6 +357,9 @@ class JacksonParser( } } catch { case e @ (_: RuntimeException | _: JsonProcessingException) => + // JSON parser currently doesn't support partial results for corrupted records. + // For such records, all fields other than the field configured by + // `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => recordLiteral(record), () => None, e) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 4274f120a375a..24db8dc3f30af 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -345,12 +345,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing. * @@ -550,12 +552,14 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing. It supports the following case-insensitive modes. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 7d6d7e7eef926..3d6cc30f2ba83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -203,6 +203,8 @@ class UnivocityParser( case _: BadRecordException => None } } + // For records with less or more tokens than the schema, tries to return partial results + // if possible. throw BadRecordException( () => getCurrentInput, () => getPartialResult(), @@ -218,6 +220,9 @@ class UnivocityParser( row } catch { case NonFatal(e) => + // For corrupted records with the number of tokens same as the schema, + // CSV reader doesn't support partial results. All fields other than the field + // configured by `columnNameOfCorruptRecord` are set to `null`. throw BadRecordException(() => getCurrentInput, () => None, e) } } From 4400cf2eb4d3b1b37c9e299e91db6e4a032e0c3a Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Sat, 24 Feb 2018 11:08:45 +0000 Subject: [PATCH 2/6] Update other related documents. --- python/pyspark/sql/readwriter.py | 33 +++++++++++-------- python/pyspark/sql/streaming.py | 33 +++++++++++-------- .../sql/streaming/DataStreamReader.scala | 18 ++++++---- 3 files changed, 49 insertions(+), 35 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 49af1bcee5ef8..242455d29052f 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -209,13 +209,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ - schema. If a schema does not have the field, it drops corrupt records during \ - parsing. When inferring a schema, it implicitly adds a \ - ``columnNameOfCorruptRecord`` field in an output schema. + * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ + field in an output schema. It doesn't support partial results. Even just one \ + field can't be correctly parsed, all fields except for the field of \ + ``columnNameOfCorruptRecord`` will be set to ``null``. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. 
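To make the PERMISSIVE semantics described above concrete, here is a minimal Scala sketch (not part of the patch; the session setup, field names, and sample records are illustrative assumptions):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    // One well-formed record and one malformed (unterminated) record.
    val jsonData = Seq("""{"a": 1, "b": "x"}""", """{"a": 1, "b""").toDS()

    // Declaring the corrupt-record column in the schema keeps corrupt records.
    val schema = new StructType()
      .add("a", IntegerType)
      .add("b", StringType)
      .add("_corrupt_record", StringType)

    val df = spark.read
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .schema(schema)
      .json(jsonData)

    // For the malformed row, `a` and `b` are null and `_corrupt_record`
    // holds the original input text, as the documentation above states.
    df.show(truncate = false)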
@@ -393,13 +395,16 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an \ - user-defined schema. If a schema does not have the field, it drops corrupt \ - records during parsing. When a length of parsed CSV tokens is shorter than \ - an expected length of a schema, it sets `null` for extra fields. + * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + It supports partial result for the records just with less or more tokens \ + than the schema. When it meets a malformed record whose parsed tokens is \ + shorter than an expected length of a schema, it sets ``null`` for extra \ + fields. When a length of tokens is longer than a schema, it drops extra \ + tokens. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index e2a97acb5e2a7..e7c1cd9c78187 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -442,13 +442,15 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, :param mode: allows a mode for dealing with corrupt records during parsing. If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an user-defined \ - schema. If a schema does not have the field, it drops corrupt records during \ - parsing. When inferring a schema, it implicitly adds a \ - ``columnNameOfCorruptRecord`` field in an output schema. + * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ + field in an output schema. It doesn't support partial results. Even just one \ + field can't be correctly parsed, all fields except for the field of \ + ``columnNameOfCorruptRecord`` will be set to ``null``. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -621,13 +623,16 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non :param mode: allows a mode for dealing with corrupt records during parsing. 
If None is set, it uses the default value, ``PERMISSIVE``. - * ``PERMISSIVE`` : sets other fields to ``null`` when it meets a corrupted \ - record, and puts the malformed string into a field configured by \ - ``columnNameOfCorruptRecord``. To keep corrupt records, an user can set \ - a string type field named ``columnNameOfCorruptRecord`` in an \ - user-defined schema. If a schema does not have the field, it drops corrupt \ - records during parsing. When a length of parsed CSV tokens is shorter than \ - an expected length of a schema, it sets `null` for extra fields. + * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ + into a field configured by ``columnNameOfCorruptRecord``, and sets other \ + fields to ``null``. To keep corrupt records, an user can set a string type \ + field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + schema does not have the field, it drops corrupt records during parsing. \ + It supports partial result for the records just with less or more tokens \ + than the schema. When it meets a malformed record whose parsed tokens is \ + shorter than an expected length of a schema, it sets ``null`` for extra \ + fields. When a length of tokens is longer than a schema, it drops extra \ + tokens. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index f23851655350a..95cc69e80487d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -236,12 +236,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing. * @@ -316,12 +318,14 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo *
  • `mode` (default `PERMISSIVE`): allows a mode for dealing with corrupt records * during parsing. It supports the following case-insensitive modes. * From 1d03d3b248821a05dfd2751eeb0c8b657ebc9073 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Mon, 26 Feb 2018 05:51:52 +0000 Subject: [PATCH 3/6] Address comment. --- python/pyspark/sql/readwriter.py | 8 ++++---- python/pyspark/sql/streaming.py | 8 ++++---- .../main/scala/org/apache/spark/sql/DataFrameReader.scala | 8 ++++---- .../org/apache/spark/sql/streaming/DataStreamReader.scala | 8 ++++---- 4 files changed, 16 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index 242455d29052f..b209848580add 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -215,8 +215,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ - field in an output schema. It doesn't support partial results. Even just one \ - field can't be correctly parsed, all fields except for the field of \ + field in an output schema. It does not support partial results. Even just one \ + field can not be correctly parsed, all fields except for the field of \ ``columnNameOfCorruptRecord`` will be set to ``null``. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. @@ -401,8 +401,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ It supports partial result for the records just with less or more tokens \ - than the schema. When it meets a malformed record whose parsed tokens is \ - shorter than an expected length of a schema, it sets ``null`` for extra \ + than the schema. When it meets a malformed record having the length of \ + parsed tokens shorter than the length of a schema, it sets ``null`` for extra \ fields. When a length of tokens is longer than a schema, it drops extra \ tokens. * ``DROPMALFORMED`` : ignores the whole corrupted records. diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index e7c1cd9c78187..4d2a9ccf4120f 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -448,8 +448,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ - field in an output schema. It doesn't support partial results. Even just one \ - field can't be correctly parsed, all fields except for the field of \ + field in an output schema. It does not support partial results. Even just one \ + field can not be correctly parsed, all fields except for the field of \ ``columnNameOfCorruptRecord`` will be set to ``null``. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. 
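The CSV wording above ("less or more tokens than the schema") is easiest to see in a small sketch (assumed schema and rows; not part of the patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val schema = new StructType()
      .add("a", StringType)
      .add("b", StringType)
      .add("c", StringType)

    // "x,y" has fewer tokens than the schema; "x,y,z,extra" has more.
    val rows = Seq("x,y", "x,y,z,extra").toDS()

    val df = spark.read.schema(schema).option("mode", "PERMISSIVE").csv(rows)
    // Expected under the documented semantics:
    //   [x, y, null]  -- the missing trailing field is set to null
    //   [x, y, z]     -- the extra token is dropped
    df.show()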
@@ -629,8 +629,8 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ It supports partial result for the records just with less or more tokens \ - than the schema. When it meets a malformed record whose parsed tokens is \ - shorter than an expected length of a schema, it sets ``null`` for extra \ + than the schema. When it meets a malformed record having the length of \ + parsed tokens shorter than the length of a schema, it sets ``null`` for extra \ fields. When a length of tokens is longer than a schema, it drops extra \ tokens. * ``DROPMALFORMED`` : ignores the whole corrupted records. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 24db8dc3f30af..e1379e4541223 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -350,7 +350,7 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord` * in an user-defined schema. If a schema does not have the field, it drops corrupt records * during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord` - * field in an output schema. It doesn't support partial results. Even just one field can't + * field in an output schema. It does not support partial results. Even just one field can not * be correctly parsed, all fields except for the field of `columnNameOfCorruptRecord` will * be set to `null`.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
@@ -557,9 +557,9 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
 * in an user-defined schema. If a schema does not have the field, it drops corrupt records
 * during parsing. It supports partial result for the records just with less or more tokens
- * than the schema. When it meets a malformed record whose parsed tokens is shorter than an
- * expected length of a schema, it sets `null` for extra fields. When a length of tokens is
- * longer than a schema, it drops extra tokens.
+ * than the schema. When it meets a malformed record having the length of parsed tokens
+ * shorter than the length of a schema, it sets `null` for extra fields. When a length of
+ * tokens is longer than a schema, it drops extra tokens.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 95cc69e80487d..d610184f651f0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -241,7 +241,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
 * in an user-defined schema. If a schema does not have the field, it drops corrupt records
 * during parsing. When inferring a schema, it implicitly adds a `columnNameOfCorruptRecord`
- * field in an output schema. It doesn't support partial results. Even just one field can't
+ * field in an output schema. It does not support partial results. Even just one field can not
 * be correctly parsed, all fields except for the field of `columnNameOfCorruptRecord` will
 * be set to `null`.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
@@ -323,9 +323,9 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
 * in an user-defined schema. If a schema does not have the field, it drops corrupt records
 * during parsing. It supports partial result for the records just with less or more tokens
- * than the schema. When it meets a malformed record whose parsed tokens is shorter than an
- * expected length of a schema, it sets `null` for extra fields. When a length of tokens is
- * longer than a schema, it drops extra tokens.
+ * than the schema. When it meets a malformed record having the length of parsed tokens
+ * shorter than the length of a schema, it sets `null` for extra fields. When a length of
+ * tokens is longer than a schema, it drops extra tokens.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
  • * From 4f9b14803f3eff8057e52e36d13f074ec917bde6 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 27 Feb 2018 02:18:53 +0000 Subject: [PATCH 4/6] Address comment. --- python/pyspark/sql/readwriter.py | 9 ++++----- python/pyspark/sql/streaming.py | 9 ++++----- .../scala/org/apache/spark/sql/DataFrameReader.scala | 12 +++++------- .../spark/sql/streaming/DataStreamReader.scala | 12 +++++------- 4 files changed, 18 insertions(+), 24 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index b209848580add..f00c9f24525bd 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -211,13 +211,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ into a field configured by ``columnNameOfCorruptRecord``, and sets other \ - fields to ``null``. To keep corrupt records, an user can set a string type \ - field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + fields to ``null``. It does not support partial results. To keep corrupt \ + records, an user can set a string type field named \ + ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ - field in an output schema. It does not support partial results. Even just one \ - field can not be correctly parsed, all fields except for the field of \ - ``columnNameOfCorruptRecord`` will be set to ``null``. + field in an output schema. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 4d2a9ccf4120f..70cab4caac27d 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -444,13 +444,12 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None, * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \ into a field configured by ``columnNameOfCorruptRecord``, and sets other \ - fields to ``null``. To keep corrupt records, an user can set a string type \ - field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ + fields to ``null``. It does not support partial results. To keep corrupt \ + records, an user can set a string type field named \ + ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \ - field in an output schema. It does not support partial results. Even just one \ - field can not be correctly parsed, all fields except for the field of \ - ``columnNameOfCorruptRecord`` will be set to ``null``. + field in an output schema. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. 
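The "implicitly adds a ``columnNameOfCorruptRecord`` field" sentence kept by these hunks can be checked with a short sketch (input records assumed; not part of the patch):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val ds = Seq("""{"a": 1}""", """{"a": """).toDS()

    // No explicit schema: inference sees the corrupt record and implicitly
    // appends the corrupt-record column (default name `_corrupt_record`).
    val df = spark.read.json(ds)
    df.printSchema()  // expected to contain both `a` and `_corrupt_record`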
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index e1379e4541223..960c355774664 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -346,13 +346,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * during parsing. * diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala index d610184f651f0..edd13c7e0eaad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala @@ -237,13 +237,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo * during parsing. * From 654a59bc23da932cff371cd2c01c359b1b597228 Mon Sep 17 00:00:00 2001 From: Liang-Chi Hsieh Date: Tue, 27 Feb 2018 08:24:30 +0000 Subject: [PATCH 5/6] Address comment. --- python/pyspark/sql/readwriter.py | 10 +++++----- python/pyspark/sql/streaming.py | 10 +++++----- .../scala/org/apache/spark/sql/DataFrameReader.scala | 8 ++++---- .../apache/spark/sql/streaming/DataStreamReader.scala | 8 ++++---- 4 files changed, 18 insertions(+), 18 deletions(-) diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py index f00c9f24525bd..1ad2920012e6d 100644 --- a/python/pyspark/sql/readwriter.py +++ b/python/pyspark/sql/readwriter.py @@ -399,11 +399,11 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non fields to ``null``. To keep corrupt records, an user can set a string type \ field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ - It supports partial result for the records just with less or more tokens \ - than the schema. When it meets a malformed record having the length of \ - parsed tokens shorter than the length of a schema, it sets ``null`` for extra \ - fields. When a length of tokens is longer than a schema, it drops extra \ - tokens. + A record with less/more tokens than schema is not a corrupted record. \ + It supports partial result for such records. When it meets a record having \ + the length of parsed tokens shorter than the length of a schema, it sets \ + ``null`` for extra fields. When a length of tokens is longer than a schema, \ + it drops extra tokens. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 70cab4caac27d..70e7441457219 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -627,11 +627,11 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non fields to ``null``. To keep corrupt records, an user can set a string type \ field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \ schema does not have the field, it drops corrupt records during parsing. \ - It supports partial result for the records just with less or more tokens \ - than the schema. When it meets a malformed record having the length of \ - parsed tokens shorter than the length of a schema, it sets ``null`` for extra \ - fields. 
When a length of tokens is longer than a schema, it drops extra \ - tokens. + A record with less/more tokens than schema is not a corrupted record. \ + It supports partial result for such records. When it meets a record having \ + the length of parsed tokens shorter than the length of a schema, it sets \ + ``null`` for extra fields. When a length of tokens is longer than a schema, \ + it drops extra tokens. * ``DROPMALFORMED`` : ignores the whole corrupted records. * ``FAILFAST`` : throws an exception when it meets corrupted records. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 960c355774664..07a9fdbb26863 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -554,10 +554,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord` * in an user-defined schema. If a schema does not have the field, it drops corrupt records - * during parsing. It supports partial result for the records just with less or more tokens - * than the schema. When it meets a malformed record having the length of parsed tokens - * shorter than the length of a schema, it sets `null` for extra fields. When a length of - * tokens is longer than a schema, it drops extra tokens. + * during parsing. A record with less/more tokens than schema is not a corrupted record. + * It supports partial result for such records. When it meets a record having the length + * of parsed tokens shorter than the length of a schema, it sets ``null`` for extra fields. + * When a length of tokens is longer than a schema, it drops extra tokens. *
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index edd13c7e0eaad..cb02467c80aeb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -320,10 +320,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
 * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
 * in an user-defined schema. If a schema does not have the field, it drops corrupt records
- * during parsing. It supports partial result for the records just with less or more tokens
- * than the schema. When it meets a malformed record having the length of parsed tokens
- * shorter than the length of a schema, it sets `null` for extra fields. When a length of
- * tokens is longer than a schema, it drops extra tokens.
+ * during parsing. A record with less/more tokens than schema is not a corrupted record.
+ * It supports partial result for such records. When it meets a record having the length
+ * of parsed tokens shorter than the length of a schema, it sets ``null`` for extra fields.
+ * When a length of tokens is longer than a schema, it drops extra tokens.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
From daa326d9973b837f2b62d28c9382fbc4b8339659 Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Tue, 27 Feb 2018 12:47:11 +0000
Subject: [PATCH 6/6] Address comment.

---
 python/pyspark/sql/readwriter.py | 14 ++++++--------
 python/pyspark/sql/streaming.py | 14 ++++++--------
 .../org/apache/spark/sql/DataFrameReader.scala | 18 +++++++++---------
 .../spark/sql/streaming/DataStreamReader.scala | 18 +++++++++---------
 4 files changed, 30 insertions(+), 34 deletions(-)

diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 1ad2920012e6d..9d05ac7cb39be 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -211,9 +211,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
 into a field configured by ``columnNameOfCorruptRecord``, and sets other \
- fields to ``null``. It does not support partial results. To keep corrupt \
- records, an user can set a string type field named \
- ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+ fields to ``null``. To keep corrupt records, a user can set a string type \
+ field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
 schema does not have the field, it drops corrupt records during parsing. \
 When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
 field in an output schema.
@@ -399,11 +398,10 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
 fields to ``null``. To keep corrupt records, an user can set a string type \
 field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
 schema does not have the field, it drops corrupt records during parsing. \
- A record with less/more tokens than schema is not a corrupted record. \
- It supports partial result for such records. When it meets a record having \
- the length of parsed tokens shorter than the length of a schema, it sets \
- ``null`` for extra fields. When a length of tokens is longer than a schema, \
- it drops extra tokens.
+ A record with fewer or more tokens than the schema is not a corrupted record \
+ to CSV. When it meets a record having fewer tokens than the length of the \
+ schema, it sets ``null`` for the extra fields. When the record has more \
+ tokens than the length of the schema, it drops the extra tokens.
 * ``DROPMALFORMED`` : ignores the whole corrupted records.
 * ``FAILFAST`` : throws an exception when it meets corrupted records.
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 70e7441457219..cc622decfd682 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -444,9 +444,8 @@ def json(self, path, schema=None, primitivesAsString=None, prefersDecimal=None,
 * ``PERMISSIVE`` : when it meets a corrupted record, puts the malformed string \
 into a field configured by ``columnNameOfCorruptRecord``, and sets other \
- fields to ``null``. It does not support partial results. To keep corrupt \
- records, an user can set a string type field named \
- ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
+ fields to ``null``. To keep corrupt records, a user can set a string type \
+ field named ``columnNameOfCorruptRecord`` in a user-defined schema. If a \
 schema does not have the field, it drops corrupt records during parsing. \
 When inferring a schema, it implicitly adds a ``columnNameOfCorruptRecord`` \
 field in an output schema.
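The ``DROPMALFORMED`` and ``FAILFAST`` bullets are left untouched by the whole series; their behavior, sketched with assumed inputs (not part of the patch):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val ds = Seq("""{"a": 1}""", "not json").toDS()

    // DROPMALFORMED: the corrupt record is ignored entirely.
    val dropped = spark.read.option("mode", "DROPMALFORMED").json(ds)
    println(dropped.count())  // expected: 1

    // FAILFAST: reading corrupt data raises an exception when the job runs.
    try {
      spark.read.option("mode", "FAILFAST").json(ds).collect()
    } catch {
      case e: Exception => println(s"FAILFAST threw: ${e.getMessage}")
    }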
@@ -627,11 +626,10 @@ def csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=Non
 fields to ``null``. To keep corrupt records, an user can set a string type \
 field named ``columnNameOfCorruptRecord`` in an user-defined schema. If a \
 schema does not have the field, it drops corrupt records during parsing. \
- A record with less/more tokens than schema is not a corrupted record. \
- It supports partial result for such records. When it meets a record having \
- the length of parsed tokens shorter than the length of a schema, it sets \
- ``null`` for extra fields. When a length of tokens is longer than a schema, \
- it drops extra tokens.
+ A record with fewer or more tokens than the schema is not a corrupted record \
+ to CSV. When it meets a record having fewer tokens than the length of the \
+ schema, it sets ``null`` for the extra fields. When the record has more \
+ tokens than the length of the schema, it drops the extra tokens.
 * ``DROPMALFORMED`` : ignores the whole corrupted records.
 * ``FAILFAST`` : throws an exception when it meets corrupted records.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 07a9fdbb26863..0139913aaa4e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -346,11 +346,11 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 * during parsing.
 * `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
- * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. It
- * does not support partial results. To keep corrupt records, an user can set a string
- * type field named `columnNameOfCorruptRecord` in an user-defined schema. If a schema
- * does not have the field, it drops corrupt records during parsing. When inferring a schema,
- * it implicitly adds a `columnNameOfCorruptRecord` field in an output schema.
+ * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+ * keep corrupt records, a user can set a string type field named
+ * `columnNameOfCorruptRecord` in a user-defined schema. If a schema does not have the
+ * field, it drops corrupt records during parsing. When inferring a schema, it implicitly
+ * adds a `columnNameOfCorruptRecord` field in an output schema.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
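The `-` lines above drop the explicit "does not support partial results" sentence from the javadoc, but the behavior itself is unchanged; a sketch (schema and record assumed, not part of the patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val schema = new StructType()
      .add("a", IntegerType)
      .add("b", IntegerType)
      .add("_corrupt_record", StringType)

    // `a` parses, `b` does not: JSON yields no partial result, so the whole
    // row becomes null except for the corrupt-record column.
    val df = spark.read.schema(schema).json(Seq("""{"a": 1, "b": "oops"}""").toDS())
    df.show(truncate = false)  // a = null, b = null, _corrupt_record = original text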
@@ -554,10 +554,10 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
 * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
 * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
 * in an user-defined schema. If a schema does not have the field, it drops corrupt records
- * during parsing. A record with less/more tokens than schema is not a corrupted record.
- * It supports partial result for such records. When it meets a record having the length
- * of parsed tokens shorter than the length of a schema, it sets ``null`` for extra fields.
- * When a length of tokens is longer than a schema, it drops extra tokens.
+ * during parsing. A record with fewer or more tokens than the schema is not a corrupted
+ * record to CSV. When it meets a record having fewer tokens than the length of the schema,
+ * it sets `null` for the extra fields. When the record has more tokens than the length of
+ * the schema, it drops the extra tokens.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index cb02467c80aeb..61e22fac854f9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -237,11 +237,11 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 * during parsing.
 * `PERMISSIVE` : when it meets a corrupted record, puts the malformed string into a
- * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. It
- * does not support partial results. To keep corrupt records, an user can set a string
- * type field named `columnNameOfCorruptRecord` in an user-defined schema. If a schema
- * does not have the field, it drops corrupt records during parsing. When inferring a schema,
- * it implicitly adds a `columnNameOfCorruptRecord` field in an output schema.
+ * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To
+ * keep corrupt records, a user can set a string type field named
+ * `columnNameOfCorruptRecord` in a user-defined schema. If a schema does not have the
+ * field, it drops corrupt records during parsing. When inferring a schema, it implicitly
+ * adds a `columnNameOfCorruptRecord` field in an output schema.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
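Because this file is DataStreamReader, the same options apply on the streaming path; a sketch with a hypothetical input directory (not part of the patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Streaming sources require an explicit schema; include the corrupt-record
    // column to keep malformed rows.
    val schema = new StructType()
      .add("a", IntegerType)
      .add("_corrupt_record", StringType)

    val stream = spark.readStream
      .schema(schema)
      .option("mode", "PERMISSIVE")
      .option("columnNameOfCorruptRecord", "_corrupt_record")
      .json("/path/to/json/dir")  // hypothetical directory

    stream.printSchema()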
@@ -320,10 +320,10 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
 * field configured by `columnNameOfCorruptRecord`, and sets other fields to `null`. To keep
 * corrupt records, an user can set a string type field named `columnNameOfCorruptRecord`
 * in an user-defined schema. If a schema does not have the field, it drops corrupt records
- * during parsing. A record with less/more tokens than schema is not a corrupted record.
- * It supports partial result for such records. When it meets a record having the length
- * of parsed tokens shorter than the length of a schema, it sets ``null`` for extra fields.
- * When a length of tokens is longer than a schema, it drops extra tokens.
+ * during parsing. A record with fewer or more tokens than the schema is not a corrupted
+ * record to CSV. When it meets a record having fewer tokens than the length of the schema,
+ * it sets `null` for the extra fields. When the record has more tokens than the length of
+ * the schema, it drops the extra tokens.
 * `DROPMALFORMED` : ignores the whole corrupted records.
 * `FAILFAST` : throws an exception when it meets corrupted records.
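Finally, the second UnivocityParser comment in PATCH 1/6 covers the one CSV case without partial results: the token count matches the schema, but a value fails to convert. A closing sketch (values assumed, not part of the patch):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types._

    val spark = SparkSession.builder().master("local[*]").getOrCreate()
    import spark.implicits._

    val schema = new StructType()
      .add("a", IntegerType)
      .add("b", IntegerType)
      .add("_corrupt_record", StringType)

    // Two tokens, matching the two data fields, but "oops" is not an integer:
    // this record is corrupted, so no partial result is kept.
    val df = spark.read
      .schema(schema)
      .option("mode", "PERMISSIVE")
      .csv(Seq("1,oops").toDS())
    df.show()  // a = null, b = null, _corrupt_record = "1,oops"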