From 088aa6aa373ba8b19e927298e1f770d1fb391a52 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Mon, 17 Apr 2023 08:31:52 +0200
Subject: [PATCH] #601 Fix file offset options when used with a file having VB
 record format.

---
 .../RecordHeaderParserFixedLen.scala          |  3 +--
 .../headerparsers/RecordHeaderParserRDW.scala |  2 +-
 .../cobol/reader/index/IndexGenerator.scala   |  2 +-
 .../cobol/reader/VarLenNestedReader.scala     |  5 ++--
 .../spark/cobol/reader/VarLenReader.scala     |  5 ++++
 .../cobol/source/index/IndexBuilder.scala     | 24 ++++++++++++++++---
 .../integration/Test29BdwFileSpec.scala       | 19 ++++++++++++---
 7 files changed, 48 insertions(+), 12 deletions(-)

diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserFixedLen.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserFixedLen.scala
index 53d15f315..db101be10 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserFixedLen.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserFixedLen.scala
@@ -41,12 +41,11 @@ class RecordHeaderParserFixedLen(recordSize: Int,
     if (fileHeaderBytes > 0 && fileOffset == 0L) {
       RecordMetadata(fileHeaderBytes, isValid = false)
     } else if (fileSize > 0L && fileFooterBytes > 0 && fileSize - fileOffset <= fileFooterBytes) {
-      RecordMetadata((fileSize - fileOffset).toInt, isValid = false)
+      RecordMetadata((fileSize - fileOffset - fileFooterBytes).toInt, isValid = false)
     } else if (maxOffset - fileOffset >= recordSize) {
       RecordMetadata(recordSize, isValid = true)
     } else {
       RecordMetadata(-1, isValid = false)
     }
   }
-
 }
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserRDW.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserRDW.scala
index 7a477e482..c869c3d0a 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserRDW.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/headerparsers/RecordHeaderParserRDW.scala
@@ -46,7 +46,7 @@ class RecordHeaderParserRDW(isBigEndian: Boolean,
     if (fileHeaderBytes > getHeaderLength && fileOffset == getHeaderLength) {
       RecordMetadata(fileHeaderBytes - getHeaderLength, isValid = false)
     } else if (fileSize > 0L && fileFooterBytes > 0 && fileSize - fileOffset <= fileFooterBytes) {
-      RecordMetadata((fileSize - fileOffset).toInt, isValid = false)
+      RecordMetadata((fileSize - fileOffset - fileFooterBytes).toInt, isValid = false)
     } else {
       processRdwHeader(header, fileOffset)
     }
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
index 00980f95c..b7a71faba 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/index/IndexGenerator.scala
@@ -53,7 +53,7 @@ object IndexGenerator extends Logging {
     val needSplit = getSplitCondition(recordsPerIndexEntry, sizePerIndexEntryMB)
 
     // Add the first mandatory index entry
-    val indexEntry = SparseIndexEntry(0, -1, fileId, recordIndex)
+    val indexEntry = SparseIndexEntry(dataStream.offset, -1, fileId, recordIndex)
     index += indexEntry
 
     var endOfFileReached = false
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala
index c50a3e67d..d152b532e 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenNestedReader.scala
@@ -43,6 +43,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
     override def next(): Row = Row.fromSeq(iterator.next())
   }
 
+  override def getReaderProperties: ReaderParameters = readerProperties
 
   override def getCobolSchema: CobolSchema = CobolSchema.fromBaseReader(cobolSchema)
 
@@ -57,7 +58,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
       new RowIterator(
         new VarLenHierarchicalIterator(cobolSchema.copybook,
           binaryData,
-          readerProperties,
+          getReaderProperties,
           recordHeaderParser,
           recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook),
           fileNumber,
@@ -69,7 +70,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
       new RowIterator(
         new VarLenNestedIterator(cobolSchema.copybook,
           binaryData,
-          readerProperties,
+          getReaderProperties,
           recordHeaderParser,
           recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook),
           fileNumber,
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala
index 009c2001d..a2a9f6ca6 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/reader/VarLenReader.scala
@@ -18,7 +18,9 @@ package za.co.absa.cobrix.spark.cobol.reader
 
 import org.apache.spark.sql.Row
 import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
+import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
 import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
+
 import scala.collection.mutable.ArrayBuffer
 
 
@@ -31,6 +33,9 @@ trait VarLenReader extends Reader with Serializable {
   /** Returns true if RDW header of variable length files is big endian */
   def isRdwBigEndian: Boolean
 
+  /** All the properties that the reader might need. */
+  def getReaderProperties: ReaderParameters
+
   /**
     * Returns a file iterator between particular offsets. This is for faster traversal of big binary files
     *
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
index eda209554..910135e6a 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/index/IndexBuilder.scala
@@ -156,10 +156,28 @@ private[source] object IndexBuilder extends Logging {
     val fileOrder = fileWithOrder.order
     val fileSystem = path.getFileSystem(config)
 
+    val startOffset = reader.getReaderProperties.fileStartOffset
+    val maximumBytes = if (reader.getReaderProperties.fileEndOffset == 0) {
+      0
+    } else {
+      val bytesToRead = fileSystem.getContentSummary(path).getLength - reader.getReaderProperties.fileEndOffset - startOffset
+      if (bytesToRead < 0)
+        0
+      else
+        bytesToRead
+    }
+
     logger.info(s"Going to generate index for the file: $filePath")
-    val index = reader.generateIndex(new FileStreamer(filePath, fileSystem, 0, 0),
-      fileOrder, reader.isRdwBigEndian)
-    index
+    val index = reader.generateIndex(new FileStreamer(filePath, fileSystem, startOffset, maximumBytes),
+      fileOrder, reader.isRdwBigEndian)
+
+    val indexWithEndOffset = if (maximumBytes > 0 ){
+      index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry)
+    } else {
+      index
+    }
+
+    indexWithEndOffset
   }
 
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala
index 9638d5948..5705b0932 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala
@@ -82,6 +82,12 @@ class Test29BdwFileSpec extends AnyWordSpec with SparkTestBase with BinaryFileFi
     }
   }
 
+  "File start and end offset" should {
+    "be supported" in {
+      testVbRecordLoad(true, true, -1, 1, 2, 2, expected4Records, fileStartOffset = 10, fileEndOffset = 15)
+    }
+  }
+
   "in case of failures" should {
     "thrown an exception if there is only BDW, but nor RDW" in {
       val record: Seq[Byte] = getHeader(5, false, 0) ++ Seq(0xF0.toByte)
@@ -111,15 +117,20 @@ class Test29BdwFileSpec extends AnyWordSpec with SparkTestBase with BinaryFileFi
                                records: Int,
                                expected: String,
                                options: Map[String, String] = Map.empty[String, String],
-                               expectedPartitions: Option[Int] = None): Unit = {
-    val record: Seq[Byte] = Range(0, blocks).flatMap(blockNum => {
+                               expectedPartitions: Option[Int] = None,
+                               fileStartOffset: Int = 0,
+                               fileEndOffset: Int = 0
+                              ): Unit = {
+    val header: Seq[Byte] = Range(0, fileStartOffset).map(_.toByte)
+    val footer: Seq[Byte] = Range(0, fileEndOffset).map(n => (n + 100).toByte)
+    val record: Seq[Byte] = header ++ Range(0, blocks).flatMap(blockNum => {
       getHeader(records * 6, bdwBigEndian, bdwAdjustment) ++
         Range(0, records).flatMap(recordNum => {
           val idx0 = (blockNum * records + recordNum) / 10
           val idx1 = (blockNum * records + recordNum) % 10
           getHeader(2, rdwBigEndian, rdwAdjustment) ++ Seq((0xF0 + idx0).toByte, (0xF0.toByte + idx1).toByte)
         })
-    })
+    }) ++ footer
 
     withTempBinFile("rec", ".dat", record.toArray) { tmpFileName1 =>
       val df = spark
@@ -131,6 +142,8 @@ class Test29BdwFileSpec extends AnyWordSpec with SparkTestBase with BinaryFileFi
         .option("is_rdw_big_endian", rdwBigEndian)
         .option("bdw_adjustment", -bdwAdjustment)
         .option("rdw_adjustment", -rdwAdjustment)
+        .option("file_start_offset", fileStartOffset)
+        .option("file_end_offset", fileEndOffset)
         .options(options)
         .load(tmpFileName1)
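
Note, not part of the patch: a minimal usage sketch of the two options this change fixes for VB files. The file_start_offset and file_end_offset names are taken from the patch above; the "cobol" format name, the copybook_contents and record_format options are assumptions based on typical spark-cobol usage, and the copybook text and input path are placeholders.

import org.apache.spark.sql.SparkSession

// Hypothetical example: skip a 10-byte file header and a 15-byte file footer
// of a variable blocked (VB, BDW + RDW) file, mirroring the new test case.
val spark = SparkSession.builder().appName("vb-offsets-example").getOrCreate()
val copybook = "..."                             // placeholder copybook text

val df = spark.read
  .format("cobol")                               // assumed short name of the spark-cobol data source
  .option("copybook_contents", copybook)
  .option("record_format", "VB")                 // assumed option name/value for BDW + RDW records
  .option("file_start_offset", 10)               // bytes skipped at the start of each file
  .option("file_end_offset", 15)                 // bytes skipped at the end of each file
  .load("/path/to/vb/data")                      // placeholder input path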