diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 7c9d7a93c..71a30b8e2 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -12,7 +12,7 @@ jobs:
strategy:
fail-fast: false
matrix:
- scala: [ 2.11.12, 2.12.14 ]
+ scala: [ 2.11.12, 2.12.15 ]
spark: [ 2.4.8, 3.1.2 ]
exclude:
- scala: 2.11.12
diff --git a/README.md b/README.md
index cc17da0cb..08332010f 100644
--- a/README.md
+++ b/README.md
@@ -266,8 +266,8 @@ all required dependencies (an uber jar aka fat jar).
Creating an uber jar for Cobrix is very easy. Just clone the repository and run one of the following commands:
```sh
sbt ++2.11.12 assembly -DSPARK_VERSION=2.4.8
-sbt ++2.12.14 assembly -DSPARK_VERSION=2.4.8
-sbt ++2.12.14 assembly -DSPARK_VERSION=3.1.2
+sbt ++2.12.15 assembly -DSPARK_VERSION=2.4.8
+sbt ++2.12.15 assembly -DSPARK_VERSION=3.1.2
```
You can collect the uber jar of `spark-cobol` either at
@@ -317,6 +317,44 @@ val copyBook = CopybookParser.parseTree(copyBookContents)
println(copyBook.generateRecordLayoutPositions())
```
+### Fixed record length files
+Cobrix assumes files have the fixed-length (`F`) record format by default. The record length is determined by the
+length of the record defined in the copybook, but you can specify the record length explicitly:
+```
+.option("record_format", "F")
+.option("record_length", "250")
+```
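+
+For illustration, a minimal end-to-end read of a fixed-length file might look like this (a sketch; `spark` is an active `SparkSession` and the paths are placeholders):
+```scala
+// Read fixed-length records, overriding the record size derived from the copybook
+val df = spark.read
+  .format("cobol")
+  .option("copybook", "/path/to/copybook.cob")
+  .option("record_format", "F")
+  .option("record_length", "250")
+  .load("/path/to/data")
+```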
+
+The fixed block (`FB`) record format is also supported. The support is _experimental_; if you find any issues, please
+let us know. When the record format is `FB`, you can specify either the block length or the number of records per
+block. As with `F`, if `record_length` is not specified, it will be determined from the copybook.
+
+Records that have BDWs, but not RDWs, can be read like this:
+```
+.option("record_format", "FB")
+.option("record_length", "250")
+```
+or simply
+```
+.option("record_format", "FB")
+```
+
+Records that have neither BDWs nor RDWs can be read like this:
+
+```
+.option("record_format", "FB")
+.option("record_length", "250")
+.option("block_length", "500")
+```
+or
+```
+.option("record_format", "FB")
+.option("record_length", "250")
+.option("records_per_block", "2")
+```
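+
+Putting it together, a complete read of an FB file might look like this (a sketch; `spark` is an active `SparkSession` and the paths are placeholders):
+```scala
+// Each block holds 2 records of 250 bytes, so no BDW headers are expected
+val df = spark.read
+  .format("cobol")
+  .option("copybook", "/path/to/copybook.cob")
+  .option("record_format", "FB")
+  .option("record_length", "250")
+  .option("records_per_block", "2")
+  .load("/path/to/data")
+```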
+
+More on fixed-length record formats: https://www.ibm.com/docs/en/zos/2.3.0?topic=sets-fixed-length-record-formats
+
### Variable length records support
Cobrix supports variable record length files. The only requirement is that such a file should contain a standard 4 byte
@@ -1112,7 +1150,6 @@ Again, the full example is available at
| Option (usage example) | Description |
| ------------------------------------------ |:----------------------------------------------------------------------------- |
| .option("paths", "/path1,/path2") | Allows loading data from multiple unrelated paths on the same filesystem. |
-| .option("record_length", "100") | Overrides the length of the record (in bypes). Normally, the size is derived from the copybook. But explicitly specifying record size can be helpful for debugging fixed-record length files. |
| .option("file_start_offset", "0") | Specifies the number of bytes to skip at the beginning of each file. |
| .option("file_end_offset", "0") | Specifies the number of bytes to skip at the end of each file. |
| .option("record_start_offset", "0") | Specifies the number of bytes to skip at the beginning of each record before applying copybook fields to data. |
@@ -1152,11 +1189,17 @@ Again, the full example is available at
| .option("with_input_file_name_col", "file_name") | Generates a column containing input file name for each record (Similar to Spark SQL `input_file_name()` function). The column name is specified by the value of the option. This option only works for variable record length files. For fixed record length files use `input_file_name()`. |
| .option("debug", "hex") | If specified, each primitive field will be accompanied by a debug field containing raw bytes from the source file. Possible values: `none` (default), `hex`, `binary`. The legacy value `true` is supported and will generate debug fields in HEX. |
-##### Variable record length files options
+##### Fixed length record format options (for record_format = F or FB)
+| .option("record_format", "F") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
+| .option("record_length", "100") | Overrides the length of the record (in bypes). Normally, the size is derived from the copybook. But explicitly specifying record size can be helpful for debugging fixed-record length files. |
+| .option("block_length", "500") | Specifies the block length for FB records. It should be a multiple of 'record_length'. Cannot be used together with `records_per_block` |
+| .option("records_per_block", "5") | Specifies the number of records ber block for FB records. Cannot be used together with `block_length` |
+
+##### Variable record length files options (for record_format = V or VB)
| Option (usage example) | Description |
| --------------------------------------------- |:----------------------------------------------------------------------------- |
-| .option("record_format", "F") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
+| .option("record_format", "V") | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
| .option("is_record_sequence", "true") | _[deprecated]_ If 'true' the parser will look for 4 byte RDW headers to read variable record length files. Use `.option("record_format", "V")` instead. |
| .option("is_rdw_big_endian", "true") | Specifies if RDW headers are big endian. They are considered little-endian by default. |
| .option("is_rdw_part_of_record_length", false)| Specifies if RDW headers count themselves as part of record length. By default RDW headers count only payload record in record length, not RDW headers themselves. This is equivalent to `.option("rdw_adjustment", -4)`. For BDW use `.option("bdw_adjustment", -4)` |
@@ -1287,6 +1330,10 @@ For multisegment variable lengths tests:
 
## Changelog
+- #### 2.4.1 to be released soon.
+ - [#420](https://github.com/AbsaOSS/cobrix/issues/420) Add _experimental_ support for [fixed blocked (FB)](https://www.ibm.com/docs/en/zos/2.3.0?topic=sets-fixed-length-record-formats) record format.
+ - [#422](https://github.com/AbsaOSS/cobrix/issues/422) Fixed decoding of 'broken pipe' (`¦`) character from EBCDIC.
+
- #### 2.4.0 released 7 September 2021.
- [#412](https://github.com/AbsaOSS/cobrix/issues/412) Add support for [variable block (VB aka VBVR)](https://www.ibm.com/docs/en/zos/2.3.0?topic=formats-format-v-records) record format.
Options to adjust BDW settings are added:
@@ -1294,6 +1341,7 @@ For multisegment variable lengths tests:
- `bdw_adjustment` - Specifies how the value of a BDW is different from the block payload. For example, if the side in BDW headers includes BDW record itself, use `.option("bdw_adjustment", "-4")`.
- Options `is_record_sequence` and `is_xcom` are deprecated. Use `.option("record_format", "V")` instead.
- [#417](https://github.com/AbsaOSS/cobrix/issues/417) Multisegment ASCII text files have now direct support using `record_format = D`.
+
- #### 2.3.0 released 2 August 2021.
- [#405](https://github.com/AbsaOSS/cobrix/issues/405) Fix extracting records that contain redefines of the top level GROUPs.
- [#406](https://github.com/AbsaOSS/cobrix/issues/406) Use 'collapse_root' retention policy by default. This is the breaking,
diff --git a/build.sbt b/build.sbt
index a1f389442..96153b743 100644
--- a/build.sbt
+++ b/build.sbt
@@ -68,11 +68,11 @@ lazy val sparkCobol = (project in file("spark-cobol"))
name := "spark-cobol",
printSparkVersion := {
val log = streams.value.log
- log.info(s"Building with Spark $sparkVersion")
- sparkVersion
+ log.info(s"Building with Spark ${sparkVersion(scalaVersion.value)}, Scala ${scalaVersion.value}")
+ sparkVersion(scalaVersion.value)
},
(Compile / compile) := ((Compile / compile) dependsOn printSparkVersion).value,
- libraryDependencies ++= SparkCobolDependencies :+ getScalaDependency(scalaVersion.value),
+ libraryDependencies ++= SparkCobolDependencies(scalaVersion.value) :+ getScalaDependency(scalaVersion.value),
dependencyOverrides ++= SparkCobolDependenciesOverride,
Test / fork := true, // Spark tests fail randomly otherwise
populateBuildInfoTemplate,
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037.scala
index 43e2e1e49..e9bb29532 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037.scala
@@ -46,7 +46,7 @@ class CodePage037 extends CodePage {
spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, // 48 - 63
' ', rsp, 'â', 'ä', 'à', 'á', 'ã', 'å', 'ç', 'ñ', '¢', '.', '<', '(', '+', '|', // 64 - 79
'&', 'é', 'ê', 'ë', 'è', 'í', 'î', 'ï', 'ì', 'ß', '!', '$', '*', ')', ';', '¬', // 80 - 95
- '-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '|', ',', '%', '_', '>', '?', // 96 - 111
+ '-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '¦', ',', '%', '_', '>', '?', // 96 - 111
'ø', 'É', 'Ê', 'Ë', 'È', 'Í', 'Î', 'Ï', 'Ì', '`', ':', '#', '@', qts, '=', qtd, // 112 - 127
'Ø', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', '«', '»', 'ð', 'ý', 'þ', '±', // 128 - 143
'°', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 'ª', 'º', 'æ', '¸', 'Æ', '¤', // 144 - 159
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037Ext.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037Ext.scala
index 1229e6467..d81c95364 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037Ext.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037Ext.scala
@@ -79,7 +79,7 @@ class CodePage037Ext extends CodePage {
spc, spc, c16, spc, spc, spc, spc, c04, spc, spc, spc, spc, c14, c15, spc, c1a, // 48 - 63
' ', rsp, 'â', 'ä', 'à', 'á', 'ã', 'å', 'ç', 'ñ', '¢', '.', '<', '(', '+', '|', // 64 - 79
'&', 'é', 'ê', 'ë', 'è', 'í', 'î', 'ï', 'ì', 'ß', '!', '$', '*', ')', ';', '¬', // 80 - 95
- '-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '|', ',', '%', '_', '>', '?', // 96 - 111
+ '-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '¦', ',', '%', '_', '>', '?', // 96 - 111
'ø', 'É', 'Ê', 'Ë', 'È', 'Í', 'Î', 'Ï', 'Ì', '`', ':', '#', '@', qts, '=', qtd, // 112 - 127
'Ø', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', '«', '»', 'ð', 'ý', 'þ', '±', // 128 - 143
'°', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 'ª', 'º', 'æ', '¸', 'Æ', '¤', // 144 - 159
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommon.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommon.scala
index 999d29aef..5d48118cf 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommon.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommon.scala
@@ -41,7 +41,7 @@ class CodePageCommon extends CodePage {
spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, // 48 - 63
' ', ' ', spc, spc, spc, spc, spc, spc, spc, spc, spc, '.', '<', '(', '+', '|', // 64 - 79
'&', spc, spc, spc, spc, spc, spc, spc, spc, spc, '!', '$', '*', ')', ';', spc, // 80 - 95
- '-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '|', ',', '%', '_', '>', '?', // 96 - 111
+ '-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '¦', ',', '%', '_', '>', '?', // 96 - 111
spc, spc, spc, spc, spc, spc, spc, spc, spc, '`', ':', '#', '@', qts, '=', qtd, // 112 - 127
spc, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', spc, spc, spc, spc, spc, spc, // 128 - 143
spc, 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', spc, spc, spc, spc, spc, spc, // 144 - 159
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommonExt.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommonExt.scala
index 5881e7174..fb8fe79e0 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommonExt.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommonExt.scala
@@ -75,7 +75,7 @@ class CodePageCommonExt extends CodePage {
spc, spc, c16, spc, spc, spc, spc, c04, spc, spc, spc, spc, c14, c15, spc, spc, // 48 - 63
' ', ' ', spc, spc, spc, spc, spc, spc, spc, spc, spc, '.', '<', '(', '+', '|', // 64 - 79
'&', spc, spc, spc, spc, spc, spc, spc, spc, spc, '!', '$', '*', ')', ';', spc, // 80 - 95
- '-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '|', ',', '%', '_', '>', '?', // 96 - 111
+ '-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '¦', ',', '%', '_', '>', '?', // 96 - 111
spc, spc, spc, spc, spc, spc, spc, spc, spc, '`', ':', '#', '@', qts, '=', qtd, // 112 - 127
spc, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', spc, spc, spc, spc, spc, spc, // 128 - 143
spc, 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', spc, spc, spc, spc, spc, spc, // 144 - 159
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala
index 75b588f5e..37349ccfb 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala
@@ -20,6 +20,7 @@ sealed trait RecordFormat
object RecordFormat {
case object FixedLength extends RecordFormat
+ case object FixedBlock extends RecordFormat
case object VariableLength extends RecordFormat
case object VariableBlock extends RecordFormat
case object AsciiText extends RecordFormat
@@ -27,6 +28,7 @@ object RecordFormat {
def withNameOpt(s: String): Option[RecordFormat] = {
s match {
case "F" => Some(FixedLength)
+ case "FB" => Some(FixedBlock)
case "V" => Some(VariableLength)
case "VB" => Some(VariableBlock)
case "D" => Some(AsciiText)
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
index 0a4fb572d..7ec5a61de 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala
@@ -22,8 +22,9 @@ import za.co.absa.cobrix.cobol.parser.common.Constants
import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage
import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC}
import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory}
+import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, VariableBlock}
import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser}
-import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
+import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedBlockParameters, FixedBlockRawRecordExtractor, RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor}
import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler
import za.co.absa.cobrix.cobol.reader.index.IndexGenerator
import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry
@@ -63,19 +64,25 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String],
copybook: Copybook
): Option[RawRecordExtractor] = {
val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment)
- val bdwParams = RecordHeaderParameters(readerProperties.isBdwBigEndian, readerProperties.bdwAdjustment)
val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams)
- val bdwDecoder = new RecordHeaderDecoderBdw(bdwParams)
- val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoder, readerProperties.reAdditionalInfo)
+ val bdwOpt = readerProperties.bdw
+ val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment))
+ val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams))
+
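+ // When BDW parameters are absent (the format is not FB or VB), the RDW decoder is passed
+ // as a stand-in so that RawRecordContext is always fully populated.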
+ val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo)
readerProperties.recordExtractor match {
case Some(recordExtractorClass) =>
Some(RawRecordExtractorFactory.createRecordHeaderParser(recordExtractorClass, reParams))
case None if readerProperties.isText =>
Some(new TextRecordExtractor(reParams))
- case None if readerProperties.hasBdw =>
+ case None if readerProperties.recordFormat == FixedBlock =>
+ val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock)
+ FixedBlockParameters.validate(fbParams)
+ Some(new FixedBlockRawRecordExtractor(reParams, fbParams))
+ case None if readerProperties.recordFormat == VariableBlock =>
Some(new VariableBlockVariableRecordExtractor(reParams))
case None if readerProperties.variableSizeOccurs &&
readerProperties.recordHeaderParser.isEmpty &&
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala
new file mode 100644
index 000000000..aac7f9b8c
--- /dev/null
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.cobol.reader.extractors.raw
+
+case class FixedBlockParameters(
+ recordLength: Option[Int],
+ blockLength: Option[Int],
+ recordsPerBlock: Option[Int]
+ )
+
+object FixedBlockParameters {
+ def validate(params: FixedBlockParameters): Unit = {
+ if (params.blockLength.nonEmpty && params.recordsPerBlock.nonEmpty) {
+ throw new IllegalArgumentException("FB record format requires either block length or number records per block to be specified, but not both.")
+ }
+ params.recordLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Record length should be positive. Got $x."))
+ params.blockLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Block length should be positive. Got $x."))
+ params.recordsPerBlock.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Records per block should be positive. Got $x."))
+ }
+}
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala
new file mode 100644
index 000000000..f8b355546
--- /dev/null
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.cobol.reader.extractors.raw
+
+import scala.collection.mutable
+
+class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockParameters) extends Serializable with RawRecordExtractor {
+ private val recordQueue = new mutable.Queue[Array[Byte]]
+
+ private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize)
+ private val bdwSize = fbParams.blockLength.orElse(fbParams.recordsPerBlock.map(_ * recordSize))
+
+ override def offset: Long = ctx.inputStream.offset
+
+ override def hasNext: Boolean = {
+ if (recordQueue.isEmpty) {
+ readNextBlock()
+ }
+ recordQueue.nonEmpty
+ }
+
+ private def readNextBlock(): Unit = {
+ if (!ctx.inputStream.isEndOfStream) {
+ var bdwOffset = ctx.inputStream.offset
+
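+ // The block size comes from 'block_length' / 'records_per_block' when specified;
+ // otherwise it is read from the BDW header preceding each block.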
+ val nextBlockSize = bdwSize.getOrElse({
+ val bdw = ctx.inputStream.next(ctx.bdwDecoder.headerSize)
+ val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset)
+ bdwOffset += ctx.bdwDecoder.headerSize
+ blockLength
+ })
+
+ val blockBuffer = ctx.inputStream.next(nextBlockSize)
+
+ var blockIndex = 0
+
+ while (blockIndex < blockBuffer.length) {
+ val payload = blockBuffer.slice(blockIndex, blockIndex + recordSize)
+ if (payload.length > 0) {
+ recordQueue.enqueue(payload)
+ }
+ blockIndex += recordSize
+ }
+ }
+ }
+
+
+ @throws[NoSuchElementException]
+ override def next(): Array[Byte] = {
+ if (!hasNext) {
+ throw new NoSuchElementException
+ }
+ recordQueue.dequeue()
+ }
+}
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/ReaderExample.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala
similarity index 59%
rename from examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/ReaderExample.scala
rename to cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala
index 56d61a51d..e772ef3bb 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/ReaderExample.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala
@@ -14,24 +14,11 @@
* limitations under the License.
*/
-package com.example.spark.cobol.examples.parser
-
-import za.co.absa.cobrix.cobol.parser.reader.FSReader
-
-object ReaderExample {
-
- def main(args: Array[String]): Unit = {
-
- val reader = new FSReader("../../examples/example_copybook.cob", "../../examples/example_data/file1.bin")
-
- val it = reader.getIterator
-
- println(it.hasNext)
-
- while (it.hasNext) {
- println(it.next())
- }
-
- }
-
-}
+package za.co.absa.cobrix.cobol.reader.parameters
+
+case class Bdw(
+ isBigEndian: Boolean,
+ adjustment: Int,
+ blockLength: Option[Int],
+ recordsPerBlock: Option[Int]
+ )
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
index 6e360a686..8906c32a4 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala
@@ -21,12 +21,14 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPoint
import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy
import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy
import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, StringTrimmingPolicy}
+import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
+import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
/**
* These are properties for customizing mainframe binary data reader.
- *
+ * @param recordFormat Record format
* @param isEbcdic If true the input data file encoding is EBCDIC, otherwise it is ASCII
* @param isText If true line ending characters will be used (LF / CRLF) as the record separator
* @param ebcdicCodePage Specifies what code page to use for EBCDIC to ASCII/Unicode conversions
@@ -38,7 +40,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param recordLength Specifies the length of the record disregarding the copybook record size. Implied the file has fixed record length.
* @param lengthFieldName A name of a field that contains record length. Optional. If not set the copybook record length will be used.
* @param isRecordSequence Does input files have 4 byte record length headers
- * @param isRdwBigEndian Is RDW big endian? It may depend on flavor of mainframe and/or mainframe to PC transfer method
+ * @param bdw Block descriptor word (if specified), for FB and VB record formats
* @param isRdwPartRecLength Does RDW count itself as part of record length itself
* @param rdwAdjustment Controls a mismatch between RDW and record length
* @param isIndexGenerationNeeded Is indexing input file before processing is requested
@@ -64,6 +66,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param inputFileNameColumn A column name to add to the dataframe. The column will contain input file name for each record similar to 'input_file_name()' function
*/
case class ReaderParameters(
+ recordFormat: RecordFormat = FixedLength,
isEbcdic: Boolean = true,
isText: Boolean = false,
ebcdicCodePage: String = "common",
@@ -72,15 +75,13 @@ case class ReaderParameters(
isUtf16BigEndian: Boolean = true,
floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM,
variableSizeOccurs: Boolean = false,
- recordLength: Option[Int] = None ,
+ recordLength: Option[Int] = None,
lengthFieldName: Option[String] = None,
isRecordSequence: Boolean = false,
- hasBdw: Boolean = false,
+ bdw: Option[Bdw] = None,
isRdwBigEndian: Boolean = false,
- isBdwBigEndian: Boolean = false,
isRdwPartRecLength: Boolean = false,
rdwAdjustment: Int = 0,
- bdwAdjustment: Int = 0,
isIndexGenerationNeeded: Boolean = false,
inputSplitRecords: Option[Int] = None,
inputSplitSizeMB: Option[Int] = None,
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala
index def950152..12990fbd3 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala
@@ -20,12 +20,10 @@ package za.co.absa.cobrix.cobol.reader.parameters
* This class holds the parameters currently used for parsing variable-length records.
*
* @param isRecordSequence Does input files have 4 byte record length headers
- * @param hasBdw Does the file have BDW headers
+ * @param bdw Block descriptor word (if specified), for FB and VB record formats
* @param isRdwBigEndian Is RDW big endian? It may depend on flavor of mainframe and/or mainframe to PC transfer method
- * @param isBdwBigEndian Is BDW big endian? It may depend on flavor of mainframe and/or mainframe to PC transfer method
* @param isRdwPartRecLength Does RDW count itself as part of record length itself
* @param rdwAdjustment Controls a mismatch between RDW and record length
- * @param bdwAdjustment Controls a mismatch between BDW and record length
* @param recordHeaderParser An optional custom record header parser for non-standard RDWs
* @param recordExtractor An optional custom raw record parser class non-standard record types
* @param rhpAdditionalInfo An optional additional option string passed to a custom record header parser
@@ -44,12 +42,10 @@ package za.co.absa.cobrix.cobol.reader.parameters
*/
case class VariableLengthParameters(
isRecordSequence: Boolean, // [deprecated by recordFormat]
- hasBdw: Boolean, // [deprecated by recordFormat]
+ bdw: Option[Bdw],
isRdwBigEndian: Boolean,
- isBdwBigEndian: Boolean,
isRdwPartRecLength: Boolean,
rdwAdjustment: Int,
- bdwAdjustment: Int,
recordHeaderParser: Option[String],
recordExtractor: Option[String],
rhpAdditionalInfo: Option[String],
diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala
new file mode 100644
index 000000000..3b3fdfb1a
--- /dev/null
+++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala
@@ -0,0 +1,177 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.cobol.reader.extractors.raw
+
+import org.scalatest.WordSpec
+import za.co.absa.cobrix.cobol.parser.CopybookParser
+import za.co.absa.cobrix.cobol.reader.memorystream.TestByteStream
+import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParametersFactory}
+
+class FixedBlockRawRecordExtractorSuite extends WordSpec {
+ private val copybookContent =
+ """ 01 RECORD.
+ 02 X PIC X(2).
+ """
+ private val copybook = CopybookParser.parseTree(copybookContent)
+
+ private val fbParams = FixedBlockParameters(None, Some(2), None)
+
+ "fixed block fixed length records" should {
+ "be able to read a FB file that has no data" in {
+ val rc = getRawRecordContext(Array[Byte]())
+
+ val extractor = new FixedBlockRawRecordExtractor(rc, fbParams)
+
+ assert(!extractor.hasNext)
+
+ intercept[NoSuchElementException] {
+ extractor.next()
+ }
+ }
+
+ "be able to read a FB file that has an incomplete record" in {
+ val rc = getRawRecordContext(Array[Byte](0xF0.toByte))
+
+ val extractor = new FixedBlockRawRecordExtractor(rc, fbParams)
+
+ assert(extractor.hasNext)
+
+ val r0 = extractor.next()
+
+ assert(r0.length == 1)
+ assert(r0.head == 0xF0.toByte)
+
+ intercept[NoSuchElementException] {
+ extractor.next()
+ }
+ }
+
+ "be able to read a FB file that has one record per block" in {
+ val rc = getRawRecordContext(1)
+
+ val extractor = new FixedBlockRawRecordExtractor(rc, FixedBlockParameters(Some(2), None, Some(1)))
+
+ assert(extractor.hasNext)
+
+ val r0 = extractor.next()
+ assert(r0.length == 2)
+ assert(r0.head == 0xF0.toByte)
+ assert(r0(1) == 0xF0.toByte)
+
+ assert(extractor.next().head == 0xF1.toByte)
+ assert(extractor.next().head == 0xF2.toByte)
+ assert(!extractor.hasNext)
+ }
+
+ "be able to read a VBVR file that has multiple records per block" in {
+ val rc = getRawRecordContext(3)
+
+ val extractor = new FixedBlockRawRecordExtractor(rc, FixedBlockParameters(None, None, Some(3)))
+
+ assert(extractor.hasNext)
+
+ val r0 = extractor.next()
+ assert(r0.length == 2)
+ assert(r0.head == 0xF0.toByte)
+ assert(r0(1) == 0xF0.toByte)
+
+ assert(extractor.next().head == 0xF1.toByte)
+ assert(extractor.next().head == 0xF2.toByte)
+ assert(extractor.next().head == 0xF3.toByte)
+ assert(extractor.next().head == 0xF4.toByte)
+ assert(extractor.next().head == 0xF5.toByte)
+ assert(extractor.next().head == 0xF6.toByte)
+ assert(extractor.next().head == 0xF7.toByte)
+ assert(extractor.next().head == 0xF8.toByte)
+ assert(!extractor.hasNext)
+ }
+
+ "allow neither block length nor records per block to be specified" in {
+ val fb = FixedBlockParameters(Some(1), None, None)
+
+ assert(fb.blockLength.isEmpty)
+ assert(fb.recordsPerBlock.isEmpty)
+ }
+ }
+
+ "failures" should {
+ "throw an exception when both block length and records per block are specified" in {
+ val fb = FixedBlockParameters(Some(1), Some(1), Some(1))
+
+ val ex = intercept[IllegalArgumentException] {
+ FixedBlockParameters.validate(fb)
+ }
+
+ assert(ex.getMessage.contains("FB record format requires either block length or number of records per block to be specified, but not both."))
+ }
+
+ "throw an exception when record length is zero" in {
+ val fb = FixedBlockParameters(Some(0), Some(1), None)
+
+ val ex = intercept[IllegalArgumentException] {
+ FixedBlockParameters.validate(fb)
+ }
+
+ assert(ex.getMessage.contains("Record length should be positive. Got 0."))
+ }
+
+ "throw an exception when block size is zero" in {
+ val fb = FixedBlockParameters(Some(1), Some(0), None)
+
+ val ex = intercept[IllegalArgumentException] {
+ FixedBlockParameters.validate(fb)
+ }
+
+ assert(ex.getMessage.contains("Block length should be positive. Got 0."))
+ }
+
+ "throw an exception when records per block is zero" in {
+ val fb = FixedBlockParameters(Some(1), None, Some(0))
+
+ val ex = intercept[IllegalArgumentException] {
+ FixedBlockParameters.validate(fb)
+ }
+
+ assert(ex.getMessage.contains("Records per block should be positive. Got 0."))
+ }
+ }
+
+ private def getRawRecordContext(recordsPerBlock: Int): RawRecordContext = {
+ val numOfBlocks = 3
+
+ val bytes = Range(0, numOfBlocks)
+ .flatMap(i => {
+ Range(0, recordsPerBlock).flatMap(j => {
+ val num = (i * recordsPerBlock + j) % 10
+ val v = (0xF0 + num).toByte
+ Array[Byte](v, v)
+ })
+ }).toArray[Byte]
+
+ getRawRecordContext(bytes)
+ }
+
+ private def getRawRecordContext(bytes: Array[Byte]): RawRecordContext = {
+ val ibs = new TestByteStream(bytes)
+
+ val bdwDecoder = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0))
+ val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0))
+
+ RawRecordContext(0, ibs, copybook, rdwDecoder, bdwDecoder, "")
+ }
+
+}
diff --git a/data/test9_expected/test9_cp037.txt b/data/test9_expected/test9_cp037.txt
index 0ae58b103..bc1ec1009 100644
--- a/data/test9_expected/test9_cp037.txt
+++ b/data/test9_expected/test9_cp037.txt
@@ -1,11 +1,11 @@
{"CURRENCY":"GBP","SIGNATURE":"S9276511","COMPANY_NAME_NP":"©û ¯á3Z ÑT ¼Å8","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":36.88}
{"CURRENCY":"CZK","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ùçc¤ ³® Aº ëd","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":59.80}
-{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"´ #ñ#B \" :Ã|à","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":767.31}
+{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"´ #ñ#B \" :æà","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":767.31}
{"CURRENCY":"CHF","SIGNATURE":"S9276511","COMPANY_NAME_NP":"ZÁ Ì0z eCjú","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":873.44}
{"CURRENCY":"CHF","SIGNATURE":"S9276511","COMPANY_NAME_NP":"8oÂÔàÒ «Ð· \"ç","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":39.71}
{"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"¿ ]Æ»ýY)Ô%@ÌÒ","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":536.19}
{"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ëù±l¥Ù Ë^o _²<","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":346.57}
-{"CURRENCY":"EUR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"é/Få© èÙªÎÓÖ|","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":471.30}
+{"CURRENCY":"EUR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"é/Få© èÙªÎÓÖ¦","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":471.30}
{"CURRENCY":"GBP","SIGNATURE":"S9276511","COMPANY_NAME_NP":"¶ýB}óþ F#«RÇÙ","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":287.83}
{"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ý ëºê6 %±Çì","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":60112.00}
{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"ïêÖj% ®¯\\¶ X","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":539.59}
diff --git a/data/test9_expected/test9_cp037_ext.txt b/data/test9_expected/test9_cp037_ext.txt
index 33f3b2a76..39dd3f1bc 100644
--- a/data/test9_expected/test9_cp037_ext.txt
+++ b/data/test9_expected/test9_cp037_ext.txt
@@ -1,11 +1,11 @@
{"CURRENCY":"GBP","SIGNATURE":"S9276511","COMPANY_NAME_NP":"©û\u0012¯á3Z ÑT ¼Å8\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":36.88}
{"CURRENCY":"CZK","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ùçc¤ ³®\u001A Aº\u0014ëd\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":59.80}
-{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"´\u0011#ñ#B \" :Ã|à\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":767.31}
+{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"´\u0011#ñ#B \" :æà\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":767.31}
{"CURRENCY":"CHF","SIGNATURE":"S9276511","COMPANY_NAME_NP":"ZÁ\u0007Ì0z \u0003\u0015eCjú \u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":873.44}
{"CURRENCY":"CHF","SIGNATURE":"S9276511","COMPANY_NAME_NP":"8oÂÔàÒ\u0010\u001F«Ð· \"ç\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":39.71}
{"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"¿\u001C ]Æ»ýY)Ô%@ÌÒ\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":536.19}
{"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ëù±l¥Ù Ë^o _²<\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":346.57}
-{"CURRENCY":"EUR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"é/Få©\u000BèÙªÎÓÖ|\n\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":471.30}
+{"CURRENCY":"EUR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"é/Få©\u000BèÙªÎÓÖ¦\n\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":471.30}
{"CURRENCY":"GBP","SIGNATURE":"S9276511","COMPANY_NAME_NP":"\u001F¶ýB}óþ\fF#«RÇÙ\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":287.83}
{"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ý ëºê6\u0016 %±Çì\u000E \u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":60112.00}
{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":" ïêÖj% ®¯\\¶ X\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":539.59}
diff --git a/examples/examples-collection/pom.xml b/examples/examples-collection/pom.xml
index 05be11891..a57a47d78 100644
--- a/examples/examples-collection/pom.xml
+++ b/examples/examples-collection/pom.xml
@@ -117,6 +117,18 @@
+                    <filters>
+                        <filter>
+                            <artifact>*:*</artifact>
+                            <excludes>
+                                <exclude>META-INF/LICENSE.txt</exclude>
+                                <exclude>META-INF/MANIFEST.MF</exclude>
+                                <exclude>META-INF/*.SF</exclude>
+                                <exclude>META-INF/*.DSA</exclude>
+                                <exclude>META-INF/*.RSA</exclude>
+                            </excludes>
+                        </filter>
+                    </filters>
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/StreamingExample.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/StreamingExample.scala
index 8c5447ebb..9642bbffb 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/StreamingExample.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/StreamingExample.scala
@@ -18,7 +18,7 @@ package com.example.spark.cobol.examples.apps
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}
-import za.co.absa.cobrix.spark.cobol.source.parameters.CobolParametersParser._
+import za.co.absa.cobrix.spark.cobol.parameters.CobolParametersParser._
object StreamingExample {
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen11CustomRDW.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen11CustomRDW.scala
index 282c94367..def94f59a 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen11CustomRDW.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen11CustomRDW.scala
@@ -19,8 +19,8 @@ package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
import scala.util.Random
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
/**
* This is a test data generator. The copybook for it is listed below.
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13aFileHeaderAndFooter.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13aFileHeaderAndFooter.scala
index f5827b16b..bf4ff6941 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13aFileHeaderAndFooter.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13aFileHeaderAndFooter.scala
@@ -17,6 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13bCompaniesFileHeaders.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13bCompaniesFileHeaders.scala
index 98eaa3ed4..ec1d4e5ae 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13bCompaniesFileHeaders.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13bCompaniesFileHeaders.scala
@@ -17,8 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen16MultisegFixedLen.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen16MultisegFixedLen.scala
index 32242bda0..cf28191c4 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen16MultisegFixedLen.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen16MultisegFixedLen.scala
@@ -18,8 +18,8 @@ package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
import java.util
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen17Hierarchical.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen17Hierarchical.scala
index 490a54eff..1cd0fccfc 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen17Hierarchical.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen17Hierarchical.scala
@@ -17,8 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen1Transactions.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen1Transactions.scala
index 6eecc5b37..4651d93f6 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen1Transactions.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen1Transactions.scala
@@ -17,6 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3Companies.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3Companies.scala
index 6c03dcc59..a013e3ff2 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3Companies.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3Companies.scala
@@ -17,8 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3CompaniesBigEndian.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3CompaniesBigEndian.scala
index 14e4ccee5..0c05ee394 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3CompaniesBigEndian.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3CompaniesBigEndian.scala
@@ -17,8 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen4CompaniesWide.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen4CompaniesWide.scala
index 31818cb72..7cb69b466 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen4CompaniesWide.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen4CompaniesWide.scala
@@ -17,8 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen6TypeVariety.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen6TypeVariety.scala
index 9ca543747..dcd44502f 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen6TypeVariety.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen6TypeVariety.scala
@@ -14,11 +14,10 @@
* limitations under the License.
*/
-package za.co.absa.cobrix.cobol.parser.examples.generators
+package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
@@ -27,7 +26,7 @@ import scala.util.Random
*/
object TestDataGen6TypeVariety {
- val numberOfRecordsToGenerate = 100
+ val numberOfRecordsToGenerate = 1000000
// seed=100 is used for the integration test
val rand: Random = new Random(/*100*/)
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen7Fillers.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen7Fillers.scala
index 50448c773..cdbe98d81 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen7Fillers.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen7Fillers.scala
@@ -17,8 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
-import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company}
-import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen8NonPrintableNames.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen8NonPrintableNames.scala
index a8b0996cf..8b66bf0a1 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen8NonPrintableNames.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen8NonPrintableNames.scala
@@ -17,6 +17,8 @@
package com.example.spark.cobol.examples.parser.generators
import java.io.{BufferedOutputStream, FileOutputStream}
+import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company}
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
import scala.util.Random
diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen9CodePages.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen9CodePages.scala
index c6b7d4fda..de3a4832a 100644
--- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen9CodePages.scala
+++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen9CodePages.scala
@@ -16,8 +16,10 @@
package com.example.spark.cobol.examples.parser.generators
-import java.io.{BufferedOutputStream, FileOutputStream}
+import com.example.spark.cobol.examples.parser.generators.model.CommonLists
+import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._
+import java.io.{BufferedOutputStream, FileOutputStream}
import scala.util.Random
/**
diff --git a/pom.xml b/pom.xml
index ecf66fb48..b67ead461 100644
--- a/pom.xml
+++ b/pom.xml
@@ -107,7 +107,7 @@
1.6
- 2.12.14
+ 2.12.15
2.12
3.1.2
3.0.1
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index fb1906884..98dde9d0a 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -17,7 +17,6 @@
import sbt._
object Dependencies {
-
private val guavaVersion = "15.0"
private val scodecBitsVersion = "1.1.4"
private val scodecCoreVersion = "1.10.3"
@@ -27,14 +26,25 @@ object Dependencies {
private val scalatestVersion = "3.0.1"
- def sparkVersion: String = sys.props.getOrElse("SPARK_VERSION", "3.1.2")
+ private val defaultSparkVersionForScala211 = "2.4.8"
+ private val defaultSparkVersionForScala212 = "3.1.2"
+
+ def sparkFallbackVersion(scalaVersion: String): String = {
+ if (scalaVersion.startsWith("2.11")) {
+ defaultSparkVersionForScala211
+ } else {
+ defaultSparkVersionForScala212
+ }
+ }
+
+ def sparkVersion(scalaVersion: String): String = sys.props.getOrElse("SPARK_VERSION", sparkFallbackVersion(scalaVersion))
def getScalaDependency(scalaVersion: String): ModuleID = "org.scala-lang" % "scala-library" % scalaVersion % Provided
- val SparkCobolDependencies: Seq[ModuleID] = Seq(
+ def SparkCobolDependencies(scalaVersion: String): Seq[ModuleID] = Seq(
// provided
- "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
- "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
+ "org.apache.spark" %% "spark-sql" % sparkVersion(scalaVersion) % Provided,
+ "org.apache.spark" %% "spark-streaming" % sparkVersion(scalaVersion) % Provided,
// test
"org.scalatest" %% "scalatest" % scalatestVersion % Test
diff --git a/project/build.properties b/project/build.properties
index 139325827..bb5389da2 100644
--- a/project/build.properties
+++ b/project/build.properties
@@ -1 +1 @@
-sbt.version=1.4.5
\ No newline at end of file
+sbt.version=1.5.5
\ No newline at end of file
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala
index 88d2e9feb..5f2d2457a 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala
@@ -26,7 +26,7 @@ import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmi
import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, StringTrimmingPolicy}
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat
import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat._
-import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, MultisegmentParameters, VariableLengthParameters}
+import za.co.absa.cobrix.cobol.reader.parameters.{Bdw, CobolParameters, MultisegmentParameters, VariableLengthParameters}
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy
import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy
@@ -89,6 +89,8 @@ object CobolParametersParser {
val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length"
val PARAM_RDW_ADJUSTMENT = "rdw_adjustment"
val PARAM_BDW_ADJUSTMENT = "bdw_adjustment"
+ val PARAM_BLOCK_LENGTH = "block_length"
+ val PARAM_RECORDS_PER_BLOCK = "records_per_block"
val PARAM_SEGMENT_FIELD = "segment_field"
val PARAM_SEGMENT_ID_ROOT = "segment_id_root"
val PARAM_SEGMENT_FILTER = "segment_filter"
@@ -263,7 +265,7 @@ object CobolParametersParser {
private def parseVariableLengthParameters(params: Parameters, recordFormat: RecordFormat): Option[VariableLengthParameters] = {
val recordLengthFieldOpt = params.get(PARAM_RECORD_LENGTH_FIELD)
- val isRecordSequence = Seq(VariableLength, VariableBlock, AsciiText).contains(recordFormat)
+ val isRecordSequence = Seq(FixedBlock, VariableLength, VariableBlock, AsciiText).contains(recordFormat)
val isRecordIdGenerationEnabled = params.getOrElse(PARAM_GENERATE_RECORD_ID, "false").toBoolean
val fileStartOffset = params.getOrElse(PARAM_FILE_START_OFFSET, "0").toInt
val fileEndOffset = params.getOrElse(PARAM_FILE_END_OFFSET, "0").toInt
@@ -271,7 +273,7 @@ object CobolParametersParser {
val hasRecordExtractor = params.contains(PARAM_RECORD_EXTRACTOR)
if (params.contains(PARAM_RECORD_LENGTH_FIELD) &&
- (params.contains(PARAM_IS_RECORD_SEQUENCE) || params.contains(PARAM_IS_XCOM) )) {
+ (params.contains(PARAM_IS_RECORD_SEQUENCE) || params.contains(PARAM_IS_XCOM))) {
throw new IllegalArgumentException(s"Option '$PARAM_RECORD_LENGTH_FIELD' cannot be used together with '$PARAM_IS_RECORD_SEQUENCE' or '$PARAM_IS_XCOM'.")
}
@@ -286,12 +288,10 @@ object CobolParametersParser {
Some(VariableLengthParameters
(
isRecordSequence,
- recordFormat == VariableBlock,
+ parseBdw(params, recordFormat),
params.getOrElse(PARAM_IS_RDW_BIG_ENDIAN, "false").toBoolean,
- params.getOrElse(PARAM_IS_BDW_BIG_ENDIAN, "false").toBoolean,
params.getOrElse(PARAM_IS_RDW_PART_REC_LENGTH, "false").toBoolean,
params.getOrElse(PARAM_RDW_ADJUSTMENT, "0").toInt,
- params.getOrElse(PARAM_BDW_ADJUSTMENT, "0").toInt,
params.get(PARAM_RECORD_HEADER_PARSER),
params.get(PARAM_RECORD_EXTRACTOR),
params.get(PARAM_RHP_ADDITIONAL_INFO),
@@ -314,6 +314,29 @@ object CobolParametersParser {
}
}
+ private def parseBdw(params: Parameters, recordFormat: RecordFormat): Option[Bdw] = {
+ if (recordFormat == FixedBlock || recordFormat == VariableBlock) {
+ val bdw = Bdw(
+ params.getOrElse(PARAM_IS_BDW_BIG_ENDIAN, "false").toBoolean,
+ params.getOrElse(PARAM_BDW_ADJUSTMENT, "0").toInt,
+ params.get(PARAM_BLOCK_LENGTH).map(_.toInt),
+ params.get(PARAM_RECORDS_PER_BLOCK).map(_.toInt)
+ )
+ if (bdw.blockLength.nonEmpty && bdw.recordsPerBlock.nonEmpty) {
+ throw new IllegalArgumentException(s"Options '$PARAM_BLOCK_LENGTH' and '$PARAM_RECORDS_PER_BLOCK' cannot be used together.")
+ }
+ if (recordFormat == VariableBlock && bdw.blockLength.nonEmpty) {
+ logger.warn(s"Option '$PARAM_BLOCK_LENGTH' is ignored for record format: VB")
+ }
+ if (recordFormat == VariableBlock && bdw.recordsPerBlock.nonEmpty) {
+ logger.warn(s"Option '$PARAM_RECORDS_PER_BLOCK' is ignored for record format: VB")
+ }
+ Some(bdw)
+ } else {
+ None
+ }
+ }
+
private def getRecordFormat(params: Parameters): RecordFormat = {
if (params.contains(PARAM_RECORD_FORMAT)) {
val recordFormatStr = params(PARAM_RECORD_FORMAT)
@@ -398,7 +421,7 @@ object CobolParametersParser {
val name = s"$PARAM_SEGMENT_ID_LEVEL_PREFIX$i"
if (params.contains(name)) {
levels += params(name)
- } else if (i==0 && params.contains(PARAM_SEGMENT_ID_ROOT)){
+ } else if (i == 0 && params.contains(PARAM_SEGMENT_ID_ROOT)) {
levels += params(PARAM_SEGMENT_ID_ROOT)
} else {
return levels
@@ -418,6 +441,7 @@ object CobolParametersParser {
}
// key - segment id, value - redefine field name
+
/**
* Parses the list of redefines and their corresponding segment ids.
*
@@ -449,7 +473,7 @@ object CobolParametersParser {
keyNoCase.startsWith("redefine_segment_id_map")) {
params.markUsed(k)
val splitVal = v.split("\\=\\>")
- if (splitVal.lengthCompare(2) !=0) {
+ if (splitVal.lengthCompare(2) != 0) {
throw new IllegalArgumentException(s"Illegal argument for the 'redefine-segment-id-map' option: '$v'.")
}
val redefine = splitVal(0).trim
@@ -691,15 +715,15 @@ object CobolParametersParser {
}
/**
- * Parses the options for the occurs mappings.
- *
- * @param params Parameters provided by spark.read.option(...)
- * @return Returns a mapping for OCCURS fields
- */
+ * Parses the options for the occurs mappings.
+ *
+ * @param params Parameters provided by spark.read.option(...)
+ * @return Returns a mapping for OCCURS fields
+ */
@throws(classOf[IllegalArgumentException])
def getOccursMappings(params: String): Map[String, Map[String, Int]] = {
val parser = new ParserJson()
val parsedParams = parser.parseMap(params)
- parsedParams.map( kv => kv._1 -> kv._2.asInstanceOf[Map[String, Any]].map(x => x._1 -> x._2.asInstanceOf[Int]))
+ parsedParams.map(kv => kv._1 -> kv._2.asInstanceOf[Map[String, Any]].map(x => x._1 -> x._2.asInstanceOf[Int]))
}
-}
\ No newline at end of file
+}
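
Taken together, `parseBdw` means that for `FB` files `block_length` and `records_per_block` are mutually exclusive alternatives, while for `VB` files either one alone is accepted but ignored with a warning. A minimal read sketch using the option names from this patch (paths are placeholders):

```scala
// Sketch: reading a fixed-block (FB) file; paths are hypothetical.
val df = spark.read
  .format("cobol")
  .option("copybook", "/copybooks/example.cpy")
  .option("record_format", "FB")
  .option("record_length", "250")
  .option("block_length", "500")   // or records_per_block, but never both
  .load("/data/example_fb.dat")
```

Supplying both options fails fast with the `IllegalArgumentException` raised in `parseBdw`, before any data is read.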
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala
index 64fbc5e46..71dfd41c3 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala
@@ -142,12 +142,10 @@ class DefaultSource
val varLenParams: VariableLengthParameters = parameters.variableLengthParams
.getOrElse(
VariableLengthParameters(isRecordSequence = false,
- hasBdw = false,
+ None,
isRdwBigEndian = false,
- isBdwBigEndian = false,
isRdwPartRecLength = false,
rdwAdjustment = 0,
- bdwAdjustment = 0,
recordHeaderParser = None,
recordExtractor = None,
rhpAdditionalInfo = None,
@@ -171,7 +169,9 @@ class DefaultSource
else
None
- ReaderParameters(isEbcdic = parameters.isEbcdic,
+ ReaderParameters(
+ recordFormat = parameters.recordFormat,
+ isEbcdic = parameters.isEbcdic,
isText = parameters.isText,
ebcdicCodePage = parameters.ebcdicCodePage,
ebcdicCodePageClass = parameters.ebcdicCodePageClass,
@@ -182,12 +182,10 @@ class DefaultSource
recordLength = parameters.recordLength,
lengthFieldName = recordLengthField,
isRecordSequence = varLenParams.isRecordSequence,
- hasBdw = varLenParams.hasBdw,
+ bdw = varLenParams.bdw,
isRdwBigEndian = varLenParams.isRdwBigEndian,
- isBdwBigEndian = varLenParams.isBdwBigEndian,
isRdwPartRecLength = varLenParams.isRdwPartRecLength,
rdwAdjustment = varLenParams.rdwAdjustment,
- bdwAdjustment = varLenParams.bdwAdjustment,
isIndexGenerationNeeded = varLenParams.isUsingIndex,
inputSplitRecords = varLenParams.inputSplitRecords,
inputSplitSizeMB = varLenParams.inputSplitSizeMB,
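
The `Bdw` class threaded through `VariableLengthParameters` and `ReaderParameters` here is defined in `za.co.absa.cobrix.cobol.reader.parameters` and its definition is not part of this diff. Judging from the positional constructor call in `parseBdw`, its shape is roughly:

```scala
// Approximation only: the first two field names are inferred from the
// arguments passed in parseBdw; blockLength and recordsPerBlock are
// confirmed by the accessors used there.
case class Bdw(isBigEndian: Boolean,
               adjustment: Int,
               blockLength: Option[Int],
               recordsPerBlock: Option[Int])
```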
diff --git a/spark-cobol/src/test/resources/log4j.properties b/spark-cobol/src/test/resources/log4j.properties
index d80dac076..df78b9a39 100644
--- a/spark-cobol/src/test/resources/log4j.properties
+++ b/spark-cobol/src/test/resources/log4j.properties
@@ -14,37 +14,12 @@
# limitations under the License.
#
-# Set everything to be logged to the file core/target/unit-tests.log
-log4j.rootLogger=DEBUG, CA, FA
-
-#Console Appender
-log4j.appender.CA=org.apache.log4j.ConsoleAppender
-log4j.appender.CA.layout=org.apache.log4j.PatternLayout
-log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n
-log4j.appender.CA.Threshold = WARN
-
-#File Appender
-log4j.appender.FA=org.apache.log4j.FileAppender
-log4j.appender.FA.append=false
-log4j.appender.FA.file=target/unit-tests.log
-log4j.appender.FA.layout=org.apache.log4j.PatternLayout
-log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n
-
-# Set the logger level of File Appender to WARN
-log4j.appender.FA.Threshold = INFO
-
-# Some packages are noisy for no good reason.
-log4j.additivity.parquet.hadoop.ParquetRecordReader=false
-log4j.logger.parquet.hadoop.ParquetRecordReader=OFF
-
-log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false
-log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF
-
-log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false
-log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF
-
-log4j.additivity.hive.ql.metadata.Hive=false
-log4j.logger.hive.ql.metadata.Hive=OFF
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
+log4j.appender.console.Threshold=ERROR
# Turn off noisy loggers in unit tests
log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala
index 554c772b3..0d3164b9b 100644
--- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala
@@ -26,7 +26,7 @@ import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
//noinspection NameBooleanParameters
class Test29BdwFileSpec extends WordSpec with SparkTestBase with BinaryFileFixture {
- private val exampleName = "Test29 (VB record format RDW+BDW"
+ private val exampleName = "Test29 (VB record format RDW+BDW)"
private val copybook =
""" 01 R.
diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala
new file mode 100644
index 000000000..1ab6f0b49
--- /dev/null
+++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala
@@ -0,0 +1,142 @@
+/*
+ * Copyright 2018 ABSA Group Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package za.co.absa.cobrix.spark.cobol.source.integration
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.functions.col
+import org.scalatest.WordSpec
+import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase
+import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture
+
+//noinspection NameBooleanParameters
+class Test30FbFileSpec extends WordSpec with SparkTestBase with BinaryFileFixture {
+
+ private val exampleName = "Test30 (FB record format)"
+
+ private val copybook =
+ """ 01 R.
+ 03 A PIC X(2).
+ """
+
+ val expected2Records = """[{"A":"00"},{"A":"11"}]"""
+ val expected4Records = """[{"A":"00"},{"A":"11"},{"A":"22"},{"A":"33"}]"""
+
+ "FB record loader" should {
+ "load data when record length and block length are specified" in {
+ testVbRecordLoad(1, 2, Map[String, String]("block_length" -> "4", "record_length" -> "2"), expected2Records)
+ }
+
+ "load data when block length is specified" in {
+ testVbRecordLoad(1, 2, Map[String, String]("block_length" -> "2"), expected2Records)
+ }
+
+ "load data when the number records per block is specified" in {
+ testVbRecordLoad(2, 2, Map[String, String]("records_per_block" -> "2"), expected4Records)
+ }
+
+ "load data when smaller records are provided" in {
+ testFbRecordLoad(1, 2,
+ Map[String, String]("block_length" -> "4", "record_length" -> "1"),
+ """[{"A":"0"},{"A":"0"},{"A":"1"},{"A":"1"}]""",
+ ignoreCount = true)
+ }
+
+ "load data with BDWs" in {
+ testFbRecordLoad(2, 2,
+ Map[String, String](),
+ expected4Records,
+ hasBDW = true)
+ }
+
+ "load empty data with BDWs" in {
+ testFbRecordLoad(0, 0,
+ Map[String, String](),
+ "[]",
+ hasBDW = true)
+ }
+ }
+
+ "FB record failures should happen" when {
+ "Block length is negative" in {
+ val ex = intercept[SparkException] {
+ testVbRecordLoad(1, 2, Map[String, String]("block_length" -> "-1"), "")
+ }
+
+ assert(ex.getCause.getMessage.contains("Block length should be positive. Got -1."))
+ }
+
+ "Records per block is negative" in {
+ val ex = intercept[SparkException] {
+ testVbRecordLoad(1, 2, Map[String, String]("records_per_block" -> "-1"), "")
+ }
+
+ assert(ex.getCause.getMessage.contains("Records per block should be positive. Got -1."))
+ }
+
+ "Both block length and records per block are specified" in {
+ val ex = intercept[IllegalArgumentException] {
+ testVbRecordLoad(1, 2, Map[String, String]("block_length" -> "2", "records_per_block" -> "2"), "")
+ }
+
+ assert(ex.getMessage.contains("Options 'block_length' and 'records_per_block' cannot be used together."))
+ }
+ }
+
+ private def testFbRecordLoad(blocks: Int,
+ records: Int,
+ options: Map[String, String],
+ expected: String,
+ ignoreCount: Boolean = false,
+ hasBDW: Boolean = false): Unit = {
+ val record: Seq[Byte] = Range(0, blocks).flatMap(blockNum => {
+ val bdw: Seq[Byte] = if (hasBDW) {
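+ // The 4-byte BDW here is two zero bytes followed by the block payload
+ // length (records * 2 bytes per record), low-order byte first.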
+ val byte0 = ((records * 2) % 256).toByte
+ val byte1 = ((records * 2) / 256).toByte
+ Seq(0, 0, byte0, byte1)
+ } else {
+ Nil
+ }
+ bdw ++ Range(0, records).flatMap(recordNum => {
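+ // 0xF0..0xF9 are the EBCDIC digits '0'..'9'; each record repeats one digit twice.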
+ val idx = (blockNum * records + recordNum) % 10
+ val v = (0xF0 + idx).toByte
+ Seq(v, v)
+ })
+ })
+
+ withTempBinFile("rec", ".dat", record.toArray) { tmpFileName1 =>
+ val df = spark
+ .read
+ .format("cobol")
+ .option("copybook_contents", copybook)
+ .option("record_format", "FB")
+ .options(options)
+ .load(tmpFileName1)
+
+ val actual = df
+ .orderBy(col("A"))
+ .toJSON
+ .collect()
+ .mkString("[", ",", "]")
+
+ if (!ignoreCount) {
+ assert(df.count() == blocks * records)
+ }
+ assert(actual == expected)
+ }
+ }
+
+}
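
For a concrete trace of the generator above: with `records = 2` two-byte records and `hasBDW = true`, the block payload is 4 bytes, so `testFbRecordLoad` emits the header bytes `00 00 04 00` followed by the EBCDIC data. A standalone illustration:

```scala
// Illustration only, mirroring the first block produced by testFbRecordLoad:
// a 4-byte BDW followed by two 2-byte EBCDIC records ("00" and "11").
val records = 2
val payload = records * 2                            // 4 bytes of record data
val bdw = Seq[Byte](0, 0, (payload % 256).toByte, (payload / 256).toByte)
val data = Seq(0xF0, 0xF0, 0xF1, 0xF1).map(_.toByte) // EBCDIC '0','0','1','1'
val block = bdw ++ data                              // 00 00 04 00 F0 F0 F1 F1
```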