From e3dd5fd03c4a4836d62e66167ab89d8dbf24dba9 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 14 Sep 2021 14:35:34 +0200 Subject: [PATCH 01/13] Fix compilation of the examples collection app --- examples/examples-collection/pom.xml | 12 ++++++ .../examples/apps/StreamingExample.scala | 2 +- .../cobol/examples/parser/ReaderExample.scala | 37 ------------------- .../generators/TestDataGen11CustomRDW.scala | 4 +- .../TestDataGen13aFileHeaderAndFooter.scala | 2 + .../TestDataGen13bCompaniesFileHeaders.scala | 4 +- .../TestDataGen16MultisegFixedLen.scala | 4 +- .../TestDataGen17Hierarchical.scala | 4 +- .../generators/TestDataGen1Transactions.scala | 2 + .../generators/TestDataGen3Companies.scala | 4 +- .../TestDataGen3CompaniesBigEndian.scala | 4 +- .../TestDataGen4CompaniesWide.scala | 4 +- .../generators/TestDataGen6TypeVariety.scala | 7 ++-- .../generators/TestDataGen7Fillers.scala | 4 +- .../TestDataGen8NonPrintableNames.scala | 2 + .../generators/TestDataGen9CodePages.scala | 4 +- .../parameters/CobolParametersParser.scala | 2 + 17 files changed, 43 insertions(+), 59 deletions(-) delete mode 100644 examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/ReaderExample.scala diff --git a/examples/examples-collection/pom.xml b/examples/examples-collection/pom.xml index 05be11891..a57a47d78 100644 --- a/examples/examples-collection/pom.xml +++ b/examples/examples-collection/pom.xml @@ -117,6 +117,18 @@ + + + *:* + + META-INF/LICENSE.txt + META-INF/MANIFEST.MF + META-INF/*.SF + META-INF/*.DSA + META-INF/*.RSA + + + diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/StreamingExample.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/StreamingExample.scala index 8c5447ebb..9642bbffb 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/StreamingExample.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/apps/StreamingExample.scala @@ -18,7 +18,7 @@ package com.example.spark.cobol.examples.apps import org.apache.spark.sql.SparkSession import org.apache.spark.streaming.{Seconds, StreamingContext} -import za.co.absa.cobrix.spark.cobol.source.parameters.CobolParametersParser._ +import za.co.absa.cobrix.spark.cobol.parameters.CobolParametersParser._ object StreamingExample { diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/ReaderExample.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/ReaderExample.scala deleted file mode 100644 index 56d61a51d..000000000 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/ReaderExample.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright 2018 ABSA Group Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package com.example.spark.cobol.examples.parser - -import za.co.absa.cobrix.cobol.parser.reader.FSReader - -object ReaderExample { - - def main(args: Array[String]): Unit = { - - val reader = new FSReader("../../examples/example_copybook.cob", "../../examples/example_data/file1.bin") - - val it = reader.getIterator - - println(it.hasNext) - - while (it.hasNext) { - println(it.next()) - } - - } - -} diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen11CustomRDW.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen11CustomRDW.scala index 282c94367..def94f59a 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen11CustomRDW.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen11CustomRDW.scala @@ -19,8 +19,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} import scala.util.Random -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ /** * This is a test data generator. The copybook for it is listed below. diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13aFileHeaderAndFooter.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13aFileHeaderAndFooter.scala index f5827b16b..bf4ff6941 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13aFileHeaderAndFooter.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13aFileHeaderAndFooter.scala @@ -17,6 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13bCompaniesFileHeaders.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13bCompaniesFileHeaders.scala index 98eaa3ed4..ec1d4e5ae 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13bCompaniesFileHeaders.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen13bCompaniesFileHeaders.scala @@ -17,8 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import 
scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen16MultisegFixedLen.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen16MultisegFixedLen.scala index 32242bda0..cf28191c4 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen16MultisegFixedLen.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen16MultisegFixedLen.scala @@ -18,8 +18,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} import java.util -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen17Hierarchical.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen17Hierarchical.scala index 490a54eff..1cd0fccfc 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen17Hierarchical.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen17Hierarchical.scala @@ -17,8 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen1Transactions.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen1Transactions.scala index 6eecc5b37..4651d93f6 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen1Transactions.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen1Transactions.scala @@ -17,6 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3Companies.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3Companies.scala index 6c03dcc59..a013e3ff2 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3Companies.scala +++ 
b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3Companies.scala @@ -17,8 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3CompaniesBigEndian.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3CompaniesBigEndian.scala index 14e4ccee5..0c05ee394 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3CompaniesBigEndian.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen3CompaniesBigEndian.scala @@ -17,8 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen4CompaniesWide.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen4CompaniesWide.scala index 31818cb72..7cb69b466 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen4CompaniesWide.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen4CompaniesWide.scala @@ -17,8 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen6TypeVariety.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen6TypeVariety.scala index 9ca543747..dcd44502f 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen6TypeVariety.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen6TypeVariety.scala @@ -14,11 +14,10 @@ * limitations under the License. 
*/ -package za.co.absa.cobrix.cobol.parser.examples.generators +package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random @@ -27,7 +26,7 @@ import scala.util.Random */ object TestDataGen6TypeVariety { - val numberOfRecordsToGenerate = 100 + val numberOfRecordsToGenerate = 1000000 // seed=100 is used for the integration test val rand: Random = new Random(/*100*/) diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen7Fillers.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen7Fillers.scala index 50448c773..cdbe98d81 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen7Fillers.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen7Fillers.scala @@ -17,8 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} -import za.co.absa.cobrix.cobol.parser.examples.generators.model.{CommonLists, Company} -import za.co.absa.cobrix.cobol.parser.examples.generators.utils.GeneratorTools._ +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen8NonPrintableNames.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen8NonPrintableNames.scala index a8b0996cf..8b66bf0a1 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen8NonPrintableNames.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen8NonPrintableNames.scala @@ -17,6 +17,8 @@ package com.example.spark.cobol.examples.parser.generators import java.io.{BufferedOutputStream, FileOutputStream} +import com.example.spark.cobol.examples.parser.generators.model.{CommonLists, Company} +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ import scala.util.Random diff --git a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen9CodePages.scala b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen9CodePages.scala index c6b7d4fda..de3a4832a 100644 --- a/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen9CodePages.scala +++ b/examples/examples-collection/src/main/scala/com/example/spark/cobol/examples/parser/generators/TestDataGen9CodePages.scala @@ -16,8 +16,10 @@ package com.example.spark.cobol.examples.parser.generators -import java.io.{BufferedOutputStream, FileOutputStream} +import com.example.spark.cobol.examples.parser.generators.model.CommonLists +import com.example.spark.cobol.examples.parser.generators.utils.GeneratorTools._ +import java.io.{BufferedOutputStream, 
FileOutputStream} import scala.util.Random /** diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index 88d2e9feb..ebd512e98 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -89,6 +89,8 @@ object CobolParametersParser { val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length" val PARAM_RDW_ADJUSTMENT = "rdw_adjustment" val PARAM_BDW_ADJUSTMENT = "bdw_adjustment" + val PARAM_BLOCK_LENGTH = "block_length" + val PARAM_RECORDS_PER_BLOCK = "records_per_block" val PARAM_SEGMENT_FIELD = "segment_field" val PARAM_SEGMENT_ID_ROOT = "segment_id_root" val PARAM_SEGMENT_FILTER = "segment_filter" From 20586db89981c239f3225be2579acb10c3217aaa Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 14 Sep 2021 16:16:04 +0200 Subject: [PATCH 02/13] Extract BDW options into a separate class. --- .../parser/recordformats/RecordFormat.scala | 2 + .../cobol/reader/VarLenNestedReader.scala | 13 +- .../cobrix/cobol/reader/parameters/Bdw.scala | 8 + .../reader/parameters/ReaderParameters.scala | 13 +- .../parameters/VariableLengthParameters.scala | 8 +- .../parameters/CobolParametersParser.scala | 169 ++++++++++-------- .../spark/cobol/source/DefaultSource.scala | 12 +- 7 files changed, 130 insertions(+), 95 deletions(-) create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala index 75b588f5e..37349ccfb 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/recordformats/RecordFormat.scala @@ -20,6 +20,7 @@ sealed trait RecordFormat object RecordFormat { case object FixedLength extends RecordFormat + case object FixedBlock extends RecordFormat case object VariableLength extends RecordFormat case object VariableBlock extends RecordFormat case object AsciiText extends RecordFormat @@ -27,6 +28,7 @@ object RecordFormat { def withNameOpt(s: String): Option[RecordFormat] = { s match { case "F" => Some(FixedLength) + case "FB" => Some(FixedBlock) case "V" => Some(VariableLength) case "VB" => Some(VariableBlock) case "D" => Some(AsciiText) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 0a4fb572d..944560915 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -22,6 +22,7 @@ import za.co.absa.cobrix.cobol.parser.common.Constants import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC} import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory} +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, VariableBlock} import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} import 
za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor} import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler @@ -63,19 +64,23 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], copybook: Copybook ): Option[RawRecordExtractor] = { val rdwParams = RecordHeaderParameters(readerProperties.isRdwBigEndian, readerProperties.rdwAdjustment) - val bdwParams = RecordHeaderParameters(readerProperties.isBdwBigEndian, readerProperties.bdwAdjustment) val rdwDecoder = new RecordHeaderDecoderRdw(rdwParams) - val bdwDecoder = new RecordHeaderDecoderBdw(bdwParams) - val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoder, readerProperties.reAdditionalInfo) + val bdwOpt = readerProperties.bdw + val bdwParamsOpt = bdwOpt.map(bdw => RecordHeaderParameters(bdw.isBigEndian, bdw.adjustment)) + val bdwDecoderOpt = bdwParamsOpt.map(bdwParams => new RecordHeaderDecoderBdw(bdwParams)) + + val reParams = RawRecordContext(startingRecordNumber, binaryData, copybook, rdwDecoder, bdwDecoderOpt.getOrElse(rdwDecoder), readerProperties.reAdditionalInfo) readerProperties.recordExtractor match { case Some(recordExtractorClass) => Some(RawRecordExtractorFactory.createRecordHeaderParser(recordExtractorClass, reParams)) case None if readerProperties.isText => Some(new TextRecordExtractor(reParams)) - case None if readerProperties.hasBdw => + case None if readerProperties.recordFormat == FixedBlock => + Some(new VariableBlockVariableRecordExtractor(reParams)) // ToDo FB record format + case None if readerProperties.recordFormat == VariableBlock => Some(new VariableBlockVariableRecordExtractor(reParams)) case None if readerProperties.variableSizeOccurs && readerProperties.recordHeaderParser.isEmpty && diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala new file mode 100644 index 000000000..96120355c --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala @@ -0,0 +1,8 @@ +package za.co.absa.cobrix.cobol.reader.parameters + +case class Bdw( + isBigEndian: Boolean, + adjustment: Int, + blockLength: Option[Int], + recordsPerBlock: Option[Int], + ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index 6e360a686..8906c32a4 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -21,12 +21,14 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPoint import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmingPolicy import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, StringTrimmingPolicy} +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat +import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.FixedLength import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy /** * These are 
properties for customizing mainframe binary data reader. - * + * @param recordFormat Record format * @param isEbcdic If true the input data file encoding is EBCDIC, otherwise it is ASCII * @param isText If true line ending characters will be used (LF / CRLF) as the record separator * @param ebcdicCodePage Specifies what code page to use for EBCDIC to ASCII/Unicode conversions @@ -38,7 +40,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param recordLength Specifies the length of the record disregarding the copybook record size. Implied the file has fixed record length. * @param lengthFieldName A name of a field that contains record length. Optional. If not set the copybook record length will be used. * @param isRecordSequence Does input files have 4 byte record length headers - * @param isRdwBigEndian Is RDW big endian? It may depend on flavor of mainframe and/or mainframe to PC transfer method + * @param bdw Block descriptor word (if specified), for FB and VB record formats * @param isRdwPartRecLength Does RDW count itself as part of record length itself * @param rdwAdjustment Controls a mismatch between RDW and record length * @param isIndexGenerationNeeded Is indexing input file before processing is requested @@ -64,6 +66,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param inputFileNameColumn A column name to add to the dataframe. The column will contain input file name for each record similar to 'input_file_name()' function */ case class ReaderParameters( + recordFormat: RecordFormat = FixedLength, isEbcdic: Boolean = true, isText: Boolean = false, ebcdicCodePage: String = "common", @@ -72,15 +75,13 @@ case class ReaderParameters( isUtf16BigEndian: Boolean = true, floatingPointFormat: FloatingPointFormat = FloatingPointFormat.IBM, variableSizeOccurs: Boolean = false, - recordLength: Option[Int] = None , + recordLength: Option[Int] = None, lengthFieldName: Option[String] = None, isRecordSequence: Boolean = false, - hasBdw: Boolean = false, + bdw: Option[Bdw] = None, isRdwBigEndian: Boolean = false, - isBdwBigEndian: Boolean = false, isRdwPartRecLength: Boolean = false, rdwAdjustment: Int = 0, - bdwAdjustment: Int = 0, isIndexGenerationNeeded: Boolean = false, inputSplitRecords: Option[Int] = None, inputSplitSizeMB: Option[Int] = None, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala index def950152..12990fbd3 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/VariableLengthParameters.scala @@ -20,12 +20,10 @@ package za.co.absa.cobrix.cobol.reader.parameters * This class holds the parameters currently used for parsing variable-length records. * * @param isRecordSequence Does input files have 4 byte record length headers - * @param hasBdw Does the file have BDW headers + * @param bdw Block descriptor word (if specified), for FB and VB record formats * @param isRdwBigEndian Is RDW big endian? It may depend on flavor of mainframe and/or mainframe to PC transfer method - * @param isBdwBigEndian Is BDW big endian? 
It may depend on flavor of mainframe and/or mainframe to PC transfer method * @param isRdwPartRecLength Does RDW count itself as part of record length itself * @param rdwAdjustment Controls a mismatch between RDW and record length - * @param bdwAdjustment Controls a mismatch between BDW and record length * @param recordHeaderParser An optional custom record header parser for non-standard RDWs * @param recordExtractor An optional custom raw record parser class non-standard record types * @param rhpAdditionalInfo An optional additional option string passed to a custom record header parser @@ -44,12 +42,10 @@ package za.co.absa.cobrix.cobol.reader.parameters */ case class VariableLengthParameters( isRecordSequence: Boolean, // [deprecated by recordFormat] - hasBdw: Boolean, // [deprecated by recordFormat] + bdw: Option[Bdw], isRdwBigEndian: Boolean, - isBdwBigEndian: Boolean, isRdwPartRecLength: Boolean, rdwAdjustment: Int, - bdwAdjustment: Int, recordHeaderParser: Option[String], recordExtractor: Option[String], rhpAdditionalInfo: Option[String], diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index ebd512e98..ed0576fe2 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -26,7 +26,7 @@ import za.co.absa.cobrix.cobol.parser.policies.StringTrimmingPolicy.StringTrimmi import za.co.absa.cobrix.cobol.parser.policies.{CommentPolicy, DebugFieldsPolicy, StringTrimmingPolicy} import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat._ -import za.co.absa.cobrix.cobol.reader.parameters.{CobolParameters, MultisegmentParameters, VariableLengthParameters} +import za.co.absa.cobrix.cobol.reader.parameters.{Bdw, CobolParameters, MultisegmentParameters, VariableLengthParameters} import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy @@ -39,78 +39,78 @@ import scala.collection.mutable.ListBuffer object CobolParametersParser { private val logger = LoggerFactory.getLogger(this.getClass) - val SHORT_NAME = "cobol" - val PARAM_COPYBOOK_PATH = "copybook" - val PARAM_MULTI_COPYBOOK_PATH = "copybooks" - val PARAM_COPYBOOK_CONTENTS = "copybook_contents" - val PARAM_SOURCE_PATH = "path" - val PARAM_SOURCE_PATHS = "paths" - val PARAM_ENCODING = "encoding" - val PARAM_PEDANTIC = "pedantic" - val PARAM_RECORD_LENGTH_FIELD = "record_length_field" - val PARAM_RECORD_START_OFFSET = "record_start_offset" - val PARAM_RECORD_END_OFFSET = "record_end_offset" - val PARAM_FILE_START_OFFSET = "file_start_offset" - val PARAM_FILE_END_OFFSET = "file_end_offset" + val SHORT_NAME = "cobol" + val PARAM_COPYBOOK_PATH = "copybook" + val PARAM_MULTI_COPYBOOK_PATH = "copybooks" + val PARAM_COPYBOOK_CONTENTS = "copybook_contents" + val PARAM_SOURCE_PATH = "path" + val PARAM_SOURCE_PATHS = "paths" + val PARAM_ENCODING = "encoding" + val PARAM_PEDANTIC = "pedantic" + val PARAM_RECORD_LENGTH_FIELD = "record_length_field" + val PARAM_RECORD_START_OFFSET = "record_start_offset" + val PARAM_RECORD_END_OFFSET = "record_end_offset" + val PARAM_FILE_START_OFFSET = "file_start_offset" + val PARAM_FILE_END_OFFSET = "file_end_offset" // Schema transformation 
parameters - val PARAM_GENERATE_RECORD_ID = "generate_record_id" - val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy" - val PARAM_GROUP_FILLERS = "drop_group_fillers" - val PARAM_VALUE_FILLERS = "drop_value_fillers" + val PARAM_GENERATE_RECORD_ID = "generate_record_id" + val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy" + val PARAM_GROUP_FILLERS = "drop_group_fillers" + val PARAM_VALUE_FILLERS = "drop_value_fillers" - val PARAM_GROUP_NOT_TERMINALS = "non_terminals" - val PARAM_OCCURS_MAPPINGS = "occurs_mappings" - val PARAM_DEBUG = "debug" + val PARAM_GROUP_NOT_TERMINALS = "non_terminals" + val PARAM_OCCURS_MAPPINGS = "occurs_mappings" + val PARAM_DEBUG = "debug" // General parsing parameters - val PARAM_TRUNCATE_COMMENTS = "truncate_comments" - val PARAM_COMMENTS_LBOUND = "comments_lbound" - val PARAM_COMMENTS_UBOUND = "comments_ubound" + val PARAM_TRUNCATE_COMMENTS = "truncate_comments" + val PARAM_COMMENTS_LBOUND = "comments_lbound" + val PARAM_COMMENTS_UBOUND = "comments_ubound" // Data parsing parameters - val PARAM_STRING_TRIMMING_POLICY = "string_trimming_policy" - val PARAM_EBCDIC_CODE_PAGE = "ebcdic_code_page" - val PARAM_EBCDIC_CODE_PAGE_CLASS = "ebcdic_code_page_class" - val PARAM_ASCII_CHARSET = "ascii_charset" - val PARAM_IS_UTF16_BIG_ENDIAN = "is_utf16_big_endian" - val PARAM_FLOATING_POINT_FORMAT = "floating_point_format" - val PARAM_VARIABLE_SIZE_OCCURS = "variable_size_occurs" - val PARAM_IMPROVED_NULL_DETECTION = "improved_null_detection" + val PARAM_STRING_TRIMMING_POLICY = "string_trimming_policy" + val PARAM_EBCDIC_CODE_PAGE = "ebcdic_code_page" + val PARAM_EBCDIC_CODE_PAGE_CLASS = "ebcdic_code_page_class" + val PARAM_ASCII_CHARSET = "ascii_charset" + val PARAM_IS_UTF16_BIG_ENDIAN = "is_utf16_big_endian" + val PARAM_FLOATING_POINT_FORMAT = "floating_point_format" + val PARAM_VARIABLE_SIZE_OCCURS = "variable_size_occurs" + val PARAM_IMPROVED_NULL_DETECTION = "improved_null_detection" // Parameters for multisegment variable length files - val PARAM_RECORD_FORMAT = "record_format" - val PARAM_RECORD_LENGTH = "record_length" - val PARAM_IS_XCOM = "is_xcom" - val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence" - val PARAM_IS_TEXT = "is_text" - val PARAM_IS_RDW_BIG_ENDIAN = "is_rdw_big_endian" - val PARAM_IS_BDW_BIG_ENDIAN = "is_bdw_big_endian" - val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length" - val PARAM_RDW_ADJUSTMENT = "rdw_adjustment" - val PARAM_BDW_ADJUSTMENT = "bdw_adjustment" - val PARAM_BLOCK_LENGTH = "block_length" - val PARAM_RECORDS_PER_BLOCK = "records_per_block" - val PARAM_SEGMENT_FIELD = "segment_field" - val PARAM_SEGMENT_ID_ROOT = "segment_id_root" - val PARAM_SEGMENT_FILTER = "segment_filter" - val PARAM_SEGMENT_ID_LEVEL_PREFIX = "segment_id_level" - val PARAM_RECORD_HEADER_PARSER = "record_header_parser" - val PARAM_RECORD_EXTRACTOR = "record_extractor" - val PARAM_RHP_ADDITIONAL_INFO = "rhp_additional_info" - val PARAM_RE_ADDITIONAL_INFO = "re_additional_info" - val PARAM_INPUT_FILE_COLUMN = "with_input_file_name_col" + val PARAM_RECORD_FORMAT = "record_format" + val PARAM_RECORD_LENGTH = "record_length" + val PARAM_IS_XCOM = "is_xcom" + val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence" + val PARAM_IS_TEXT = "is_text" + val PARAM_IS_RDW_BIG_ENDIAN = "is_rdw_big_endian" + val PARAM_IS_BDW_BIG_ENDIAN = "is_bdw_big_endian" + val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length" + val PARAM_RDW_ADJUSTMENT = "rdw_adjustment" + val PARAM_BDW_ADJUSTMENT = "bdw_adjustment" + val PARAM_BLOCK_LENGTH = 
"block_length" + val PARAM_RECORDS_PER_BLOCK = "records_per_block" + val PARAM_SEGMENT_FIELD = "segment_field" + val PARAM_SEGMENT_ID_ROOT = "segment_id_root" + val PARAM_SEGMENT_FILTER = "segment_filter" + val PARAM_SEGMENT_ID_LEVEL_PREFIX = "segment_id_level" + val PARAM_RECORD_HEADER_PARSER = "record_header_parser" + val PARAM_RECORD_EXTRACTOR = "record_extractor" + val PARAM_RHP_ADDITIONAL_INFO = "rhp_additional_info" + val PARAM_RE_ADDITIONAL_INFO = "re_additional_info" + val PARAM_INPUT_FILE_COLUMN = "with_input_file_name_col" // Indexed multisegment file processing - val PARAM_ENABLE_INDEXES = "enable_indexes" - val PARAM_INPUT_SPLIT_RECORDS = "input_split_records" - val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb" - val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix" - val PARAM_OPTIMIZE_ALLOCATION = "optimize_allocation" - val PARAM_IMPROVE_LOCALITY = "improve_locality" + val PARAM_ENABLE_INDEXES = "enable_indexes" + val PARAM_INPUT_SPLIT_RECORDS = "input_split_records" + val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb" + val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix" + val PARAM_OPTIMIZE_ALLOCATION = "optimize_allocation" + val PARAM_IMPROVE_LOCALITY = "improve_locality" // Parameters for debugging - val PARAM_DEBUG_IGNORE_FILE_SIZE = "debug_ignore_file_size" + val PARAM_DEBUG_IGNORE_FILE_SIZE = "debug_ignore_file_size" private def getSchemaRetentionPolicy(params: Parameters): SchemaRetentionPolicy = { val schemaRetentionPolicyName = params.getOrElse(PARAM_SCHEMA_RETENTION_POLICY, "collapse_root") @@ -273,7 +273,7 @@ object CobolParametersParser { val hasRecordExtractor = params.contains(PARAM_RECORD_EXTRACTOR) if (params.contains(PARAM_RECORD_LENGTH_FIELD) && - (params.contains(PARAM_IS_RECORD_SEQUENCE) || params.contains(PARAM_IS_XCOM) )) { + (params.contains(PARAM_IS_RECORD_SEQUENCE) || params.contains(PARAM_IS_XCOM))) { throw new IllegalArgumentException(s"Option '$PARAM_RECORD_LENGTH_FIELD' cannot be used together with '$PARAM_IS_RECORD_SEQUENCE' or '$PARAM_IS_XCOM'.") } @@ -288,12 +288,10 @@ object CobolParametersParser { Some(VariableLengthParameters ( isRecordSequence, - recordFormat == VariableBlock, + parseBdw(params, recordFormat), params.getOrElse(PARAM_IS_RDW_BIG_ENDIAN, "false").toBoolean, - params.getOrElse(PARAM_IS_BDW_BIG_ENDIAN, "false").toBoolean, params.getOrElse(PARAM_IS_RDW_PART_REC_LENGTH, "false").toBoolean, params.getOrElse(PARAM_RDW_ADJUSTMENT, "0").toInt, - params.getOrElse(PARAM_BDW_ADJUSTMENT, "0").toInt, params.get(PARAM_RECORD_HEADER_PARSER), params.get(PARAM_RECORD_EXTRACTOR), params.get(PARAM_RHP_ADDITIONAL_INFO), @@ -316,6 +314,32 @@ object CobolParametersParser { } } + private def parseBdw(params: Parameters, recordFormat: RecordFormat): Option[Bdw] = { + if (recordFormat == FixedBlock || recordFormat == VariableBlock) { + val bdw = Bdw( + params.getOrElse(PARAM_IS_BDW_BIG_ENDIAN, "false").toBoolean, + params.getOrElse(PARAM_BDW_ADJUSTMENT, "0").toInt, + params.get(PARAM_BLOCK_LENGTH).map(_.toInt), + params.get(PARAM_RECORDS_PER_BLOCK).map(_.toInt), + ) + if (bdw.blockLength.nonEmpty && bdw.recordsPerBlock.nonEmpty) { + throw new IllegalArgumentException(s"Options '$PARAM_BLOCK_LENGTH' and $PARAM_RECORDS_PER_BLOCK cannot be used together.") + } + if (recordFormat == FixedBlock && bdw.blockLength.isEmpty && bdw.recordsPerBlock.isEmpty ) { + throw new IllegalArgumentException(s"For FB file format either '$PARAM_BLOCK_LENGTH' or $PARAM_RECORDS_PER_BLOCK must be specified.") + } + if (recordFormat == VariableBlock && 
bdw.blockLength.nonEmpty) { + logger.warn(s"Option '$PARAM_BLOCK_LENGTH' is ignored for record format: VB") + } + if (recordFormat == VariableBlock && bdw.recordsPerBlock.nonEmpty) { + logger.warn(s"Option '$PARAM_RECORDS_PER_BLOCK' is ignored for record format: VB") + } + Some(bdw) + } else { + None + } + } + private def getRecordFormat(params: Parameters): RecordFormat = { if (params.contains(PARAM_RECORD_FORMAT)) { val recordFormatStr = params(PARAM_RECORD_FORMAT) @@ -400,7 +424,7 @@ object CobolParametersParser { val name = s"$PARAM_SEGMENT_ID_LEVEL_PREFIX$i" if (params.contains(name)) { levels += params(name) - } else if (i==0 && params.contains(PARAM_SEGMENT_ID_ROOT)){ + } else if (i == 0 && params.contains(PARAM_SEGMENT_ID_ROOT)) { levels += params(PARAM_SEGMENT_ID_ROOT) } else { return levels @@ -420,6 +444,7 @@ } // key - segment id, value - redefine field name + /** * Parses the list of redefines and their corresponding segment ids. * @@ -451,7 +476,7 @@ keyNoCase.startsWith("redefine_segment_id_map")) { params.markUsed(k) val splitVal = v.split("\\=\\>") - if (splitVal.lengthCompare(2) !=0) { + if (splitVal.lengthCompare(2) != 0) { throw new IllegalArgumentException(s"Illegal argument for the 'redefine-segment-id-map' option: '$v'.") } val redefine = splitVal(0).trim @@ -693,15 +718,15 @@ } /** - * Parses the options for the occurs mappings. - * - * @param params Parameters provided by spark.read.option(...) - * @return Returns a mapping for OCCURS fields - */ + * Parses the options for the occurs mappings. + * + * @param params Parameters provided by spark.read.option(...) + * @return Returns a mapping for OCCURS fields + */ @throws(classOf[IllegalArgumentException]) def getOccursMappings(params: String): Map[String, Map[String, Int]] = { val parser = new ParserJson() val parsedParams = parser.parseMap(params) - parsedParams.map( kv => kv._1 -> kv._2.asInstanceOf[Map[String, Any]].map(x => x._1 -> x._2.asInstanceOf[Int])) + parsedParams.map(kv => kv._1 -> kv._2.asInstanceOf[Map[String, Any]].map(x => x._1 -> x._2.asInstanceOf[Int])) } } \ No newline at end of file diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala index 64fbc5e46..71dfd41c3 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala @@ -142,12 +142,10 @@ class DefaultSource val varLenParams: VariableLengthParameters = parameters.variableLengthParams .getOrElse( VariableLengthParameters(isRecordSequence = false, - hasBdw = false, + None, isRdwBigEndian = false, - isBdwBigEndian = false, isRdwPartRecLength = false, rdwAdjustment = 0, - bdwAdjustment = 0, recordHeaderParser = None, recordExtractor = None, rhpAdditionalInfo = None, @@ -171,7 +169,9 @@ else None - ReaderParameters(isEbcdic = parameters.isEbcdic, + ReaderParameters( + recordFormat = parameters.recordFormat, + isEbcdic = parameters.isEbcdic, isText = parameters.isText, ebcdicCodePage = parameters.ebcdicCodePage, ebcdicCodePageClass = parameters.ebcdicCodePageClass, @@ -182,12 +182,10 @@ recordLength = parameters.recordLength, lengthFieldName = recordLengthField, isRecordSequence = varLenParams.isRecordSequence, - hasBdw = varLenParams.hasBdw, + bdw = varLenParams.bdw,
isRdwBigEndian = varLenParams.isRdwBigEndian, - isBdwBigEndian = varLenParams.isBdwBigEndian, isRdwPartRecLength = varLenParams.isRdwPartRecLength, rdwAdjustment = varLenParams.rdwAdjustment, - bdwAdjustment = varLenParams.bdwAdjustment, isIndexGenerationNeeded = varLenParams.isUsingIndex, inputSplitRecords = varLenParams.inputSplitRecords, inputSplitSizeMB = varLenParams.inputSplitSizeMB, From 63f06b19eb243a92ace98bd13ee5afa522c6db8a Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 16 Sep 2021 13:59:26 +0200 Subject: [PATCH 03/13] #420 Add implementation of raw record extractor for FB format. --- .../cobol/reader/VarLenNestedReader.scala | 6 +- .../extractors/raw/FixedBlockParameters.scala | 21 ++ .../raw/FixedBlockRawRecordExtractor.scala | 63 ++++++ .../FixedBlockRawRecordExtractorSuite.scala | 180 ++++++++++++++++++ 4 files changed, 268 insertions(+), 2 deletions(-) create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala create mode 100644 cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala index 944560915..7ec5a61de 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/VarLenNestedReader.scala @@ -24,7 +24,7 @@ import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC} import za.co.absa.cobrix.cobol.parser.headerparsers.{RecordHeaderParser, RecordHeaderParserFactory} import za.co.absa.cobrix.cobol.parser.recordformats.RecordFormat.{FixedBlock, VariableBlock} import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} -import za.co.absa.cobrix.cobol.reader.extractors.raw.{RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor} +import za.co.absa.cobrix.cobol.reader.extractors.raw.{FixedBlockParameters, FixedBlockRawRecordExtractor, RawRecordContext, RawRecordExtractor, RawRecordExtractorFactory, TextRecordExtractor, VarOccursRecordExtractor, VariableBlockVariableRecordExtractor} import za.co.absa.cobrix.cobol.reader.extractors.record.RecordHandler import za.co.absa.cobrix.cobol.reader.index.IndexGenerator import za.co.absa.cobrix.cobol.reader.index.entry.SparseIndexEntry @@ -79,7 +79,9 @@ class VarLenNestedReader[T: ClassTag](copybookContents: Seq[String], case None if readerProperties.isText => Some(new TextRecordExtractor(reParams)) case None if readerProperties.recordFormat == FixedBlock => - Some(new VariableBlockVariableRecordExtractor(reParams)) // ToDo FB record format + val fbParams = FixedBlockParameters(readerProperties.recordLength, bdwOpt.get.blockLength, bdwOpt.get.recordsPerBlock) + FixedBlockParameters.validate(fbParams) + Some(new FixedBlockRawRecordExtractor(reParams, fbParams)) case None if readerProperties.recordFormat == VariableBlock => Some(new VariableBlockVariableRecordExtractor(reParams)) case None if readerProperties.variableSizeOccurs && diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala 
b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala new file mode 100644 index 000000000..b348267e0 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala @@ -0,0 +1,21 @@ +package za.co.absa.cobrix.cobol.reader.extractors.raw + +case class FixedBlockParameters( + recordLength: Option[Int], + blockLength: Option[Int], + recordsPerBlock: Option[Int] + ) + +object FixedBlockParameters { + def validate(params: FixedBlockParameters): Unit = { + if (params.blockLength.isEmpty && params.recordsPerBlock.isEmpty) { + throw new IllegalArgumentException("FB record format requires block length or number records per block to be specified.") + } + if (params.blockLength.nonEmpty && params.recordsPerBlock.nonEmpty) { + throw new IllegalArgumentException("FB record format requires either block length or number records per block to be specified, but not both.") + } + params.recordLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Record length should be positive. Got $x.")) + params.blockLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Block length should be positive. Got $x.")) + params.recordsPerBlock.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Records per block should be positive. Got $x.")) + } +} \ No newline at end of file diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala new file mode 100644 index 000000000..fa7c43f6e --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala @@ -0,0 +1,63 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.cobrix.cobol.reader.extractors.raw + +import scala.collection.mutable + +class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockParameters) extends Serializable with RawRecordExtractor { + private val recordQueue = new mutable.Queue[Array[Byte]] + + private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize) + private val bdwSize = fbParams.blockLength.getOrElse(fbParams.recordsPerBlock.get * recordSize) + + override def offset: Long = ctx.inputStream.offset + + override def hasNext: Boolean = { + if (recordQueue.isEmpty) { + readNextBlock() + } + recordQueue.nonEmpty + } + + private def readNextBlock(): Unit = { + if (!ctx.inputStream.isEndOfStream) { + val bdwOffset = ctx.inputStream.offset + val blockBuffer = ctx.inputStream.next(bdwSize) + + var blockIndex = 0 + + while (blockIndex < blockBuffer.length) { + val rdwOffset = bdwOffset + blockIndex + + val payload = blockBuffer.slice(blockIndex, blockIndex + recordSize) + if (payload.length > 0) { + recordQueue.enqueue(payload) + } + blockIndex += recordSize + } + } + } + + + @throws[NoSuchElementException] + override def next(): Array[Byte] = { + if (!hasNext) { + throw new NoSuchElementException + } + recordQueue.dequeue() + } +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala new file mode 100644 index 000000000..a68694dd3 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala @@ -0,0 +1,180 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.reader.extractors.raw + +import org.scalatest.WordSpec +import za.co.absa.cobrix.cobol.parser.CopybookParser +import za.co.absa.cobrix.cobol.reader.memorystream.TestByteStream +import za.co.absa.cobrix.cobol.reader.recordheader.{RecordHeaderDecoderBdw, RecordHeaderDecoderRdw, RecordHeaderParametersFactory} + +class FixedBlockRawRecordExtractorSuite extends WordSpec { + private val copybookContent = + """ 01 RECORD. + 02 X PIC X(2). 
+ """ + private val copybook = CopybookParser.parseTree(copybookContent) + + private val fbParams = FixedBlockParameters(None, Some(2), None) + + "fixed block fixed length records" should { + "be able to read a FB file that has no data" in { + val rc = getRawRecordContext(Array[Byte]()) + + val extractor = new FixedBlockRawRecordExtractor(rc, fbParams) + + assert(!extractor.hasNext) + + intercept[NoSuchElementException] { + extractor.next() + } + } + + "be able to read a FB file that has an incomplete record" in { + val rc = getRawRecordContext(Array[Byte](0xF0.toByte)) + + val extractor = new FixedBlockRawRecordExtractor(rc, fbParams) + + assert(extractor.hasNext) + + val r0 = extractor.next() + + assert(r0.length == 1) + assert(r0.head == 0xF0.toByte) + + intercept[NoSuchElementException] { + extractor.next() + } + } + + "be able to read a FB file that has one record per block" in { + val rc = getRawRecordContext(1) + + val extractor = new FixedBlockRawRecordExtractor(rc, FixedBlockParameters(Some(2), None, Some(1))) + + assert(extractor.hasNext) + + val r0 = extractor.next() + assert(r0.length == 2) + assert(r0.head == 0xF0.toByte) + assert(r0(1) == 0xF0.toByte) + + assert(extractor.next().head == 0xF1.toByte) + assert(extractor.next().head == 0xF2.toByte) + assert(!extractor.hasNext) + } + + "be able to read a VBVR file that has multiple records per block" in { + val rc = getRawRecordContext(3) + + val extractor = new FixedBlockRawRecordExtractor(rc, FixedBlockParameters(None, None, Some(3))) + + assert(extractor.hasNext) + + val r0 = extractor.next() + assert(r0.length == 2) + assert(r0.head == 0xF0.toByte) + assert(r0(1) == 0xF0.toByte) + + assert(extractor.next().head == 0xF1.toByte) + assert(extractor.next().head == 0xF2.toByte) + assert(extractor.next().head == 0xF3.toByte) + assert(extractor.next().head == 0xF4.toByte) + assert(extractor.next().head == 0xF5.toByte) + assert(extractor.next().head == 0xF6.toByte) + assert(extractor.next().head == 0xF7.toByte) + assert(extractor.next().head == 0xF8.toByte) + assert(!extractor.hasNext) + } + } + + "failures" should { + "throw an exception when neither block length nor records per block is specified" in { + val fb = FixedBlockParameters(Some(1), None, None) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("FB record format requires block length or number records per block to be specified.")) + } + + "throw an exception when both block length and records per block are specified" in { + val fb = FixedBlockParameters(Some(1), Some(1), Some(1)) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("FB record format requires either block length or number records per block to be specified, but not both.")) + } + + "throw an exception when record length is zero" in { + val fb = FixedBlockParameters(Some(0), Some(1), None) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("Record length should be positive. Got 0.")) + } + + "throw an exception when block size is zero" in { + val fb = FixedBlockParameters(Some(1), Some(0), None) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("Block length should be positive. 
Got 0.")) + } + + "throw an exception when records per block is zero" in { + val fb = FixedBlockParameters(Some(1), None, Some(0)) + + val ex = intercept[IllegalArgumentException] { + FixedBlockParameters.validate(fb) + } + + assert(ex.getMessage.contains("Records per block should be positive. Got 0.")) + } + } + + private def getRawRecordContext(recordsPerBlock: Int): RawRecordContext = { + val numOfBlocks = 3 + + val bytes = Range(0, numOfBlocks) + .flatMap(i => { + Range(0, recordsPerBlock).flatMap(j => { + val num = (i * recordsPerBlock + j) % 10 + val v = (0xF0 + num).toByte + Array[Byte](v, v) + }) + }).toArray[Byte] + + getRawRecordContext(bytes) + } + + private def getRawRecordContext(bytes: Array[Byte]): RawRecordContext = { + val ibs = new TestByteStream(bytes) + + val bdwDecoder = new RecordHeaderDecoderBdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0)) + val rdwDecoder = new RecordHeaderDecoderRdw(RecordHeaderParametersFactory.getDummyRecordHeaderParameters(isBigEndian = true, 0)) + + RawRecordContext(0, ibs, copybook, rdwDecoder, bdwDecoder, "") + } + +} From 4cbbbbb0c6212cc289736c99500b30bd4f04a984 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 16 Sep 2021 14:34:25 +0200 Subject: [PATCH 04/13] #420 Add an integration test suite for FB record format. --- .../parameters/CobolParametersParser.scala | 6 +- .../integration/Test29BdwFileSpec.scala | 2 +- .../source/integration/Test30FbFileSpec.scala | 128 ++++++++++++++++++ 3 files changed, 132 insertions(+), 4 deletions(-) create mode 100644 spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index ed0576fe2..655fcb8ce 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -265,7 +265,7 @@ object CobolParametersParser { private def parseVariableLengthParameters(params: Parameters, recordFormat: RecordFormat): Option[VariableLengthParameters] = { val recordLengthFieldOpt = params.get(PARAM_RECORD_LENGTH_FIELD) - val isRecordSequence = Seq(VariableLength, VariableBlock, AsciiText).contains(recordFormat) + val isRecordSequence = Seq(FixedBlock, VariableLength, VariableBlock, AsciiText).contains(recordFormat) val isRecordIdGenerationEnabled = params.getOrElse(PARAM_GENERATE_RECORD_ID, "false").toBoolean val fileStartOffset = params.getOrElse(PARAM_FILE_START_OFFSET, "0").toInt val fileEndOffset = params.getOrElse(PARAM_FILE_END_OFFSET, "0").toInt @@ -323,10 +323,10 @@ object CobolParametersParser { params.get(PARAM_RECORDS_PER_BLOCK).map(_.toInt), ) if (bdw.blockLength.nonEmpty && bdw.recordsPerBlock.nonEmpty) { - throw new IllegalArgumentException(s"Options '$PARAM_BLOCK_LENGTH' and $PARAM_RECORDS_PER_BLOCK cannot be used together.") + throw new IllegalArgumentException(s"Options '$PARAM_BLOCK_LENGTH' and '$PARAM_RECORDS_PER_BLOCK' cannot be used together.") } if (recordFormat == FixedBlock && bdw.blockLength.isEmpty && bdw.recordsPerBlock.isEmpty ) { - throw new IllegalArgumentException(s"For FB file format either '$PARAM_BLOCK_LENGTH' or $PARAM_RECORDS_PER_BLOCK must be specified.") + throw new IllegalArgumentException(s"For FB file format either '$PARAM_BLOCK_LENGTH' or 
'$PARAM_RECORDS_PER_BLOCK' must be specified.") } if (recordFormat == VariableBlock && bdw.blockLength.nonEmpty) { logger.warn(s"Option '$PARAM_BLOCK_LENGTH' is ignored for record format: VB") diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala index 554c772b3..0d3164b9b 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test29BdwFileSpec.scala @@ -26,7 +26,7 @@ import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture //noinspection NameBooleanParameters class Test29BdwFileSpec extends WordSpec with SparkTestBase with BinaryFileFixture { - private val exampleName = "Test29 (VB record format RDW+BDW" + private val exampleName = "Test29 (VB record format RDW+BDW)" private val copybook = """ 01 R. diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala new file mode 100644 index 000000000..89ad5c7bb --- /dev/null +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala @@ -0,0 +1,128 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.spark.cobol.source.integration + +import org.apache.spark.SparkException +import org.apache.spark.sql.functions.col +import org.scalatest.WordSpec +import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase +import za.co.absa.cobrix.spark.cobol.source.fixtures.BinaryFileFixture + +//noinspection NameBooleanParameters +class Test30FbFileSpec extends WordSpec with SparkTestBase with BinaryFileFixture { + + private val exampleName = "Test30 (FB record format)" + + private val copybook = + """ 01 R. + 03 A PIC X(2). 
+ """ + + val expected2Records = """[{"A":"00"},{"A":"11"}]""" + val expected4Records = """[{"A":"00"},{"A":"11"},{"A":"22"},{"A":"33"}]""" + + "FB record loader" should { + "load data when record length and block length are specified" in { + testVbRecordLoad(1, 2, Map[String, String]("block_length" -> "4", "record_length" -> "2"), expected2Records) + } + + "load data when block length is specified" in { + testVbRecordLoad(1, 2, Map[String, String]("block_length" -> "2"), expected2Records) + } + + "load data when the number records per block is specified" in { + testVbRecordLoad(2, 2, Map[String, String]("records_per_block" -> "2"), expected4Records) + } + + "load data when smaller records are provided" in { + testVbRecordLoad(1, 2, + Map[String, String]("block_length" -> "4", "record_length" -> "1"), + """[{"A":"0"},{"A":"0"},{"A":"1"},{"A":"1"}]""", + ignoreCount = true) + } + } + + "FB record failures should happen" when { + "Block length is negative" in { + val ex = intercept[SparkException] { + testVbRecordLoad(1, 2, Map[String, String]("block_length" -> "-1"), "") + } + + assert(ex.getCause.getMessage.contains("Block length should be positive. Got -1.")) + } + + "Records per block is negative" in { + val ex = intercept[SparkException] { + testVbRecordLoad(1, 2, Map[String, String]("records_per_block" -> "-1"), "") + } + + assert(ex.getCause.getMessage.contains("Records per block should be positive. Got -1.")) + } + + "Both block length and records per block are specified" in { + val ex = intercept[IllegalArgumentException] { + testVbRecordLoad(1, 2, Map[String, String]("block_length" -> "2", "records_per_block" -> "2"), "") + } + + assert(ex.getMessage.contains("Options 'block_length' and 'records_per_block' cannot be used together.")) + } + + "Mandatory options are missing" in { + val ex = intercept[IllegalArgumentException] { + testVbRecordLoad(1, 2, Map[String, String](), "") + } + + assert(ex.getMessage.contains("For FB file format either 'block_length' or 'records_per_block' must be specified.")) + } + } + + private def testVbRecordLoad(blocks: Int, + records: Int, + options: Map[String, String], + expected: String, + ignoreCount: Boolean = false): Unit = { + val record: Seq[Byte] = Range(0, blocks).flatMap(blockNum => { + Range(0, records).flatMap(recordNum => { + val idx = (blockNum * records + recordNum) % 10 + val v = (0xF0 + idx).toByte + Seq(v, v) + }) + }) + + withTempBinFile("rec", ".dat", record.toArray) { tmpFileName1 => + val df = spark + .read + .format("cobol") + .option("copybook_contents", copybook) + .option("record_format", "FB") + .options(options) + .load(tmpFileName1) + + val actual = df + .orderBy(col("A")) + .toJSON + .collect() + .mkString("[", ",", "]") + + if (!ignoreCount) { + assert(df.count() == blocks * records) + } + assert(actual == expected) + } + } + +} From 229250e110d29583a81ca12aed984a08aafd1ea0 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 16 Sep 2021 14:34:51 +0200 Subject: [PATCH 05/13] #420 Suppress logger in tests. --- .../src/test/resources/log4j.properties | 37 +++---------------- 1 file changed, 6 insertions(+), 31 deletions(-) diff --git a/spark-cobol/src/test/resources/log4j.properties b/spark-cobol/src/test/resources/log4j.properties index d80dac076..df78b9a39 100644 --- a/spark-cobol/src/test/resources/log4j.properties +++ b/spark-cobol/src/test/resources/log4j.properties @@ -14,37 +14,12 @@ # limitations under the License. 
# -# Set everything to be logged to the file core/target/unit-tests.log -log4j.rootLogger=DEBUG, CA, FA - -#Console Appender -log4j.appender.CA=org.apache.log4j.ConsoleAppender -log4j.appender.CA.layout=org.apache.log4j.PatternLayout -log4j.appender.CA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %p %c: %m%n -log4j.appender.CA.Threshold = WARN - -#File Appender -log4j.appender.FA=org.apache.log4j.FileAppender -log4j.appender.FA.append=false -log4j.appender.FA.file=target/unit-tests.log -log4j.appender.FA.layout=org.apache.log4j.PatternLayout -log4j.appender.FA.layout.ConversionPattern=%d{HH:mm:ss.SSS} %t %p %c{1}: %m%n - -# Set the logger level of File Appender to WARN -log4j.appender.FA.Threshold = INFO - -# Some packages are noisy for no good reason. -log4j.additivity.parquet.hadoop.ParquetRecordReader=false -log4j.logger.parquet.hadoop.ParquetRecordReader=OFF - -log4j.additivity.org.apache.hadoop.hive.serde2.lazy.LazyStruct=false -log4j.logger.org.apache.hadoop.hive.serde2.lazy.LazyStruct=OFF - -log4j.additivity.org.apache.hadoop.hive.metastore.RetryingHMSHandler=false -log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=OFF - -log4j.additivity.hive.ql.metadata.Hive=false -log4j.logger.hive.ql.metadata.Hive=OFF +log4j.rootCategory=INFO, console +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n +log4j.appender.console.Threshold=ERROR # Turn off noisy loggers in unit tests log4j.logger.org.apache.hadoop.util.NativeCodeLoader=ERROR From 76ed1fbb7ffe6b44d46dc955288848eabf3b5b5c Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 16 Sep 2021 16:20:56 +0200 Subject: [PATCH 06/13] #420 Make Scala 2.11 compiler happy --- .../extractors/raw/FixedBlockParameters.scala | 16 ++++++++++++++++ .../cobrix/cobol/reader/parameters/Bdw.scala | 18 +++++++++++++++++- .../parameters/CobolParametersParser.scala | 2 +- 3 files changed, 34 insertions(+), 2 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala index b348267e0..d2296196f 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala @@ -1,3 +1,19 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + package za.co.absa.cobrix.cobol.reader.extractors.raw case class FixedBlockParameters( diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala index 96120355c..e772ef3bb 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/Bdw.scala @@ -1,8 +1,24 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package za.co.absa.cobrix.cobol.reader.parameters case class Bdw( isBigEndian: Boolean, adjustment: Int, blockLength: Option[Int], - recordsPerBlock: Option[Int], + recordsPerBlock: Option[Int] ) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index 655fcb8ce..dfbae29f7 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -320,7 +320,7 @@ object CobolParametersParser { params.getOrElse(PARAM_IS_BDW_BIG_ENDIAN, "false").toBoolean, params.getOrElse(PARAM_BDW_ADJUSTMENT, "0").toInt, params.get(PARAM_BLOCK_LENGTH).map(_.toInt), - params.get(PARAM_RECORDS_PER_BLOCK).map(_.toInt), + params.get(PARAM_RECORDS_PER_BLOCK).map(_.toInt) ) if (bdw.blockLength.nonEmpty && bdw.recordsPerBlock.nonEmpty) { throw new IllegalArgumentException(s"Options '$PARAM_BLOCK_LENGTH' and '$PARAM_RECORDS_PER_BLOCK' cannot be used together.") From 82f46b9e3b3d525f2da2c128fba3f70b49a5abf7 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 17 Sep 2021 08:39:11 +0200 Subject: [PATCH 07/13] Build: Select Spark version based on Scala version if not provided explicitly. 
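
If SPARK_VERSION is not passed explicitly, the build now falls back to
Spark 2.4.8 for Scala 2.11 and to Spark 3.1.2 for Scala 2.12. A sketch of
the resulting invocations (the command lines below are illustrative, not
part of this patch; they follow the sbt commands already used in README.md):

    sbt ++2.11.12 assembly                        # falls back to Spark 2.4.8
    sbt ++2.12.14 assembly                        # falls back to Spark 3.1.2
    sbt ++2.12.14 assembly -DSPARK_VERSION=2.4.8  # an explicit version always wins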
---
 build.sbt                  |  6 +++---
 project/Dependencies.scala | 20 +++++++++++++++-----
 2 files changed, 18 insertions(+), 8 deletions(-)

diff --git a/build.sbt b/build.sbt
index a1f389442..96153b743 100644
--- a/build.sbt
+++ b/build.sbt
@@ -68,11 +68,11 @@ lazy val sparkCobol = (project in file("spark-cobol"))
     name := "spark-cobol",
     printSparkVersion := {
       val log = streams.value.log
-      log.info(s"Building with Spark $sparkVersion")
-      sparkVersion
+      log.info(s"Building with Spark ${sparkVersion(scalaVersion.value)}, Scala ${scalaVersion.value}")
+      sparkVersion(scalaVersion.value)
     },
     (Compile / compile) := ((Compile / compile) dependsOn printSparkVersion).value,
-    libraryDependencies ++= SparkCobolDependencies :+ getScalaDependency(scalaVersion.value),
+    libraryDependencies ++= SparkCobolDependencies(scalaVersion.value) :+ getScalaDependency(scalaVersion.value),
     dependencyOverrides ++= SparkCobolDependenciesOverride,
     Test / fork := true, // Spark tests fail randomly otherwise
     populateBuildInfoTemplate,
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index fb1906884..98dde9d0a 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -17,7 +17,6 @@
 import sbt._

 object Dependencies {
-  private val guavaVersion = "15.0"
   private val scodecBitsVersion = "1.1.4"
   private val scodecCoreVersion = "1.10.3"

@@ -27,14 +26,25 @@

   private val scalatestVersion = "3.0.1"

-  def sparkVersion: String = sys.props.getOrElse("SPARK_VERSION", "3.1.2")
+  private val defaultSparkVersionForScala211 = "2.4.8"
+  private val defaultSparkVersionForScala212 = "3.1.2"
+
+  def sparkFallbackVersion(scalaVersion: String): String = {
+    if (scalaVersion.startsWith("2.11")) {
+      defaultSparkVersionForScala211
+    } else {
+      defaultSparkVersionForScala212
+    }
+  }
+
+  def sparkVersion(scalaVersion: String): String = sys.props.getOrElse("SPARK_VERSION", sparkFallbackVersion(scalaVersion))

   def getScalaDependency(scalaVersion: String): ModuleID = "org.scala-lang" % "scala-library" % scalaVersion % Provided

-  val SparkCobolDependencies: Seq[ModuleID] = Seq(
+  def SparkCobolDependencies(scalaVersion: String): Seq[ModuleID] = Seq(
     // provided
-    "org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
-    "org.apache.spark" %% "spark-streaming" % sparkVersion % Provided,
+    "org.apache.spark" %% "spark-sql" % sparkVersion(scalaVersion) % Provided,
+    "org.apache.spark" %% "spark-streaming" % sparkVersion(scalaVersion) % Provided,

     // test
     "org.scalatest" %% "scalatest" % scalatestVersion % Test

From 8c6653153fb2894d8e0d55383a0e79745bfad3ad Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Fri, 17 Sep 2021 08:50:39 +0200
Subject: [PATCH 08/13] #420 Add FB usage description to README.

---
 README.md | 40 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 37 insertions(+), 3 deletions(-)

diff --git a/README.md b/README.md
index cc17da0cb..15249f9b3 100644
--- a/README.md
+++ b/README.md
@@ -317,6 +317,31 @@ val copyBook = CopybookParser.parseTree(copyBookContents)
 println(copyBook.generateRecordLayoutPositions())
 ```

+### Fixed record length files
+Cobrix assumes files have the fixed length (`F`) record format by default. The record length is determined by the length of
+the record defined by the copybook. But you can specify the record length explicitly:
+```
+.option("record_format", "F")
+.option("record_length", "250")
+```
+
+Fixed block record formats (`FB`) are also supported. The support is _experimental_; if you find any issues, please
+let us know.
When the record format is 'FB', you need to specify either block length or number of records per
+block. As with 'F', if `record_length` is not specified, it will be determined from the copybook.
+```
+.option("record_format", "FB")
+.option("record_length", "250")
+.option("block_length", "500")
+```
+or
+```
+.option("record_format", "FB")
+.option("record_length", "250")
+.option("records_per_block", "2")
+```
+
+More on fixed-length record formats: https://www.ibm.com/docs/en/zos/2.3.0?topic=sets-fixed-length-record-formats
+
 ### Variable length records support

 Cobrix supports variable record length files. The only requirement is that such a file should contain a standard 4 byte
@@ -1112,7 +1137,6 @@ Again, the full example is available at
 | Option (usage example)                     | Description |
 | ------------------------------------------ |:----------------------------------------------------------------------------- |
-| .option("record_length", "100")            | Overrides the length of the record (in bypes). Normally, the size is derived from the copybook. But explicitly specifying record size can be helpful for debugging fixed-record length files. |
 | .option("file_start_offset", "0")          | Specifies the number of bytes to skip at the beginning of each file. |
 | .option("file_end_offset", "0")            | Specifies the number of bytes to skip at the end of each file. |
 | .option("record_start_offset", "0")        | Specifies the number of bytes to skip at the beginning of each record before applying copybook fields to data. |
@@ -1152,11 +1176,17 @@ Again, the full example is available at
 | .option("with_input_file_name_col", "file_name") | Generates a column containing input file name for each record (Similar to Spark SQL `input_file_name()` function). The column name is specified by the value of the option. This option only works for variable record length files. For fixed record length files use `input_file_name()`. |
 | .option("debug", "hex")                    | If specified, each primitive field will be accompanied by a debug field containing raw bytes from the source file. Possible values: `none` (default), `hex`, `binary`. The legacy value `true` is supported and will generate debug fields in HEX. |

+##### Fixed length record format options (for record_format = F or FB)
+
+| Option (usage example)            | Description |
+| --------------------------------- |:----------------------------------------------------------------------------- |
+| .option("record_format", "F")     | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
+| .option("record_length", "100")   | Overrides the length of the record (in bytes). Normally, the size is derived from the copybook. But explicitly specifying the record size can be helpful for debugging fixed record length files. |
+| .option("block_length", "500")    | Specifies the block length for FB records. It should be a multiple of 'record_length'. Cannot be used together with `records_per_block` |
+| .option("records_per_block", "5") | Specifies the number of records per block for FB records.
Cannot be used together with `block_length` |
+
+##### Variable record length files options (for record_format = V or VB)
+
 | Option (usage example)                        | Description |
 | --------------------------------------------- |:----------------------------------------------------------------------------- |
-| .option("record_format", "F")                 | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
+| .option("record_format", "V")                 | Record format from the [spec](https://www.ibm.com/docs/en/zos/2.3.0?topic=files-selecting-record-formats-non-vsam-data-sets). One of `F` (fixed length, default), `FB` (fixed block), `V` (variable length RDW), `VB` (variable block BDW+RDW), `D` (ASCII text). |
 | .option("is_record_sequence", "true")         | _[deprecated]_ If 'true' the parser will look for 4 byte RDW headers to read variable record length files. Use `.option("record_format", "V")` instead. |
 | .option("is_rdw_big_endian", "true")          | Specifies if RDW headers are big endian. They are considered little-endian by default. |
 | .option("is_rdw_part_of_record_length", false)| Specifies if RDW headers count themselves as part of record length. By default RDW headers count only payload record in record length, not RDW headers themselves. This is equivalent to `.option("rdw_adjustment", -4)`. For BDW use `.option("bdw_adjustment", -4)` |
@@ -1287,6 +1317,9 @@ For multisegment variable lengths tests:
 ![](performance/images/exp3_multiseg_wide_records_throughput.svg) ![](performance/images/exp3_multiseg_wide_mb_throughput.svg)

 ## Changelog
+- #### 2.4.1 to be released soon.
+  - [#420](https://github.com/AbsaOSS/cobrix/issues/420) Add _experimental_ support for [fixed blocked (FB)](https://www.ibm.com/docs/en/zos/2.3.0?topic=sets-fixed-length-record-formats) record format.
+
 - #### 2.4.0 released 7 September 2021.
   - [#412](https://github.com/AbsaOSS/cobrix/issues/412) Add support for [variable block (VB aka VBVR)](https://www.ibm.com/docs/en/zos/2.3.0?topic=formats-format-v-records) record format.
     Options to adjust BDW settings are added:
     - `bdw_adjustment` - Specifies how the value of a BDW is different from the block payload. For example, if the size in BDW headers includes the BDW record itself, use `.option("bdw_adjustment", "-4")`.
   - Options `is_record_sequence` and `is_xcom` are deprecated. Use `.option("record_format", "V")` instead.
   - [#417](https://github.com/AbsaOSS/cobrix/issues/417) Multisegment ASCII text files have now direct support using `record_format = D`.
+
 - #### 2.3.0 released 2 August 2021.
   - [#405](https://github.com/AbsaOSS/cobrix/issues/405) Fix extracting records that contain redefines of the top level GROUPs.
   - [#406](https://github.com/AbsaOSS/cobrix/issues/406) Use 'collapse_root' retention policy by default. This is the breaking,

From d303eba0f8bc6da03106604b61cc5b5180d7aa4f Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Fri, 17 Sep 2021 13:38:37 +0200
Subject: [PATCH 09/13] #422 Fix decoding of the 'broken pipe' character from EBCDIC.
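
In EBCDIC code page 037 the vertical bar '|' is byte 0x4F, while byte 0x6A
is the broken bar '¦' (U+00A6, often called the 'broken pipe'). Previously
both bytes decoded to '|'. A minimal Scala sketch of the corrected table
entries (the Map below is an illustration only, not the actual CodePage API):

    // EBCDIC byte -> decoded character after this fix
    val fixedEntries = Map(
      0x4F -> '|', // vertical bar, unchanged
      0x6A -> '¦'  // broken bar, previously decoded as '|'
    )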
--- README.md | 1 + .../cobrix/cobol/parser/encoding/codepage/CodePage037.scala | 2 +- .../cobol/parser/encoding/codepage/CodePage037Ext.scala | 2 +- .../cobol/parser/encoding/codepage/CodePageCommon.scala | 2 +- .../cobol/parser/encoding/codepage/CodePageCommonExt.scala | 2 +- data/test9_expected/test9_cp037.txt | 4 ++-- data/test9_expected/test9_cp037_ext.txt | 4 ++-- 7 files changed, 9 insertions(+), 8 deletions(-) diff --git a/README.md b/README.md index 15249f9b3..efaeed258 100644 --- a/README.md +++ b/README.md @@ -1319,6 +1319,7 @@ For multisegment variable lengths tests: ## Changelog - #### 2.4.1 to be released soon. - [#420](https://github.com/AbsaOSS/cobrix/issues/420) Add _experimental_ support for [fixed blocked (FB)](https://www.ibm.com/docs/en/zos/2.3.0?topic=sets-fixed-length-record-formats) record format. + - [#422](https://github.com/AbsaOSS/cobrix/issues/422) Fixed decoding of 'broken pipe' (`¦`) character from EBCDIC. - #### 2.4.0 released 7 September 2021. - [#412](https://github.com/AbsaOSS/cobrix/issues/412) Add support for [variable block (VB aka VBVR)](https://www.ibm.com/docs/en/zos/2.3.0?topic=formats-format-v-records) record format. diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037.scala index 43e2e1e49..e9bb29532 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037.scala @@ -46,7 +46,7 @@ class CodePage037 extends CodePage { spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, // 48 - 63 ' ', rsp, 'â', 'ä', 'à', 'á', 'ã', 'å', 'ç', 'ñ', '¢', '.', '<', '(', '+', '|', // 64 - 79 '&', 'é', 'ê', 'ë', 'è', 'í', 'î', 'ï', 'ì', 'ß', '!', '$', '*', ')', ';', '¬', // 80 - 95 - '-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '|', ',', '%', '_', '>', '?', // 96 - 111 + '-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '¦', ',', '%', '_', '>', '?', // 96 - 111 'ø', 'É', 'Ê', 'Ë', 'È', 'Í', 'Î', 'Ï', 'Ì', '`', ':', '#', '@', qts, '=', qtd, // 112 - 127 'Ø', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', '«', '»', 'ð', 'ý', 'þ', '±', // 128 - 143 '°', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 'ª', 'º', 'æ', '¸', 'Æ', '¤', // 144 - 159 diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037Ext.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037Ext.scala index 1229e6467..d81c95364 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037Ext.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePage037Ext.scala @@ -79,7 +79,7 @@ class CodePage037Ext extends CodePage { spc, spc, c16, spc, spc, spc, spc, c04, spc, spc, spc, spc, c14, c15, spc, c1a, // 48 - 63 ' ', rsp, 'â', 'ä', 'à', 'á', 'ã', 'å', 'ç', 'ñ', '¢', '.', '<', '(', '+', '|', // 64 - 79 '&', 'é', 'ê', 'ë', 'è', 'í', 'î', 'ï', 'ì', 'ß', '!', '$', '*', ')', ';', '¬', // 80 - 95 - '-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '|', ',', '%', '_', '>', '?', // 96 - 111 + '-', '/', 'Â', 'Ä', 'À', 'Á', 'Ã', 'Å', 'Ç', 'Ñ', '¦', ',', '%', '_', '>', '?', // 96 - 111 'ø', 'É', 'Ê', 'Ë', 'È', 'Í', 'Î', 'Ï', 'Ì', '`', ':', '#', '@', qts, '=', qtd, // 112 - 127 'Ø', 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', '«', '»', 'ð', 'ý', 'þ', 
'±', // 128 - 143 '°', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 'ª', 'º', 'æ', '¸', 'Æ', '¤', // 144 - 159 diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommon.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommon.scala index 999d29aef..5d48118cf 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommon.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommon.scala @@ -41,7 +41,7 @@ class CodePageCommon extends CodePage { spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, spc, // 48 - 63 ' ', ' ', spc, spc, spc, spc, spc, spc, spc, spc, spc, '.', '<', '(', '+', '|', // 64 - 79 '&', spc, spc, spc, spc, spc, spc, spc, spc, spc, '!', '$', '*', ')', ';', spc, // 80 - 95 - '-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '|', ',', '%', '_', '>', '?', // 96 - 111 + '-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '¦', ',', '%', '_', '>', '?', // 96 - 111 spc, spc, spc, spc, spc, spc, spc, spc, spc, '`', ':', '#', '@', qts, '=', qtd, // 112 - 127 spc, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', spc, spc, spc, spc, spc, spc, // 128 - 143 spc, 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', spc, spc, spc, spc, spc, spc, // 144 - 159 diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommonExt.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommonExt.scala index 5881e7174..fb8fe79e0 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommonExt.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/encoding/codepage/CodePageCommonExt.scala @@ -75,7 +75,7 @@ class CodePageCommonExt extends CodePage { spc, spc, c16, spc, spc, spc, spc, c04, spc, spc, spc, spc, c14, c15, spc, spc, // 48 - 63 ' ', ' ', spc, spc, spc, spc, spc, spc, spc, spc, spc, '.', '<', '(', '+', '|', // 64 - 79 '&', spc, spc, spc, spc, spc, spc, spc, spc, spc, '!', '$', '*', ')', ';', spc, // 80 - 95 - '-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '|', ',', '%', '_', '>', '?', // 96 - 111 + '-', '/', spc, spc, spc, spc, spc, spc, spc, spc, '¦', ',', '%', '_', '>', '?', // 96 - 111 spc, spc, spc, spc, spc, spc, spc, spc, spc, '`', ':', '#', '@', qts, '=', qtd, // 112 - 127 spc, 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', spc, spc, spc, spc, spc, spc, // 128 - 143 spc, 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', spc, spc, spc, spc, spc, spc, // 144 - 159 diff --git a/data/test9_expected/test9_cp037.txt b/data/test9_expected/test9_cp037.txt index 0ae58b103..bc1ec1009 100644 --- a/data/test9_expected/test9_cp037.txt +++ b/data/test9_expected/test9_cp037.txt @@ -1,11 +1,11 @@ {"CURRENCY":"GBP","SIGNATURE":"S9276511","COMPANY_NAME_NP":"©û ¯á3Z ÑT ¼Å8","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":36.88} {"CURRENCY":"CZK","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ùçc¤ ³® Aº ëd","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":59.80} -{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"´ #ñ#B \" :Ã|­à","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":767.31} +{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"´ #ñ#B \" :æ­à","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":767.31} {"CURRENCY":"CHF","SIGNATURE":"S9276511","COMPANY_NAME_NP":"ZÁ Ì0z eCjú","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":873.44} 
{"CURRENCY":"CHF","SIGNATURE":"S9276511","COMPANY_NAME_NP":"8oÂÔàÒ «Ð· \"ç","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":39.71} {"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"¿  ]Æ»ýY)Ô%@ÌÒ","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":536.19} {"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ëù±l¥Ù Ë^o _²<","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":346.57} -{"CURRENCY":"EUR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"é/Få© èÙªÎÓÖ|","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":471.30} +{"CURRENCY":"EUR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"é/Få© èÙªÎÓÖ¦","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":471.30} {"CURRENCY":"GBP","SIGNATURE":"S9276511","COMPANY_NAME_NP":"¶ýB}óþ F#«RÇÙ","COMPANY_ID":"00000000","WEALTH_QFY":0,"AMOUNT":287.83} {"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ý ëºê6 %±Çì","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":60112.00} {"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"ïêÖj% ®¯\\¶ X","COMPANY_ID":"00000000","WEALTH_QFY":1,"AMOUNT":539.59} diff --git a/data/test9_expected/test9_cp037_ext.txt b/data/test9_expected/test9_cp037_ext.txt index 33f3b2a76..39dd3f1bc 100644 --- a/data/test9_expected/test9_cp037_ext.txt +++ b/data/test9_expected/test9_cp037_ext.txt @@ -1,11 +1,11 @@ {"CURRENCY":"GBP","SIGNATURE":"S9276511","COMPANY_NAME_NP":"©û\u0012¯á3Z ÑT ¼Å8\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":36.88} {"CURRENCY":"CZK","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ùçc¤ ³®\u001A Aº\u0014ëd\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":59.80} -{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"´\u0011#ñ#B \" :Ã|­à\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":767.31} +{"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":"´\u0011#ñ#B \" :æ­à\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":767.31} {"CURRENCY":"CHF","SIGNATURE":"S9276511","COMPANY_NAME_NP":"ZÁ\u0007Ì0z \u0003\u0015eCjú \u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":873.44} {"CURRENCY":"CHF","SIGNATURE":"S9276511","COMPANY_NAME_NP":"8oÂÔàÒ\u0010\u001F«Ð· \"ç\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":39.71} {"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"¿\u001C ]Æ»ýY)Ô%@ÌÒ\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":536.19} {"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ëù±l¥Ù Ë^o _²<\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":346.57} -{"CURRENCY":"EUR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"é/Få©\u000BèÙªÎÓÖ|\n\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":471.30} +{"CURRENCY":"EUR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"é/Få©\u000BèÙªÎÓÖ¦\n\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":471.30} {"CURRENCY":"GBP","SIGNATURE":"S9276511","COMPANY_NAME_NP":"\u001F¶ýB}óþ\fF#«RÇÙ\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":0,"AMOUNT":287.83} {"CURRENCY":"ZAR","SIGNATURE":"S9276511","COMPANY_NAME_NP":"Ý ëºê6\u0016 %±Çì\u000E \u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":60112.00} {"CURRENCY":"USD","SIGNATURE":"S9276511","COMPANY_NAME_NP":" ïêÖj% ®¯\\¶ X\u0000","COMPANY_ID":"00000000\u0000\u0000","WEALTH_QFY":1,"AMOUNT":539.59} From 028806e1ea14d1358c50f6a223acef191fdc8166 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 17 Sep 2021 15:27:12 +0200 Subject: [PATCH 10/13] Update minor version of Scala 2.12 --- .github/workflows/build.yml | 
2 +- README.md | 4 ++-- pom.xml | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 7c9d7a93c..71a30b8e2 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -12,7 +12,7 @@ jobs: strategy: fail-fast: false matrix: - scala: [ 2.11.12, 2.12.14 ] + scala: [ 2.11.12, 2.12.15 ] spark: [ 2.4.8, 3.1.2 ] exclude: - scala: 2.11.12 diff --git a/README.md b/README.md index efaeed258..c3d591d91 100644 --- a/README.md +++ b/README.md @@ -266,8 +266,8 @@ all required dependencies (an uber jar aka fat jar). Creating an uber jar for Cobrix is very easy. Just clone the repository and run one of the following commands: ```sh sbt ++2.11.12 assembly -DSPARK_VERSION=2.4.8 -sbt ++2.12.14 assembly -DSPARK_VERSION=2.4.8 -sbt ++2.12.14 assembly -DSPARK_VERSION=3.1.2 +sbt ++2.12.15 assembly -DSPARK_VERSION=2.4.8 +sbt ++2.12.15 assembly -DSPARK_VERSION=3.1.2 ``` You can collect the uber jar of `spark-cobol` either at diff --git a/pom.xml b/pom.xml index ecf66fb48..b67ead461 100644 --- a/pom.xml +++ b/pom.xml @@ -107,7 +107,7 @@ 1.6 - 2.12.14 + 2.12.15 2.12 3.1.2 3.0.1 From 7a652d7c96972f3b6c961173c5f3bd785221a960 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 17 Sep 2021 15:45:42 +0200 Subject: [PATCH 11/13] Update version of sbt --- project/build.properties | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/build.properties b/project/build.properties index 139325827..bb5389da2 100644 --- a/project/build.properties +++ b/project/build.properties @@ -1 +1 @@ -sbt.version=1.4.5 \ No newline at end of file +sbt.version=1.5.5 \ No newline at end of file From 0359706782d2a80b766b7bcd3d54904471f2e3cb Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Fri, 17 Sep 2021 15:56:09 +0200 Subject: [PATCH 12/13] Fix formatting --- .../extractors/raw/FixedBlockParameters.scala | 2 +- .../parameters/CobolParametersParser.scala | 120 +++++++++--------- 2 files changed, 61 insertions(+), 61 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala index d2296196f..58228521a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala @@ -34,4 +34,4 @@ object FixedBlockParameters { params.blockLength.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Block length should be positive. Got $x.")) params.recordsPerBlock.foreach(x => if (x < 1) throw new IllegalArgumentException(s"Records per block should be positive. 
Got $x.")) } -} \ No newline at end of file +} diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index dfbae29f7..e6ebdeac7 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -39,78 +39,78 @@ import scala.collection.mutable.ListBuffer object CobolParametersParser { private val logger = LoggerFactory.getLogger(this.getClass) - val SHORT_NAME = "cobol" - val PARAM_COPYBOOK_PATH = "copybook" - val PARAM_MULTI_COPYBOOK_PATH = "copybooks" - val PARAM_COPYBOOK_CONTENTS = "copybook_contents" - val PARAM_SOURCE_PATH = "path" - val PARAM_SOURCE_PATHS = "paths" - val PARAM_ENCODING = "encoding" - val PARAM_PEDANTIC = "pedantic" - val PARAM_RECORD_LENGTH_FIELD = "record_length_field" - val PARAM_RECORD_START_OFFSET = "record_start_offset" - val PARAM_RECORD_END_OFFSET = "record_end_offset" - val PARAM_FILE_START_OFFSET = "file_start_offset" - val PARAM_FILE_END_OFFSET = "file_end_offset" + val SHORT_NAME = "cobol" + val PARAM_COPYBOOK_PATH = "copybook" + val PARAM_MULTI_COPYBOOK_PATH = "copybooks" + val PARAM_COPYBOOK_CONTENTS = "copybook_contents" + val PARAM_SOURCE_PATH = "path" + val PARAM_SOURCE_PATHS = "paths" + val PARAM_ENCODING = "encoding" + val PARAM_PEDANTIC = "pedantic" + val PARAM_RECORD_LENGTH_FIELD = "record_length_field" + val PARAM_RECORD_START_OFFSET = "record_start_offset" + val PARAM_RECORD_END_OFFSET = "record_end_offset" + val PARAM_FILE_START_OFFSET = "file_start_offset" + val PARAM_FILE_END_OFFSET = "file_end_offset" // Schema transformation parameters - val PARAM_GENERATE_RECORD_ID = "generate_record_id" - val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy" - val PARAM_GROUP_FILLERS = "drop_group_fillers" - val PARAM_VALUE_FILLERS = "drop_value_fillers" + val PARAM_GENERATE_RECORD_ID = "generate_record_id" + val PARAM_SCHEMA_RETENTION_POLICY = "schema_retention_policy" + val PARAM_GROUP_FILLERS = "drop_group_fillers" + val PARAM_VALUE_FILLERS = "drop_value_fillers" - val PARAM_GROUP_NOT_TERMINALS = "non_terminals" - val PARAM_OCCURS_MAPPINGS = "occurs_mappings" - val PARAM_DEBUG = "debug" + val PARAM_GROUP_NOT_TERMINALS = "non_terminals" + val PARAM_OCCURS_MAPPINGS = "occurs_mappings" + val PARAM_DEBUG = "debug" // General parsing parameters - val PARAM_TRUNCATE_COMMENTS = "truncate_comments" - val PARAM_COMMENTS_LBOUND = "comments_lbound" - val PARAM_COMMENTS_UBOUND = "comments_ubound" + val PARAM_TRUNCATE_COMMENTS = "truncate_comments" + val PARAM_COMMENTS_LBOUND = "comments_lbound" + val PARAM_COMMENTS_UBOUND = "comments_ubound" // Data parsing parameters - val PARAM_STRING_TRIMMING_POLICY = "string_trimming_policy" - val PARAM_EBCDIC_CODE_PAGE = "ebcdic_code_page" - val PARAM_EBCDIC_CODE_PAGE_CLASS = "ebcdic_code_page_class" - val PARAM_ASCII_CHARSET = "ascii_charset" - val PARAM_IS_UTF16_BIG_ENDIAN = "is_utf16_big_endian" - val PARAM_FLOATING_POINT_FORMAT = "floating_point_format" - val PARAM_VARIABLE_SIZE_OCCURS = "variable_size_occurs" - val PARAM_IMPROVED_NULL_DETECTION = "improved_null_detection" + val PARAM_STRING_TRIMMING_POLICY = "string_trimming_policy" + val PARAM_EBCDIC_CODE_PAGE = "ebcdic_code_page" + val PARAM_EBCDIC_CODE_PAGE_CLASS = "ebcdic_code_page_class" + val PARAM_ASCII_CHARSET = "ascii_charset" + val PARAM_IS_UTF16_BIG_ENDIAN = 
"is_utf16_big_endian" + val PARAM_FLOATING_POINT_FORMAT = "floating_point_format" + val PARAM_VARIABLE_SIZE_OCCURS = "variable_size_occurs" + val PARAM_IMPROVED_NULL_DETECTION = "improved_null_detection" // Parameters for multisegment variable length files - val PARAM_RECORD_FORMAT = "record_format" - val PARAM_RECORD_LENGTH = "record_length" - val PARAM_IS_XCOM = "is_xcom" - val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence" - val PARAM_IS_TEXT = "is_text" - val PARAM_IS_RDW_BIG_ENDIAN = "is_rdw_big_endian" - val PARAM_IS_BDW_BIG_ENDIAN = "is_bdw_big_endian" - val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length" - val PARAM_RDW_ADJUSTMENT = "rdw_adjustment" - val PARAM_BDW_ADJUSTMENT = "bdw_adjustment" - val PARAM_BLOCK_LENGTH = "block_length" - val PARAM_RECORDS_PER_BLOCK = "records_per_block" - val PARAM_SEGMENT_FIELD = "segment_field" - val PARAM_SEGMENT_ID_ROOT = "segment_id_root" - val PARAM_SEGMENT_FILTER = "segment_filter" - val PARAM_SEGMENT_ID_LEVEL_PREFIX = "segment_id_level" - val PARAM_RECORD_HEADER_PARSER = "record_header_parser" - val PARAM_RECORD_EXTRACTOR = "record_extractor" - val PARAM_RHP_ADDITIONAL_INFO = "rhp_additional_info" - val PARAM_RE_ADDITIONAL_INFO = "re_additional_info" - val PARAM_INPUT_FILE_COLUMN = "with_input_file_name_col" + val PARAM_RECORD_FORMAT = "record_format" + val PARAM_RECORD_LENGTH = "record_length" + val PARAM_IS_XCOM = "is_xcom" + val PARAM_IS_RECORD_SEQUENCE = "is_record_sequence" + val PARAM_IS_TEXT = "is_text" + val PARAM_IS_RDW_BIG_ENDIAN = "is_rdw_big_endian" + val PARAM_IS_BDW_BIG_ENDIAN = "is_bdw_big_endian" + val PARAM_IS_RDW_PART_REC_LENGTH = "is_rdw_part_of_record_length" + val PARAM_RDW_ADJUSTMENT = "rdw_adjustment" + val PARAM_BDW_ADJUSTMENT = "bdw_adjustment" + val PARAM_BLOCK_LENGTH = "block_length" + val PARAM_RECORDS_PER_BLOCK = "records_per_block" + val PARAM_SEGMENT_FIELD = "segment_field" + val PARAM_SEGMENT_ID_ROOT = "segment_id_root" + val PARAM_SEGMENT_FILTER = "segment_filter" + val PARAM_SEGMENT_ID_LEVEL_PREFIX = "segment_id_level" + val PARAM_RECORD_HEADER_PARSER = "record_header_parser" + val PARAM_RECORD_EXTRACTOR = "record_extractor" + val PARAM_RHP_ADDITIONAL_INFO = "rhp_additional_info" + val PARAM_RE_ADDITIONAL_INFO = "re_additional_info" + val PARAM_INPUT_FILE_COLUMN = "with_input_file_name_col" // Indexed multisegment file processing - val PARAM_ENABLE_INDEXES = "enable_indexes" - val PARAM_INPUT_SPLIT_RECORDS = "input_split_records" - val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb" - val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix" - val PARAM_OPTIMIZE_ALLOCATION = "optimize_allocation" - val PARAM_IMPROVE_LOCALITY = "improve_locality" + val PARAM_ENABLE_INDEXES = "enable_indexes" + val PARAM_INPUT_SPLIT_RECORDS = "input_split_records" + val PARAM_INPUT_SPLIT_SIZE_MB = "input_split_size_mb" + val PARAM_SEGMENT_ID_PREFIX = "segment_id_prefix" + val PARAM_OPTIMIZE_ALLOCATION = "optimize_allocation" + val PARAM_IMPROVE_LOCALITY = "improve_locality" // Parameters for debugging - val PARAM_DEBUG_IGNORE_FILE_SIZE = "debug_ignore_file_size" + val PARAM_DEBUG_IGNORE_FILE_SIZE = "debug_ignore_file_size" private def getSchemaRetentionPolicy(params: Parameters): SchemaRetentionPolicy = { val schemaRetentionPolicyName = params.getOrElse(PARAM_SCHEMA_RETENTION_POLICY, "collapse_root") @@ -729,4 +729,4 @@ object CobolParametersParser { val parsedParams = parser.parseMap(params) parsedParams.map(kv => kv._1 -> kv._2.asInstanceOf[Map[String, Any]].map(x => x._1 -> x._2.asInstanceOf[Int])) } -} \ No 
newline at end of file
+}

From 6ba830460f70015f7022b55a77bccd000381a3f5 Mon Sep 17 00:00:00 2001
From: Ruslan Iushchenko
Date: Tue, 21 Sep 2021 08:56:46 +0200
Subject: [PATCH 13/13] #420 Add support for FB format with BDW only.

---
 README.md                                     | 15 +++++++-
 .../extractors/raw/FixedBlockParameters.scala |  3 --
 .../raw/FixedBlockRawRecordExtractor.scala    | 16 ++++++---
 .../FixedBlockRawRecordExtractorSuite.scala   | 13 +++----
 .../parameters/CobolParametersParser.scala    |  3 --
 .../source/integration/Test30FbFileSpec.scala | 36 +++++++++++++------
 6 files changed, 55 insertions(+), 31 deletions(-)

diff --git a/README.md b/README.md
index c3d591d91..08332010f 100644
--- a/README.md
+++ b/README.md
@@ -326,8 +326,21 @@ the record defined by the copybook. But you can specify the record length explic
 ```

 Fixed block record formats (`FB`) are also supported. The support is _experimental_; if you find any issues, please
-let us know. When the record format is 'FB', you need to specify either block length or number of records per
+let us know. When the record format is 'FB', you can specify block length or number of records per
 block. As with 'F', if `record_length` is not specified, it will be determined from the copybook.
+
+Records that have BDWs but no RDWs can be read like this:
+```
+.option("record_format", "FB")
+.option("record_length", "250")
+```
+or simply
+```
+.option("record_format", "FB")
+```
+
+Records that have neither BDWs nor RDWs can be read like this:
+
 ```
 .option("record_format", "FB")
 .option("record_length", "250")
 .option("records_per_block", "2")
 ```
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala
index 58228521a..aac7f9b8c 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockParameters.scala
@@ -24,9 +24,6 @@ case class FixedBlockParameters(

 object FixedBlockParameters {
   def validate(params: FixedBlockParameters): Unit = {
-    if (params.blockLength.isEmpty && params.recordsPerBlock.isEmpty) {
-      throw new IllegalArgumentException("FB record format requires block length or number records per block to be specified.")
-    }
     if (params.blockLength.nonEmpty && params.recordsPerBlock.nonEmpty) {
       throw new IllegalArgumentException("FB record format requires either block length or number records per block to be specified, but not both.")
     }
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala
index fa7c43f6e..f8b355546 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractor.scala
@@ -22,7 +22,7 @@ class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockPa
   private val recordQueue = new mutable.Queue[Array[Byte]]

   private val recordSize = fbParams.recordLength.getOrElse(ctx.copybook.getRecordSize)
-  private val bdwSize = fbParams.blockLength.getOrElse(fbParams.recordsPerBlock.get * recordSize)
+  private val bdwSize = fbParams.blockLength.orElse(fbParams.recordsPerBlock.map(_ * recordSize))

   override def offset: Long = ctx.inputStream.offset

@@
-35,14 +35,20 @@ class FixedBlockRawRecordExtractor(ctx: RawRecordContext, fbParams: FixedBlockPa private def readNextBlock(): Unit = { if (!ctx.inputStream.isEndOfStream) { - val bdwOffset = ctx.inputStream.offset - val blockBuffer = ctx.inputStream.next(bdwSize) + var bdwOffset = ctx.inputStream.offset + + val nextBlockSize = bdwSize.getOrElse({ + val bdw = ctx.inputStream.next(ctx.bdwDecoder.headerSize) + val blockLength = ctx.bdwDecoder.getRecordLength(bdw, bdwOffset) + bdwOffset += ctx.bdwDecoder.headerSize + blockLength + }) + + val blockBuffer = ctx.inputStream.next(nextBlockSize) var blockIndex = 0 while (blockIndex < blockBuffer.length) { - val rdwOffset = bdwOffset + blockIndex - val payload = blockBuffer.slice(blockIndex, blockIndex + recordSize) if (payload.length > 0) { recordQueue.enqueue(payload) diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala index a68694dd3..3b3fdfb1a 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedBlockRawRecordExtractorSuite.scala @@ -99,19 +99,16 @@ class FixedBlockRawRecordExtractorSuite extends WordSpec { assert(extractor.next().head == 0xF8.toByte) assert(!extractor.hasNext) } - } - "failures" should { - "throw an exception when neither block length nor records per block is specified" in { + "allow neither block length nor records per block to be specified" in { val fb = FixedBlockParameters(Some(1), None, None) - val ex = intercept[IllegalArgumentException] { - FixedBlockParameters.validate(fb) - } - - assert(ex.getMessage.contains("FB record format requires block length or number records per block to be specified.")) + assert(fb.blockLength.isEmpty) + assert(fb.recordsPerBlock.isEmpty) } + } + "failures" should { "throw an exception when both block length and records per block are specified" in { val fb = FixedBlockParameters(Some(1), Some(1), Some(1)) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala index e6ebdeac7..5f2d2457a 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala @@ -325,9 +325,6 @@ object CobolParametersParser { if (bdw.blockLength.nonEmpty && bdw.recordsPerBlock.nonEmpty) { throw new IllegalArgumentException(s"Options '$PARAM_BLOCK_LENGTH' and '$PARAM_RECORDS_PER_BLOCK' cannot be used together.") } - if (recordFormat == FixedBlock && bdw.blockLength.isEmpty && bdw.recordsPerBlock.isEmpty ) { - throw new IllegalArgumentException(s"For FB file format either '$PARAM_BLOCK_LENGTH' or '$PARAM_RECORDS_PER_BLOCK' must be specified.") - } if (recordFormat == VariableBlock && bdw.blockLength.nonEmpty) { logger.warn(s"Option '$PARAM_BLOCK_LENGTH' is ignored for record format: VB") } diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala index 89ad5c7bb..1ab6f0b49 100644 --- 
a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test30FbFileSpec.scala @@ -54,6 +54,20 @@ class Test30FbFileSpec extends WordSpec with SparkTestBase with BinaryFileFixtur """[{"A":"0"},{"A":"0"},{"A":"1"},{"A":"1"}]""", ignoreCount = true) } + + "load data with BDWs" in { + testVbRecordLoad(2, 2, + Map[String, String](), + expected4Records, + hasBDW = true) + } + + "load empty data with BDWs" in { + testVbRecordLoad(0, 0, + Map[String, String](), + "[]", + hasBDW = true) + } } "FB record failures should happen" when { @@ -80,23 +94,23 @@ class Test30FbFileSpec extends WordSpec with SparkTestBase with BinaryFileFixtur assert(ex.getMessage.contains("Options 'block_length' and 'records_per_block' cannot be used together.")) } - - "Mandatory options are missing" in { - val ex = intercept[IllegalArgumentException] { - testVbRecordLoad(1, 2, Map[String, String](), "") - } - - assert(ex.getMessage.contains("For FB file format either 'block_length' or 'records_per_block' must be specified.")) - } } private def testVbRecordLoad(blocks: Int, records: Int, options: Map[String, String], expected: String, - ignoreCount: Boolean = false): Unit = { + ignoreCount: Boolean = false, + hasBDW: Boolean = false): Unit = { val record: Seq[Byte] = Range(0, blocks).flatMap(blockNum => { - Range(0, records).flatMap(recordNum => { + val bdw: Seq[Byte] = if (hasBDW) { + val byte0 = ((records * 2) % 256).toByte + val byte1 = ((records * 2) / 256).toByte + Seq(0, 0, byte0, byte1) + } else { + Nil + } + bdw ++ Range(0, records).flatMap(recordNum => { val idx = (blockNum * records + recordNum) % 10 val v = (0xF0 + idx).toByte Seq(v, v) @@ -104,7 +118,7 @@ class Test30FbFileSpec extends WordSpec with SparkTestBase with BinaryFileFixtur }) withTempBinFile("rec", ".dat", record.toArray) { tmpFileName1 => - val df = spark + val df = spark .read .format("cobol") .option("copybook_contents", copybook)