
Commit adf7d33

minor comments
1 parent 20ac52f commit adf7d33


3 files changed, +11 -9 lines changed


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/FailureSafeParser.scala

Lines changed: 2 additions & 2 deletions
@@ -42,8 +42,8 @@ class FailureSafeParser[IN](
       (row, badRecord) => {
         var i = 0
         while (i < actualSchema.length) {
-          val f = actualSchema(i)
-          resultRow(schema.fieldIndex(f.name)) = row.map(_.get(i, f.dataType)).orNull
+          val from = actualSchema(i)
+          resultRow(schema.fieldIndex(from.name)) = row.map(_.get(i, from.dataType)).orNull
           i += 1
         }
         resultRow(corruptFieldIndex.get) = badRecord()
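For context on what this hunk does, here is a rough standalone sketch of the same pattern, using assumed simplified stand-in types (a plain Array row and a name-to-index Map, not Spark's internal InternalRow): each parsed field is copied into the result row by field name, and the requested corrupt-record column is filled from the raw input when parsing fails.

// A minimal sketch, assuming simplified stand-in types rather than Spark internals.
case class Field(name: String)

def toResultRow(
    parsed: Option[Array[Any]],     // parsed values, or None for a bad record
    badRecord: () => String,        // lazily materialized raw input
    actualSchema: Array[Field],     // schema without the corrupt-record column
    fieldIndex: Map[String, Int],   // field name -> position in the full result schema
    corruptFieldIndex: Option[Int], // position of the corrupt-record column, if any
    resultRow: Array[Any]): Array[Any] = {
  var i = 0
  while (i < actualSchema.length) {
    val from = actualSchema(i)
    // Copy the parsed value (or null) into the slot named by the field.
    resultRow(fieldIndex(from.name)) = parsed.map(_(i)).orNull
    i += 1
  }
  // The corrupt-record column gets the raw input rather than a parsed value.
  corruptFieldIndex.foreach(idx => resultRow(idx) = badRecord())
  resultRow
}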

sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala

Lines changed: 6 additions & 4 deletions
@@ -383,11 +383,12 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }

     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
-    val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+    val actualSchema =
+      StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

     val createParser = CreateJacksonParser.string _
     val parsed = jsonDataset.rdd.mapPartitions { iter =>
-      val rawParser = new JacksonParser(dataSchema, parsedOptions)
+      val rawParser = new JacksonParser(actualSchema, parsedOptions)
       val parser = new FailureSafeParser[String](
         input => rawParser.parse(input, createParser, UTF8String.fromString),
         parsedOptions.parseMode,

@@ -442,14 +443,15 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging {
     }

     verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
-    val dataSchema = StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
+    val actualSchema =
+      StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

     val linesWithoutHeader: RDD[String] = maybeFirstLine.map { firstLine =>
       filteredLines.rdd.mapPartitions(CSVUtils.filterHeaderLine(_, firstLine, parsedOptions))
     }.getOrElse(filteredLines.rdd)

     val parsed = linesWithoutHeader.mapPartitions { iter =>
-      val rawParser = new UnivocityParser(dataSchema, parsedOptions)
+      val rawParser = new UnivocityParser(actualSchema, parsedOptions)
       val parser = new FailureSafeParser[String](
         input => Seq(rawParser.parse(input)),
         parsedOptions.parseMode,
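The rename to actualSchema makes the recurring pattern in both the JSON and CSV paths easier to follow: the schema handed to the low-level parser excludes the corrupt-record column, which FailureSafeParser populates separately. A minimal sketch against Spark's public StructType API; the field names and the "_corrupt_record" column name below are illustrative (that string is Spark's default for columnNameOfCorruptRecord, but any configured name behaves the same way):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical user-supplied schema that requests a corrupt-record column.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("value", StringType),
  StructField("_corrupt_record", StringType)))

// The schema actually used for parsing drops that column, since it is
// filled in by FailureSafeParser rather than by the parser itself.
val actualSchema =
  StructType(schema.filterNot(_.name == "_corrupt_record"))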

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala

Lines changed: 3 additions & 3 deletions
@@ -51,7 +51,7 @@ class UnivocityParser(
   private val row = new GenericInternalRow(requiredSchema.length)

   // Retrieve the raw record string.
-  private def getCurrentInput(): UTF8String = {
+  private def getCurrentInput: UTF8String = {
     UTF8String.fromString(tokenizer.getContext.currentParsedContent().stripLineEnd)
   }

@@ -207,7 +207,7 @@ class UnivocityParser(
       }
     }
     throw BadRecordException(
-      () => getCurrentInput(),
+      () => getCurrentInput,
       getPartialResult,
       new RuntimeException("Malformed CSV record"))
   } else {

@@ -221,7 +221,7 @@ class UnivocityParser(
     row
   } catch {
     case NonFatal(e) =>
-      throw BadRecordException(() => getCurrentInput(), () => None, e)
+      throw BadRecordException(() => getCurrentInput, () => None, e)
   }
 }
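Dropping the parentheses follows the Scala convention that a zero-argument, side-effect-free accessor is declared as a parameterless def, and call sites reference it without (). A minimal standalone sketch of the convention, with hypothetical names not taken from Spark:

class Recorder {
  private var lastInput: String = "raw line"

  // Parameterless def: no parentheses, signalling an accessor with no side effects.
  def currentInput: String = lastInput
}

val r = new Recorder
// A zero-argument function value can still wrap the accessor lazily,
// mirroring the () => getCurrentInput thunks in the diff above.
val thunk: () => String = () => r.currentInput
println(thunk())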
