Merged
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
@@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
scala: [ 2.11.12, 2.12.14 ]
scala: [ 2.11.12, 2.12.15 ]
spark: [ 2.4.8, 3.1.2 ]
exclude:
- scala: 2.11.12
58 changes: 53 additions & 5 deletions README.md
@@ -266,8 +266,8 @@ all required dependencies (an uber jar aka fat jar).
Creating an uber jar for Cobrix is very easy. Just clone the repository and run one of the following commands:
```sh
sbt ++2.11.12 assembly -DSPARK_VERSION=2.4.8
sbt ++2.12.14 assembly -DSPARK_VERSION=2.4.8
sbt ++2.12.14 assembly -DSPARK_VERSION=3.1.2
sbt ++2.12.15 assembly -DSPARK_VERSION=2.4.8
sbt ++2.12.15 assembly -DSPARK_VERSION=3.1.2
```

You can collect the uber jar of `spark-cobol` either at
@@ -317,6 +317,44 @@ val copyBook = CopybookParser.parseTree(copyBookContents)
println(copyBook.generateRecordLayoutPositions())
```

### Fixed record length files
Cobrix assumes files have the fixed-length (`F`) record format by default. The record length is determined from the
record layout defined by the copybook, but you can specify the record length explicitly:
```
.option("record_format", "F")
.option("record_length", "250")
```
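The precedence described above can be sketched in plain Scala (an illustrative model only; the object and method names are hypothetical, not part of the Cobrix API):

```scala
// Illustrative sketch (not Cobrix code): an explicit record_length option
// overrides the record size derived from the copybook.
object RecordLengthSketch {
  def effectiveRecordLength(recordLengthOption: Option[Int],
                            copybookRecordSize: Int): Int =
    recordLengthOption.getOrElse(copybookRecordSize)
}
```

For example, with `record_length = 250` and a copybook-derived size of 300, the effective record length is 250; without the option it falls back to 300.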

The fixed-block (`FB`) record format is also supported. The support is _experimental_; if you find any issues, please
let us know. When the record format is `FB`, you can specify either the block length or the number of records per
block. As with `F`, if `record_length` is not specified, it is determined from the copybook.

Records that have BDWs, but not RDWs, can be read like this:
```
.option("record_format", "FB")
.option("record_length", "250")
```
or simply
```
.option("record_format", "FB")
```

Records that have neither BDWs nor RDWs can be read like this:

```
.option("record_format", "FB")
.option("record_length", "250")
.option("block_length", "500")
```
or
```
.option("record_format", "FB")
.option("record_length", "250")
.option("records_per_block", "2")
```

More on fixed-length record formats: https://www.ibm.com/docs/en/zos/2.3.0?topic=sets-fixed-length-record-formats
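How the block size follows from these mutually exclusive options can be sketched in plain Scala (a simplified model; `FbBlockSize` and its parameter names are hypothetical, not the Cobrix implementation):

```scala
// Simplified sketch of FB block-size derivation (illustrative, not Cobrix code).
// Precedence: an explicit block_length wins; otherwise records_per_block is
// multiplied by the record length. If neither is given, the reader would have
// to fall back to reading BDW headers from the file itself.
object FbBlockSize {
  def derive(recordLength: Int,
             blockLength: Option[Int],
             recordsPerBlock: Option[Int]): Option[Int] = {
    require(blockLength.isEmpty || recordsPerBlock.isEmpty,
      "block_length and records_per_block cannot be used together")
    blockLength.orElse(recordsPerBlock.map(_ * recordLength))
  }
}
```

For instance, `FbBlockSize.derive(250, None, Some(2))` yields a block size of 500, matching the `records_per_block` example above.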

### Variable length records support

Cobrix supports variable record length files. The only requirement is that such a file should contain a standard 4 byte
@@ -1112,7 +1150,6 @@ Again, the full example is available at
| Option (usage example) | Description |
| ------------------------------------------ |:----------------------------------------------------------------------------- |
| .option("paths", "/path1,/path2") | Allows loading data from multiple unrelated paths on the same filesystem. |
| .option("record_length", "100") | Overrides the length of the record (in bytes). Normally, the size is derived from the copybook. But explicitly specifying record size can be helpful for debugging fixed-record length files. |
| .option("file_start_offset", "0") | Specifies the number of bytes to skip at the beginning of each file. |
| .option("file_end_offset", "0") | Specifies the number of bytes to skip at the end of each file. |
| .option("record_start_offset", "0") | Specifies the number of bytes to skip at the beginning of each record before applying copybook fields to data. |
@@ -1152,11 +1189,17 @@ Again, the full example is available at
| .option("with_input_file_name_col", "file_name") | Generates a column containing input file name for each record (Similar to Spark SQL `input_file_name()` function). The column name is specified by the value of the option. This option only works for variable record length files. For fixed record length files use `input_file_name()`. |
| .option("debug", "hex") | If specified, each primitive field will be accompanied by a debug field containing raw bytes from the source file. Possible values: `none` (default), `hex`, `binary`. The legacy value `true` is supported and will generate debug fields in HEX. |

##### Variable record length files options
##### Fixed length record format options (for record_format = F or FB)

| Option (usage example)                     | Description |
| ------------------------------------------ |:----------------------------------------------------------------------------- |
| .option("record_format", "F")              | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
| .option("record_length", "100")            | Overrides the length of the record (in bytes). Normally, the size is derived from the copybook, but explicitly specifying the record size can be helpful for debugging fixed record length files. |
| .option("block_length", "500")             | Specifies the block length for `FB` records. It should be a multiple of `record_length`. Cannot be used together with `records_per_block`. |
| .option("records_per_block", "5")          | Specifies the number of records per block for `FB` records. Cannot be used together with `block_length`. |

##### Variable record length files options (for record_format = V or VB)

| Option (usage example) | Description |
| --------------------------------------------- |:----------------------------------------------------------------------------- |
| .option("record_format", "F") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
| .option("record_format", "V")                 | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
| .option("is_record_sequence", "true") | _[deprecated]_ If 'true' the parser will look for 4 byte RDW headers to read variable record length files. Use `.option("record_format", "V")` instead. |
| .option("is_rdw_big_endian", "true") | Specifies if RDW headers are big endian. They are considered little-endian by default. |
| .option("is_rdw_part_of_record_length", false)| Specifies if RDW headers count themselves as part of record length. By default RDW headers count only payload record in record length, not RDW headers themselves. This is equivalent to `.option("rdw_adjustment", -4)`. For BDW use `.option("bdw_adjustment", -4)` |
@@ -1287,13 +1330,18 @@ For multisegment variable lengths tests:
![](performance/images/exp3_multiseg_wide_records_throughput.svg) ![](performance/images/exp3_multiseg_wide_mb_throughput.svg)

## Changelog
- #### 2.4.1 to be released soon.
- [#420](https://github.com/AbsaOSS/cobrix/issues/420) Add _experimental_ support for [fixed blocked (FB)](https://www.ibm.com/docs/en/zos/2.3.0?topic=sets-fixed-length-record-formats) record format.
- [#422](https://github.com/AbsaOSS/cobrix/issues/422) Fixed decoding of 'broken pipe' (`¦`) character from EBCDIC.

- #### 2.4.0 released 7 September 2021.
- [#412](https://github.com/AbsaOSS/cobrix/issues/412) Add support for [variable block (VB aka VBVR)](https://www.ibm.com/docs/en/zos/2.3.0?topic=formats-format-v-records) record format.
Options to adjust BDW settings are added:
- `is_bdw_big_endian` - specifies if BDW is big-endian (false by default)
- `bdw_adjustment` - Specifies how the value of a BDW differs from the block payload length. For example, if the size in BDW headers includes the BDW header itself, use `.option("bdw_adjustment", "-4")`.
- Options `is_record_sequence` and `is_xcom` are deprecated. Use `.option("record_format", "V")` instead.
- [#417](https://github.com/AbsaOSS/cobrix/issues/417) Multisegment ASCII text files have now direct support using `record_format = D`.

- #### 2.3.0 released 2 August 2021.
- [#405](https://github.com/AbsaOSS/cobrix/issues/405) Fix extracting records that contain redefines of the top level GROUPs.
- [#406](https://github.com/AbsaOSS/cobrix/issues/406) Use 'collapse_root' retention policy by default. This is the breaking,
6 changes: 3 additions & 3 deletions build.sbt
@@ -68,11 +68,11 @@ lazy val sparkCobol = (project in file("spark-cobol"))
name := "spark-cobol",
printSparkVersion := {
val log = streams.value.log
log.info(s"Building with Spark $sparkVersion")
sparkVersion
log.info(s"Building with Spark ${sparkVersion(scalaVersion.value)}, Scala ${scalaVersion.value}")
sparkVersion(scalaVersion.value)
},
(Compile / compile) := ((Compile / compile) dependsOn printSparkVersion).value,
libraryDependencies ++= SparkCobolDependencies :+ getScalaDependency(scalaVersion.value),
libraryDependencies ++= SparkCobolDependencies(scalaVersion.value) :+ getScalaDependency(scalaVersion.value),
dependencyOverrides ++= SparkCobolDependenciesOverride,
Test / fork := true, // Spark tests fail randomly otherwise
populateBuildInfoTemplate,
@@ -46,7 +46,7 @@ class CodePage037 extends CodePage {
spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, // 48 - 63
' ', rsp, 'â', 'ä', 'à', 'á', 'ã', 'å', 'ç', 'ñ', '¢', '.', '<', '(', '+', '|', // 64 - 79
'&', 'é', 'ê', 'ë', 'è', 'í', 'î', 'ï', 'ì', 'ß', '!', '$', '*', ')', ';', '¬', // 80 - 95
'-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '|', ',', '%', '_', '>', '?', // 96 - 111
'-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '¦', ',', '%', '_', '>', '?', // 96 - 111
'ø', 'É', 'Ê', 'Ë', 'È', 'Í', 'Î', 'Ï', 'Ì', '`', ':', '#', '@', qts, '=', qtd, // 112 - 127
'Ø', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', '«', '»', 'ð', 'ý', 'þ', '±', // 128 - 143
'°', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 'ª', 'º', 'æ', '¸', 'Æ', '¤', // 144 - 159
@@ -79,7 +79,7 @@ class CodePage037Ext extends CodePage {
spc, spc, c16, spc, spc, spc, spc, c04, spc, spc, spc, spc, c14, c15, spc, c1a, // 48 - 63
' ', rsp, 'â', 'ä', 'à', 'á', 'ã', 'å', 'ç', 'ñ', '¢', '.', '<', '(', '+', '|', // 64 - 79
'&', 'é', 'ê', 'ë', 'è', 'í', 'î', 'ï', 'ì', 'ß', '!', '$', '*', ')', ';', '¬', // 80 - 95
'-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '|', ',', '%', '_', '>', '?', // 96 - 111
'-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '¦', ',', '%', '_', '>', '?', // 96 - 111
'ø', 'É', 'Ê', 'Ë', 'È', 'Í', 'Î', 'Ï', 'Ì', '`', ':', '#', '@', qts, '=', qtd, // 112 - 127
'Ø', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', '«', '»', 'ð', 'ý', 'þ', '±', // 128 - 143
'°', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 'ª', 'º', 'æ', '¸', 'Æ', '¤', // 144 - 159
@@ -41,7 +41,7 @@ class CodePageCommon extends CodePage {
spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, // 48 - 63
' ', ' ', spc, spc, spc, spc, spc, spc, spc, spc, spc, '.', '<', '(', '+', '|', // 64 - 79
'&', spc, spc, spc, spc, spc, spc, spc, spc, spc, '!', '$', '*', ')', ';', spc, // 80 - 95
'-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '|', ',', '%', '_', '>', '?', // 96 - 111
'-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '¦', ',', '%', '_', '>', '?', // 96 - 111
spc, spc, spc, spc, spc, spc, spc, spc, spc, '`', ':', '#', '@', qts, '=', qtd, // 112 - 127
spc, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', spc, spc, spc, spc, spc, spc, // 128 - 143
spc, 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', spc, spc, spc, spc, spc, spc, // 144 - 159
@@ -75,7 +75,7 @@ class CodePageCommonExt extends CodePage {
spc, spc, c16, spc, spc, spc, spc, c04, spc, spc, spc, spc, c14, c15, spc, spc, // 48 - 63
' ', ' ', spc, spc, spc, spc, spc, spc, spc, spc, spc, '.', '<', '(', '+', '|', // 64 - 79
'&', spc, spc, spc, spc, spc, spc, spc, spc, spc, '!', '$', '*', ')', ';', spc, // 80 - 95
'-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '|', ',', '%', '_', '>', '?', // 96 - 111
'-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '¦', ',', '%', '_', '>', '?', // 96 - 111
spc, spc, spc, spc, spc, spc, spc, spc, spc, '`', ':', '#', '@', qts, '=', qtd, // 112 - 127
spc, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', spc, spc, spc, spc, spc, spc, // 128 - 143
spc, 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', spc, spc, spc, spc, spc, spc, // 144 - 159
@@ -20,13 +20,15 @@ sealed trait RecordFormat

object RecordFormat {
case object FixedLength extends RecordFormat
case object FixedBlock extends RecordFormat
case object VariableLength extends RecordFormat
case object VariableBlock extends RecordFormat
case object AsciiText extends RecordFormat

def withNameOpt(s: String): Option[RecordFormat] = {
s match {
case "F" => Some(FixedLength)
case "FB" => Some(FixedBlock)
case "V" => Some(VariableLength)
case "VB" => Some(VariableBlock)
case "D" => Some(AsciiText)
@@ -22,8 +22,9 @@ import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, VariableBlock}
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedBlockParameters, FixedBlockRawRecordExtractor, RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
@@ -63,19 +64,25 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
copybook: Copybook
): Option[RawRecordExtractor] = {
val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment)
val bdwParams = RecordHeaderParameters(readerProperties.isBdwBigEndian, readerProperties.bdwAdjustment)

val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams)
val bdwDecoder = new RecordHeaderDecoderBdw(bdwParams)

val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoder, readerProperties.reAdditionalInfo)
val bdwOpt = readerProperties.bdw
val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment))
val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams))

val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo)

readerProperties.recordExtractor match {
case Some(recordExtractorClass) =>
Some(RawRecordExtractorFactory.createRecordHeaderParser(recordExtractorClass, reParams))
case None if readerProperties.isText =>
Some(new TextRecordExtractor(reParams))
case None if readerProperties.hasBdw =>
case None if readerProperties.recordFormat == FixedBlock =>
val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
FixedBlockParameters.validate(fbParams)
Some(new FixedBlockRawRecordExtractor(reParams, fbParams))
case None if readerProperties.recordFormat == VariableBlock =>
Some(new VariableBlockVariableRecordExtractor(reParams))
case None if readerProperties.variableSizeOccurs &&
readerProperties.recordHeaderParser.isEmpty &&
@@ -0,0 +1,34 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.reader.extractors.raw

case class FixedBlockParameters(
recordLength: Option[Int],
blockLength: Option[Int],
recordsPerBlock: Option[Int]
)

object FixedBlockParameters {
def validate(params: FixedBlockParameters): Unit = {
if (params.blockLength.nonEmpty && params.recordsPerBlock.nonEmpty) {
throw new IllegalArgumentException("FB record format requires either block length or number of records per block to be specified, but not both.")
}
params.recordLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Record length should be positive. Got $x."))
params.blockLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Block length should be positive. Got $x."))
params.recordsPerBlock.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Records per block should be positive. Got $x."))
}
}
@@ -0,0 +1,69 @@
/*
* Copyright 2018 ABSA Group Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package za.co.absa.cobrix.cobol.reader.extractors.raw

import scala.collection.mutable

class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockParameters) extends Serializable with RawRecordExtractor {
private val recordQueue = new mutable.Queue[Array[Byte]]

private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize)
private val bdwSize = fbParams.blockLength.orElse(fbParams.recordsPerBlock.map(_ * recordSize))

override def offset: Long = ctx.inputStream.offset

override def hasNext: Boolean = {
if (recordQueue.isEmpty) {
readNextBlock()
}
recordQueue.nonEmpty
}

private def readNextBlock(): Unit = {
if (!ctx.inputStream.isEndOfStream) {
var bdwOffset = ctx.inputStream.offset

val nextBlockSize = bdwSize.getOrElse({
val bdw = ctx.inputStream.next(ctx.bdwDecoder.headerSize)
val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset)
bdwOffset += ctx.bdwDecoder.headerSize
blockLength
})

val blockBuffer = ctx.inputStream.next(nextBlockSize)

var blockIndex = 0

while (blockIndex < blockBuffer.length) {
val payload = blockBuffer.slice(blockIndex, blockIndex + recordSize)
if (payload.length > 0) {
recordQueue.enqueue(payload)
}
blockIndex += recordSize
}
}
}


@throws[NoSuchElementException]
override def next(): Array[Byte] = {
if (!hasNext) {
throw new NoSuchElementException
}
recordQueue.dequeue()
}
}