@@ -41,12 +41,11 @@ class RecordHeaderParserFixedLen(recordSize: Int,
     if (fileHeaderBytes > 0 && fileOffset == 0L) {
       RecordMetadata(fileHeaderBytes, isValid = false)
     } else if (fileSize > 0L && fileFooterBytes > 0 && fileSize - fileOffset <= fileFooterBytes) {
-      RecordMetadata((fileSize - fileOffset).toInt, isValid = false)
+      RecordMetadata((fileSize - fileOffset - fileFooterBytes).toInt, isValid = false)
     } else if (maxOffset - fileOffset >= recordSize) {
       RecordMetadata(recordSize, isValid = true)
     } else {
       RecordMetadata(-1, isValid = false)
     }
   }
-
 }
@@ -46,7 +46,7 @@ class RecordHeaderParserRDW(isBigEndian: Boolean,
     if (fileHeaderBytes > getHeaderLength && fileOffset == getHeaderLength) {
       RecordMetadata(fileHeaderBytes - getHeaderLength, isValid = false)
     } else if (fileSize > 0L && fileFooterBytes > 0 && fileSize - fileOffset <= fileFooterBytes) {
-      RecordMetadata((fileSize - fileOffset).toInt, isValid = false)
+      RecordMetadata((fileSize - fileOffset - fileFooterBytes).toInt, isValid = false)
     } else {
       processRdwHeader(header, fileOffset)
     }
@@ -53,7 +53,7 @@ object IndexGenerator extends Logging {
     val needSplit = getSplitCondition(recordsPerIndexEntry, sizePerIndexEntryMB)
 
     // Add the first mandatory index entry
-    val indexEntry = SparseIndexEntry(0, -1, fileId, recordIndex)
+    val indexEntry = SparseIndexEntry(dataStream.offset, -1, fileId, recordIndex)
     index += indexEntry
 
     var endOfFileReached = false
@@ -43,6 +43,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
     override def next(): Row = Row.fromSeq(iterator.next())
   }
 
+  override def getReaderProperties: ReaderParameters = readerProperties
 
   override def getCobolSchema: CobolSchema = CobolSchema.fromBaseReader(cobolSchema)
 
@@ -57,7 +58,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
       new RowIterator(
         new VarLenHierarchicalIterator(cobolSchema.copybook,
           binaryData,
-          readerProperties,
+          getReaderProperties,
           recordHeaderParser,
           recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook),
           fileNumber,
@@ -69,7 +70,7 @@ final class VarLenNestedReader(copybookContents: Seq[String],
      new RowIterator(
        new VarLenNestedIterator(cobolSchema.copybook,
          binaryData,
-          readerProperties,
+          getReaderProperties,
          recordHeaderParser,
          recordExtractor(startingRecordIndex, binaryData, cobolSchema.copybook),
          fileNumber,
@@ -18,7 +18,9 @@ package za.co.absa.cobrix.spark.cobol.reader
 
 import org.apache.spark.sql.Row
 import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
+import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters
 import za.co.absa.cobrix.cobol.reader.stream.SimpleStream
+
 import scala.collection.mutable.ArrayBuffer
 
 
@@ -31,6 +33,9 @@ trait VarLenReader extends Reader with Serializable {
   /** Returns true if RDW header of variable length files is big endian */
   def isRdwBigEndian: Boolean
 
+  /** All the properties that the reader might need. */
+  def getReaderProperties: ReaderParameters
+
   /**
    * Returns a file iterator between particular offsets. This is for faster traversal of big binary files
    *
@@ -156,10 +156,28 @@ private[source] object IndexBuilder extends Logging {
     val fileOrder = fileWithOrder.order
     val fileSystem = path.getFileSystem(config)
 
+    val startOffset = reader.getReaderProperties.fileStartOffset
+    val maximumBytes = if (reader.getReaderProperties.fileEndOffset == 0) {
+      0
+    } else {
+      val bytesToRead = fileSystem.getContentSummary(path).getLength - reader.getReaderProperties.fileEndOffset - startOffset
+      if (bytesToRead < 0)
+        0
+      else
+        bytesToRead
+    }
+
     logger.info(s"Going to generate index for the file: $filePath")
-    val index = reader.generateIndex(new FileStreamer(filePath, fileSystem, 0, 0),
-      fileOrder, reader.isRdwBigEndian)
-    index
+    val index = reader.generateIndex(new FileStreamer(filePath, fileSystem, startOffset, maximumBytes),
+      fileOrder, reader.isRdwBigEndian)
+
+    val indexWithEndOffset = if (maximumBytes > 0) {
+      index.map(entry => if (entry.offsetTo == -1) entry.copy(offsetTo = startOffset + maximumBytes) else entry)
+    } else {
+      index
+    }
+
+    indexWithEndOffset
   }
 
 
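For reference, here is a minimal standalone sketch (not part of the PR) of the arithmetic the new IndexBuilder code performs: compute how many bytes the FileStreamer may read between a file start offset and a file end offset, and clamp the open-ended last sparse index entry to that bound. The SparseIndexEntry shape mirrors the one in the diff; the helper names and the object itself are illustrative only.

// Illustrative sketch of the offset arithmetic used by the changed IndexBuilder code.
case class SparseIndexEntry(offsetFrom: Long, offsetTo: Long, fileId: Int, recordIndex: Long)

object OffsetArithmeticSketch {
  /** Bytes available between the start offset and the footer; never negative.
    * A result of 0 means "no limit" (the streamer reads to the end of the file). */
  def bytesToRead(fileLength: Long, startOffset: Long, endOffset: Long): Long =
    if (endOffset == 0) 0L
    else math.max(fileLength - endOffset - startOffset, 0L)

  /** Replace the -1 (open-ended) last entry with the real end of the readable region. */
  def clampIndex(index: Seq[SparseIndexEntry], startOffset: Long, maximumBytes: Long): Seq[SparseIndexEntry] =
    if (maximumBytes > 0)
      index.map(e => if (e.offsetTo == -1) e.copy(offsetTo = startOffset + maximumBytes) else e)
    else
      index

  def main(args: Array[String]): Unit = {
    // 1000-byte file, 10-byte header to skip, 15-byte footer to ignore => 975 readable bytes
    val max = bytesToRead(fileLength = 1000, startOffset = 10, endOffset = 15)
    val clamped = clampIndex(Seq(SparseIndexEntry(10, -1, 0, 0)), startOffset = 10, maximumBytes = max)
    println(s"maximumBytes = $max, index = $clamped") // offsetTo of the last entry becomes 985
  }
}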
@@ -82,6 +82,12 @@ class Test29BdwFileSpec extends AnyWordSpec with SparkTestBase with BinaryFileFi
     }
   }
 
+  "File start and end offsets" should {
+    "be supported" in {
+      testVbRecordLoad(true, true, -1, 1, 2, 2, expected4Records, fileStartOffset = 10, fileEndOffset = 15)
+    }
+  }
+
   "in case of failures" should {
     "thrown an exception if there is only BDW, but nor RDW" in {
       val record: Seq[Byte] = getHeader(5, false, 0) ++ Seq(0xF0.toByte)
@@ -111,15 +117,20 @@ class Test29BdwFileSpec extends AnyWordSpec with SparkTestBase with BinaryFileFi
                                records: Int,
                                expected: String,
                                options: Map[String, String] = Map.empty[String, String],
-                               expectedPartitions: Option[Int] = None): Unit = {
-    val record: Seq[Byte] = Range(0, blocks).flatMap(blockNum => {
+                               expectedPartitions: Option[Int] = None,
+                               fileStartOffset: Int = 0,
+                               fileEndOffset: Int = 0
+                              ): Unit = {
+    val header: Seq[Byte] = Range(0, fileStartOffset).map(_.toByte)
+    val footer: Seq[Byte] = Range(0, fileEndOffset).map(n => (n + 100).toByte)
+    val record: Seq[Byte] = header ++ Range(0, blocks).flatMap(blockNum => {
       getHeader(records * 6, bdwBigEndian, bdwAdjustment) ++
         Range(0, records).flatMap(recordNum => {
           val idx0 = (blockNum * records + recordNum) / 10
           val idx1 = (blockNum * records + recordNum) % 10
           getHeader(2, rdwBigEndian, rdwAdjustment) ++ Seq((0xF0 + idx0).toByte, (0xF0.toByte + idx1).toByte)
         })
-    })
+    }) ++ footer
 
     withTempBinFile("rec", ".dat", record.toArray) { tmpFileName1 =>
       val df = spark
@@ -131,6 +142,8 @@ class Test29BdwFileSpec extends AnyWordSpec with SparkTestBase with BinaryFileFi
         .option("is_rdw_big_endian", rdwBigEndian)
         .option("bdw_adjustment", -bdwAdjustment)
         .option("rdw_adjustment", -rdwAdjustment)
+        .option("file_start_offset", fileStartOffset)
+        .option("file_end_offset", fileEndOffset)
         .options(options)
         .load(tmpFileName1)
 
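Putting the new options together, reading a mainframe file that carries a fixed-size header and trailer would look roughly like the sketch below. Only file_start_offset and file_end_offset come from this diff; the short format name "cobol" and the copybook and record_format options are assumed from the Cobrix documentation and should be verified there.

import org.apache.spark.sql.SparkSession

object FileOffsetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cobrix-file-offsets").getOrCreate()

    // Skip a 10-byte file header and ignore a 15-byte file trailer when reading
    // a variable-length (BDW + RDW) EBCDIC file.
    val df = spark.read
      .format("cobol")                           // assumed Cobrix short format name
      .option("copybook", "/path/to/copybook.cpy") // assumed copybook location option
      .option("record_format", "VB")             // assumed option for BDW + RDW files
      .option("file_start_offset", 10)           // added by this PR: bytes to skip at the start
      .option("file_end_offset", 15)             // added by this PR: bytes to ignore at the end
      .load("/path/to/data")

    df.show(false)
  }
}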