From 60d270848cb151dc792fdd7bde43e4041c76db5e Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 24 Feb 2026 08:46:40 +0100 Subject: [PATCH 1/3] #822 Allow '_corrupt_records' to extract data in HEX instead of binary data type. --- .../extractors/record/RecordExtractors.scala | 65 +++++++++------- .../iterator/FixedLenNestedRowIterator.scala | 7 +- .../iterator/VarLenNestedIterator.scala | 7 +- .../parameters/CobolParametersParser.scala | 12 ++- .../parameters/CorruptFieldsPolicy.scala | 25 +++++++ .../reader/parameters/ReaderParameters.scala | 4 +- .../cobol/reader/schema/CobolSchema.scala | 8 +- .../builder/SparkCobolOptionsBuilder.scala | 7 +- .../spark/cobol/schema/CobolSchema.scala | 31 ++++++-- .../cobrix/spark/cobol/CobolSchemaSpec.scala | 23 +++++- .../source/base/impl/DummyCobolSchema.scala | 3 +- .../integration/Test41CorruptFieldsSpec.scala | 75 ++++++++++++++++++- 12 files changed, 215 insertions(+), 52 deletions(-) create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CorruptFieldsPolicy.scala diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index 838c0e36..c23ae957 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -35,21 +35,22 @@ object RecordExtractors { /** * This method extracts a record from the specified array of bytes. The copybook for the record needs to be already parsed. * - * @param ast The parsed copybook. - * @param data The data bits containing the record. - * @param offsetBytes The offset to the beginning of the record (in bits). - * @param policy A schema retention policy to be applied to the extracted record. 
- * @param variableLengthOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements. - * @param generateRecordId If true, a record id field will be added as the first field of the record. - * @param generateRecordBytes If true, a record bytes field will be added at the beginning of each record. - * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema. - * @param segmentLevelIds Segment ids to put to the extracted record if id generation it turned on. - * @param fileId A file id to be put to the extractor record if generateRecordId == true. - * @param recordId The record id to be saved to the record id field. - * @param activeSegmentRedefine An active segment redefine (the one that will be parsed). - * All other segment redefines will be skipped. - * @param generateInputFileField if true, a field containing input file name will be generated - * @param inputFileName An input file name to put if its generation is needed + * @param ast The parsed copybook. + * @param data The data bits containing the record. + * @param offsetBytes The offset to the beginning of the record (in bits). + * @param policy A schema retention policy to be applied to the extracted record. + * @param variableLengthOccurs If true, OCCURS DEPENDING ON data size will depend on the number of elements. + * @param generateRecordId If true, a record id field will be added as the first field of the record. + * @param generateRecordBytes If true, a record bytes field will be added at the beginning of each record. + * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema. + * @param generateCorruptFieldsAsHex If true, corrupt fields will be generated as hex strings, otherwise they will be generated as binary data. This parameter is only relevant if generateCorruptFields is true. + * @param segmentLevelIds Segment ids to put to the extracted record if id generation it turned on. 
+ * @param fileId A file id to be put to the extractor record if generateRecordId == true. + * @param recordId The record id to be saved to the record id field. + * @param activeSegmentRedefine An active segment redefine (the one that will be parsed). + * All other segment redefines will be skipped. + * @param generateInputFileField if true, a field containing input file name will be generated + * @param inputFileName An input file name to put if its generation is needed * @return An Array[Any] object corresponding to the record schema. */ @throws(classOf[IllegalStateException]) @@ -62,6 +63,7 @@ object RecordExtractors { generateRecordId: Boolean = false, generateRecordBytes: Boolean = false, generateCorruptFields: Boolean = false, + generateCorruptFieldsAsHex: Boolean = false, segmentLevelIds: List[String] = Nil, fileId: Int = 0, recordId: Long = 0, @@ -213,7 +215,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptFields, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptFields, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes, generateCorruptFields, generateCorruptFieldsAsHex, segmentLevelIds, fileId, recordId, data.length, data, generateInputFileField, inputFileName, corruptFields, handler) } /** @@ -433,7 +435,7 @@ object RecordExtractors { policy } - applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptFields = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, null, handler) + applyRecordPostProcessing(ast, records.toList, effectiveSchemaRetentionPolicy, generateRecordId, generateRecordBytes = false, generateCorruptFields = false, 
generateCorruptFieldsAsHex = false, Nil, fileId, recordId, recordLength, Array.empty[Byte], generateInputFileField = generateInputFileField, inputFileName, null, handler) } /** @@ -449,16 +451,18 @@ object RecordExtractors { * Combinations of the listed transformations are supported. *

* - * @param ast The parsed copybook - * @param records The array of [[T]] object for each Group of the copybook - * @param generateRecordId If true a record id field will be added as the first field of the record. - * @param generateRecordBytes If true a record bytes field will be added at the beginning of the record. - * @param generateCorruptFields If true,a corrupt fields field will be appended to the end of the schema. - * @param fileId The file id to be saved to the file id field - * @param recordId The record id to be saved to the record id field - * @param recordByteLength The length of the record - * @param generateInputFileField if true, a field containing input file name will be generated - * @param inputFileName An input file name to put if its generation is needed + * @param ast The parsed copybook + * @param records The array of [[T]] object for each Group of the copybook + * @param generateRecordId If true a record id field will be added as the first field of the record. + * @param generateRecordBytes If true a record bytes field will be added at the beginning of the record. + * @param generateCorruptFields If true,a corrupt fields field will be appended to the end of the schema. + * @param generateCorruptFieldsAsHex If true, corrupt fields will be generated as hex strings, otherwise they will be generated as binary data. This parameter is only relevant if generateCorruptFields is true. + * @param segmentLevelIds Segment ids to put to the extracted record if id generation it turned on. 
+ * @param fileId The file id to be saved to the file id field + * @param recordId The record id to be saved to the record id field + * @param recordByteLength The length of the record + * @param generateInputFileField if true, a field containing input file name will be generated + * @param inputFileName An input file name to put if its generation is needed * @return A [[T]] object corresponding to the record schema */ private def applyRecordPostProcessing[T]( @@ -468,6 +472,7 @@ object RecordExtractors { generateRecordId: Boolean, generateRecordBytes: Boolean, generateCorruptFields: Boolean, + generateCorruptFieldsAsHex: Boolean = false, segmentLevelIds: List[String], fileId: Int, recordId: Long, @@ -515,7 +520,11 @@ object RecordExtractors { val ar = new Array[Any](len) var i = 0 while (i < len) { - val r = handler.create(Array[Any](corruptFields(i).fieldName, corruptFields(i).rawValue), corruptFieldsGroup) + val r = if (generateCorruptFieldsAsHex) { + handler.create(Array[Any](corruptFields(i).fieldName, corruptFields(i).rawValue.map("%02X" format _).mkString), corruptFieldsGroup) + } else { + handler.create(Array[Any](corruptFields(i).fieldName, corruptFields(i).rawValue), corruptFieldsGroup) + } ar(i) = r i += 1 } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala index a6a8fb8d..ec4cda4f 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/FixedLenNestedRowIterator.scala @@ -18,7 +18,7 @@ package za.co.absa.cobrix.cobol.reader.iterator import za.co.absa.cobrix.cobol.internal.Logging import za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler} -import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters +import 
za.co.absa.cobrix.cobol.reader.parameters.{CorruptFieldsPolicy, ReaderParameters} import za.co.absa.cobrix.cobol.reader.schema.CobolSchema import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator @@ -47,6 +47,8 @@ class FixedLenNestedRowIterator[T: ClassTag]( private val segmentIdFilter = readerProperties.multisegment.flatMap(_.segmentIdFilter) private val segmentRedefineMap = readerProperties.multisegment.map(_.segmentIdRedefineMap).getOrElse(HashMap[String, String]()) private val segmentRedefineAvailable = segmentRedefineMap.nonEmpty + private val generateCorruptFields = readerProperties.corruptFieldsPolicy != CorruptFieldsPolicy.Disabled + private val generateCorruptFieldsAsHex = readerProperties.corruptFieldsPolicy == CorruptFieldsPolicy.Hex override def hasNext: Boolean = { val correctOffset = if (singleRecordOnly) { @@ -90,7 +92,8 @@ class FixedLenNestedRowIterator[T: ClassTag]( readerProperties.schemaPolicy, readerProperties.variableSizeOccurs, generateRecordBytes = readerProperties.generateRecordBytes, - generateCorruptFields = readerProperties.generateCorruptFields, + generateCorruptFields = generateCorruptFields, + generateCorruptFieldsAsHex = generateCorruptFieldsAsHex, activeSegmentRedefine = activeSegmentRedefine, handler = handler ) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala index 0f9a231e..edc6459a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/iterator/VarLenNestedIterator.scala @@ -20,7 +20,7 @@ import za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.headerparsers.RecordHeaderParser import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor import 
za.co.absa.cobrix.cobol.reader.extractors.record.{RecordExtractors, RecordHandler} -import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters +import za.co.absa.cobrix.cobol.reader.parameters.{CorruptFieldsPolicy, ReaderParameters} import za.co.absa.cobrix.cobol.reader.stream.SimpleStream import scala.collection.immutable.HashMap @@ -60,6 +60,8 @@ final class VarLenNestedIterator[T: ClassTag](cobolSchema: Copybook, private val segmentRedefineMap = readerProperties.multisegment.map(_.segmentIdRedefineMap).getOrElse(HashMap[String, String]()) private val segmentRedefineAvailable = segmentRedefineMap.nonEmpty private val generateInputFileName = readerProperties.inputFileNameColumn.nonEmpty + private val generateCorruptFields = readerProperties.corruptFieldsPolicy != CorruptFieldsPolicy.Disabled + private val generateCorruptFieldsAsHex = readerProperties.corruptFieldsPolicy == CorruptFieldsPolicy.Hex fetchNext() @@ -99,7 +101,8 @@ final class VarLenNestedIterator[T: ClassTag](cobolSchema: Copybook, readerProperties.variableSizeOccurs, readerProperties.generateRecordId, readerProperties.generateRecordBytes, - readerProperties.generateCorruptFields, + generateCorruptFields, + generateCorruptFieldsAsHex, segmentLevelIds, fileId, rawRecordIterator.getRecordIndex, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala index 6f9878ec..2f37a1a0 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParametersParser.scala @@ -400,6 +400,16 @@ object CobolParametersParser extends Logging { else None + val corruptFieldsPolicy = if (parameters.generateCorruptFields) { + if (parameters.decodeBinaryAsHex) { + CorruptFieldsPolicy.Hex + } else { + CorruptFieldsPolicy.Binary + } + } else { 
+ CorruptFieldsPolicy.Disabled + } + ReaderParameters( recordFormat = parameters.recordFormat, isEbcdic = parameters.isEbcdic, @@ -433,7 +443,7 @@ object CobolParametersParser extends Logging { fileEndOffset = varLenParams.fileEndOffset, generateRecordId = varLenParams.generateRecordId, generateRecordBytes = parameters.generateRecordBytes, - generateCorruptFields = parameters.generateCorruptFields, + corruptFieldsPolicy = corruptFieldsPolicy, schemaPolicy = parameters.schemaRetentionPolicy, stringTrimmingPolicy = parameters.stringTrimmingPolicy, isDisplayAlwaysString = parameters.isDisplayAlwaysString, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CorruptFieldsPolicy.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CorruptFieldsPolicy.scala new file mode 100644 index 00000000..ea1c3c25 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CorruptFieldsPolicy.scala @@ -0,0 +1,25 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package za.co.absa.cobrix.cobol.reader.parameters + +sealed trait CorruptFieldsPolicy + +object CorruptFieldsPolicy { + case object Disabled extends CorruptFieldsPolicy + case object Binary extends CorruptFieldsPolicy + case object Hex extends CorruptFieldsPolicy +} diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala index 4a527e8a..9904fb45 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/ReaderParameters.scala @@ -58,7 +58,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten * @param fileEndOffset A number of bytes to skip at the end of each file * @param generateRecordId If true, a record id field will be prepended to each record. * @param generateRecordBytes Generate 'record_bytes' field containing raw bytes of the original record - * @param generateCorruptFields Generate '_corrupt_fields' field for fields that haven't converted successfully + * @param corruptFieldsPolicy Specifies if '_corrupt_fields' field for fields that haven't converted successfully, and the type of raw values. * @param schemaPolicy Specifies a policy to transform the input schema. The default policy is to keep the schema exactly as it is in the copybook. * @param stringTrimmingPolicy Specifies if and how strings should be trimmed when parsed. * @param isDisplayAlwaysString If true, all fields having DISPLAY format will remain strings and won't be converted to numbers. 
@@ -112,7 +112,7 @@ case class ReaderParameters( fileEndOffset: Int = 0, generateRecordId: Boolean = false, generateRecordBytes: Boolean = false, - generateCorruptFields: Boolean = false, + corruptFieldsPolicy: CorruptFieldsPolicy = CorruptFieldsPolicy.Disabled, schemaPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot, stringTrimmingPolicy: StringTrimmingPolicy = StringTrimmingPolicy.TrimBoth, isDisplayAlwaysString: Boolean = false, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala index 69a025c8..282e92e1 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala @@ -20,7 +20,7 @@ import za.co.absa.cobrix.cobol.parser.encoding.codepage.CodePage import za.co.absa.cobrix.cobol.parser.encoding.{ASCII, EBCDIC} import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy import za.co.absa.cobrix.cobol.parser.{Copybook, CopybookParser} -import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters +import za.co.absa.cobrix.cobol.reader.parameters.{CorruptFieldsPolicy, ReaderParameters} import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy import java.nio.charset.{Charset, StandardCharsets} @@ -39,7 +39,7 @@ import scala.collection.immutable.HashMap * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. - * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema. 
+ * @param corruptSchemaPolicy Specifies a policy to handle corrupt records. By default, null values will be produced and the original value is ignored. If the policy is set, the '_corrupt_fields' field will be generated. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. * @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) @@ -52,7 +52,7 @@ class CobolSchema(val copybook: Copybook, val inputFileNameField: String, val generateRecordId: Boolean, val generateRecordBytes: Boolean, - val generateCorruptFields: Boolean, + val corruptSchemaPolicy: CorruptFieldsPolicy, val generateSegIdFieldsCnt: Int = 0, val segmentIdProvidedPrefix: String = "", val metadataPolicy: MetadataPolicy = MetadataPolicy.Basic) extends Serializable { @@ -144,7 +144,7 @@ object CobolSchema { readerParameters.inputFileNameColumn, readerParameters.generateRecordId, readerParameters.generateRecordBytes, - readerParameters.generateCorruptFields, + readerParameters.corruptFieldsPolicy, segIdFieldCount, segmentIdPrefix, readerParameters.metadataPolicy diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala index d827e20b..e0450c23 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/builder/SparkCobolOptionsBuilder.scala @@ -20,7 +20,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.{DataFrame, Row, SparkSession} import za.co.absa.cobrix.cobol.reader.extractors.record.RecordExtractors -import 
za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters +import za.co.absa.cobrix.cobol.reader.parameters.{CorruptFieldsPolicy, ReaderParameters} import za.co.absa.cobrix.cobol.reader.schema.{CobolSchema => CobolReaderSchema} import za.co.absa.cobrix.spark.cobol.reader.RowHandler import za.co.absa.cobrix.spark.cobol.schema.CobolSchema @@ -62,6 +62,8 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes val minimumRecordLength = readerParams.minimumRecordLength val maximumRecordLength = readerParams.maximumRecordLength + val generateCorruptFields = readerParams.corruptFieldsPolicy != CorruptFieldsPolicy.Disabled + val generateCorruptFieldsAsHex = readerParams.corruptFieldsPolicy == CorruptFieldsPolicy.Hex val rddRow = rdd .filter(array => array.nonEmpty && array.length >= minimumRecordLength && array.length <= maximumRecordLength) @@ -73,7 +75,8 @@ class SparkCobolOptionsBuilder(copybookContent: String)(implicit spark: SparkSes variableLengthOccurs = readerParams.variableSizeOccurs, generateRecordId = readerParams.generateRecordId, generateRecordBytes = readerParams.generateRecordBytes, - generateCorruptFields = readerParams.generateCorruptFields, + generateCorruptFields = generateCorruptFields, + generateCorruptFieldsAsHex = generateCorruptFieldsAsHex, handler = recordHandler) Row.fromSeq(record) }) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala index ed1b6d60..f4c26331 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala @@ -25,7 +25,7 @@ import za.co.absa.cobrix.cobol.parser.common.Constants import za.co.absa.cobrix.cobol.parser.encoding.RAW import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy import 
za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.getReaderProperties -import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, Parameters} +import za.co.absa.cobrix.cobol.reader.parameters.{CobolParametersParser, CorruptFieldsPolicy, Parameters} import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.schema.{CobolSchema => CobolReaderSchema} @@ -44,7 +44,7 @@ import scala.collection.mutable.ArrayBuffer * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. - * @param generateCorruptFields If true, a corrupt fields field will be appended to the end of the schema. + * @param corruptFieldsPolicy Specifies a policy to handle corrupt records. By default, null values will be produced and the original value is ignored. If the policy is set, the '_corrupt_fields' field will be generated. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. 
* @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) @@ -57,7 +57,7 @@ class CobolSchema(copybook: Copybook, inputFileNameField: String, generateRecordId: Boolean, generateRecordBytes: Boolean, - generateCorruptFields: Boolean, + corruptFieldsPolicy: CorruptFieldsPolicy, generateSegIdFieldsCnt: Int, segmentIdProvidedPrefix: String, metadataPolicy: MetadataPolicy) @@ -68,7 +68,7 @@ class CobolSchema(copybook: Copybook, inputFileNameField, generateRecordId, generateRecordBytes, - generateCorruptFields, + corruptFieldsPolicy, generateSegIdFieldsCnt, segmentIdProvidedPrefix ) with Logging with Serializable { @@ -82,6 +82,7 @@ class CobolSchema(copybook: Copybook, @throws(classOf[IllegalStateException]) private def createSparkSchema(): StructType = { + val generateCorruptFields = corruptFieldsPolicy != CorruptFieldsPolicy.Disabled val records = for (record <- copybook.getRootRecords) yield { val group = record.asInstanceOf[Group] val redefines = copybook.getAllSegmentRedefines @@ -130,10 +131,15 @@ class CobolSchema(copybook: Copybook, } val recordsWithCorruptFields = if (generateCorruptFields) { + val rawFieldType = if (corruptFieldsPolicy == CorruptFieldsPolicy.Hex) { + StringType + } else { + BinaryType + } recordsWithRecordId :+ StructField(Constants.corruptFieldsField, ArrayType(StructType( Seq( StructField(Constants.fieldNameColumn, StringType, nullable = false), - StructField(Constants.rawValueColumn, BinaryType, nullable = false) + StructField(Constants.rawValueColumn, rawFieldType, nullable = false) ) ), containsNull = false), nullable = true) } else { @@ -323,7 +329,7 @@ object CobolSchema { schema.inputFileNameField, schema.generateRecordId, schema.generateRecordBytes, - schema.generateCorruptFields, + schema.corruptSchemaPolicy, schema.generateSegIdFieldsCnt, schema.segmentIdPrefix, 
schema.metadataPolicy @@ -343,6 +349,7 @@ object CobolSchema { class CobolSchemaBuilder(copybook: Copybook) { private var schemaRetentionPolicy: SchemaRetentionPolicy = SchemaRetentionPolicy.CollapseRoot private var isDisplayAlwaysString: Boolean = false + private var decodeBinaryAsHex: Boolean = false private var strictIntegralPrecision: Boolean = false private var inputFileNameField: String = "" private var generateRecordId: Boolean = false @@ -362,6 +369,11 @@ object CobolSchema { this } + def withDecodeBinaryAsHex(decodeBinaryAsHex: Boolean): CobolSchemaBuilder = { + this.decodeBinaryAsHex = decodeBinaryAsHex + this + } + def withStrictIntegralPrecision(strictIntegralPrecision: Boolean): CobolSchemaBuilder = { this.strictIntegralPrecision = strictIntegralPrecision this @@ -403,6 +415,11 @@ object CobolSchema { } def build(): CobolSchema = { + val corruptFieldsPolicy = if (generateCorruptFields) { + if (decodeBinaryAsHex) CorruptFieldsPolicy.Hex else CorruptFieldsPolicy.Binary + } else { + CorruptFieldsPolicy.Disabled + } new CobolSchema( copybook, schemaRetentionPolicy, @@ -411,7 +428,7 @@ object CobolSchema { inputFileNameField, generateRecordId, generateRecordBytes, - generateCorruptFields, + corruptFieldsPolicy, generateSegIdFieldsCnt, segmentIdProvidedPrefix, metadataPolicy diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala index 92a5744d..59fc9757 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/CobolSchemaSpec.scala @@ -869,7 +869,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { assert(sparkSchema.fields(1).dataType.isInstanceOf[StructType]) } - "create schema with corrupt fields using builder" in { + "create schema with corrupt fields using builder with hex" in { val copybook: String = """ 01 RECORD. 
| 05 STR1 PIC X(10). @@ -877,6 +877,7 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { val parsedCopybook = CopybookParser.parse(copybook) val cobolSchema = CobolSchema.builder(parsedCopybook) .withGenerateCorruptFields(true) + .withDecodeBinaryAsHex(true) .build() val sparkSchema = cobolSchema.getSparkSchema @@ -884,6 +885,26 @@ class CobolSchemaSpec extends AnyWordSpec with SimpleComparisonBase { assert(sparkSchema.fields.length == 2) assert(sparkSchema.fields(1).name == "_corrupt_fields") assert(sparkSchema.fields(1).dataType.isInstanceOf[ArrayType]) + assert(sparkSchema.fields(1).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType].fields(1).dataType == StringType) + } + + "create schema with corrupt fields using builder with binary" in { + val copybook: String = + """ 01 RECORD. + | 05 STR1 PIC X(10). + |""".stripMargin + val parsedCopybook = CopybookParser.parse(copybook) + val cobolSchema = CobolSchema.builder(parsedCopybook) + .withGenerateCorruptFields(true) + .withDecodeBinaryAsHex(false) + .build() + + val sparkSchema = cobolSchema.getSparkSchema + + assert(sparkSchema.fields.length == 2) + assert(sparkSchema.fields(1).name == "_corrupt_fields") + assert(sparkSchema.fields(1).dataType.isInstanceOf[ArrayType]) + assert(sparkSchema.fields(1).dataType.asInstanceOf[ArrayType].elementType.asInstanceOf[StructType].fields(1).dataType == BinaryType) } "create schema with various options" in { diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala index 790420ca..b5def863 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/base/impl/DummyCobolSchema.scala @@ -20,6 +20,7 @@ import org.apache.spark.sql.types.StructType import 
za.co.absa.cobrix.cobol.parser.Copybook import za.co.absa.cobrix.cobol.parser.ast.Group import za.co.absa.cobrix.cobol.parser.policies.MetadataPolicy +import za.co.absa.cobrix.cobol.reader.parameters.CorruptFieldsPolicy.Disabled import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.spark.cobol.schema.CobolSchema @@ -31,7 +32,7 @@ class DummyCobolSchema(val sparkSchema: StructType) extends CobolSchema( "", false, false, - false, + Disabled, 0, "", MetadataPolicy.Basic) with Serializable { diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala index a82476ea..cbbc934b 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/source/integration/Test41CorruptFieldsSpec.scala @@ -18,7 +18,7 @@ package za.co.absa.cobrix.spark.cobol.source.integration import org.apache.spark.sql.DataFrame import org.scalatest.wordspec.AnyWordSpec -import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.PARAM_CORRUPT_FIELDS +import za.co.absa.cobrix.cobol.reader.parameters.CobolParametersParser.{PARAM_BINARY_AS_HEX, PARAM_CORRUPT_FIELDS} import za.co.absa.cobrix.spark.cobol.source.base.SparkTestBase import za.co.absa.cobrix.spark.cobol.source.fixtures.{BinaryFileFixture, TextComparisonFixture} import za.co.absa.cobrix.spark.cobol.utils.SparkUtils @@ -42,7 +42,7 @@ class Test41CorruptFieldsSpec extends AnyWordSpec with SparkTestBase with Binary ).map(_.toByte) "Corrupt fields record generation" should { - "work when the option is turned on" in { + "work when the option is turned on as binary" in { val expectedSchema = """root | |-- ID: integer (nullable = true) @@ -113,6 +113,77 @@ class Test41CorruptFieldsSpec extends AnyWordSpec with 
SparkTestBase with Binary } } + "work when the option is turned on as hex" in { + val expectedSchema = + """root + | |-- ID: integer (nullable = true) + | |-- F1: string (nullable = true) + | |-- F2: integer (nullable = true) + | |-- F3: integer (nullable = true) + | |-- F4: array (nullable = true) + | | |-- element: integer (containsNull = true) + | |-- _corrupt_fields: array (nullable = true) + | | |-- element: struct (containsNull = false) + | | | |-- field_name: string (nullable = false) + | | | |-- raw_value: string (nullable = false) + |""".stripMargin + + val expectedData = + """[ { + | "ID" : 1, + | "F1" : "", + | "F2" : 5, + | "F3" : 6, + | "F4" : [ 1, 2, 3 ], + | "_corrupt_fields" : [ ] + |}, { + | "ID" : 2, + | "F1" : "1", + | "F3" : 5, + | "F4" : [ 4, 5, 6 ], + | "_corrupt_fields" : [ { + | "field_name" : "F2", + | "raw_value" : "D3" + | } ] + |}, { + | "ID" : 3, + | "F2" : 3, + | "F3" : 61702, + | "F4" : [ 7, 8, 9 ], + | "_corrupt_fields" : [ ] + |}, { + | "ID" : 4, + | "F3" : 0, + | "F4" : [ null, null, 0 ], + | "_corrupt_fields" : [ ] + |}, { + | "ID" : 5, + | "F1" : "A", + | "F2" : 4, + | "F3" : 160, + | "F4" : [ null, 5, null ], + | "_corrupt_fields" : [ { + | "field_name" : "F4[0]", + | "raw_value" : "C1" + | }, { + | "field_name" : "F4[2]", + | "raw_value" : "A3" + | } ] + |} ] + |""".stripMargin + + withTempBinFile("corrupt_fields1", ".dat", data) { tmpFileName => + val df = getDataFrame(tmpFileName, Map(PARAM_CORRUPT_FIELDS -> "true", PARAM_BINARY_AS_HEX -> "true")) + + val actualSchema = df.schema.treeString + compareTextVertical(actualSchema, expectedSchema) + + val actualData = SparkUtils.convertDataFrameToPrettyJSON(df.orderBy("ID"), 10) + + compareTextVertical(actualData, expectedData) + } + } + "throw an exception when working with a hierarchical data" in { val ex = intercept[IllegalArgumentException] { getDataFrame("/tmp/dummy", Map(PARAM_CORRUPT_FIELDS -> "true", "segment-children:0" -> "COMPANY => DEPT,CUSTOMER")) From 
865359ec555e6b1806407f8d4feb409eea8d9404 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 24 Feb 2026 10:52:50 +0100 Subject: [PATCH 2/3] #822 Fix PR suggestions and made Bin to HEX conversion method more performant --- .../extractors/record/RecordExtractors.scala | 17 ++++++++++++++++- .../cobol/reader/schema/CobolSchema.scala | 2 +- .../cobrix/spark/cobol/schema/CobolSchema.scala | 2 +- .../cobrix/spark/cobol/RowExtractorSpec.scala | 7 +++++++ 4 files changed, 25 insertions(+), 3 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index c23ae957..ed782659 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -521,7 +521,8 @@ object RecordExtractors { var i = 0 while (i < len) { val r = if (generateCorruptFieldsAsHex) { - handler.create(Array[Any](corruptFields(i).fieldName, corruptFields(i).rawValue.map("%02X" format _).mkString), corruptFieldsGroup) + val hex = convertArrayToHex(corruptFields(i).rawValue) + handler.create(Array[Any](corruptFields(i).fieldName, hex), corruptFieldsGroup) } else { handler.create(Array[Any](corruptFields(i).fieldName, corruptFields(i).rawValue), corruptFieldsGroup) } @@ -534,6 +535,20 @@ object RecordExtractors { // toList() is a constant time operation, and List implements immutable Seq, which is exactly what is needed here. 
outputRecords.toList } + + private[cobrix] def convertArrayToHex(a: Array[Byte]): String = { + if (a == null) return "" + val hexChars = new Array[Char](a.length * 2) + val hexArray = Array('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F') + var i = 0 + while (i < a.length) { + val v = a(i) & 0xFF + hexChars(i * 2) = hexArray(v >>> 4) + hexChars(i * 2 + 1) = hexArray(v & 0x0F) + i += 1 + } + new String(hexChars) + } /** * Constructs a Group object representing corrupt fields. It is only needed for constructing records that require field names, diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala index 282e92e1..1fe4a8cc 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala @@ -39,7 +39,7 @@ import scala.collection.immutable.HashMap * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. - * @param corruptSchemaPolicy Specifies a policy to handle corrupt records. By default, null values will be produced and the original value is ignored. If the policy is set the '_corrput_fileds' field will be generated. + * @param corruptSchemaPolicy Specifies a policy to handle corrupt fields. By default, null values will be produced and the original value is ignored. If the policy is set the '_corrput_fields' field will be generated. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. 
* @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala index f4c26331..42dace51 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala @@ -44,7 +44,7 @@ import scala.collection.mutable.ArrayBuffer * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. - * @param corruptFieldsPolicy Specifies a policy to handle corrupt records. By default, null values will be produced and the original value is ignored. If the policy is set the '_corrput_fileds' field will be generated. + * @param corruptFieldsPolicy Specifies a policy to handle corrupt fields. By default, null values will be produced and the original value is ignored. If the policy is set the '_corrput_fields' field will be generated. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. 
* @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/RowExtractorSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/RowExtractorSpec.scala index ec530bad..0c72258e 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/RowExtractorSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/RowExtractorSpec.scala @@ -141,4 +141,11 @@ class RowExtractorSpec extends AnyFunSuite { //account type assert(account(1).asInstanceOf[Int] === 0) } + + test("Byte array to HEX conversion") { + assert(RecordExtractors.convertArrayToHex(null) == "") + assert(RecordExtractors.convertArrayToHex(Array.empty) == "") + assert(RecordExtractors.convertArrayToHex(Array(1)) == "01") + assert(RecordExtractors.convertArrayToHex(bytes) == "0006C5E7C1D4D7D3C5F4404000000F40404040404040404040404040404040404040404040404040003FF0F0F0F0F0F0F0F0F0F0F0F0F0F0F2F0F0F0F4F0F0F0F1F200000FF0F0F0F0F0F0F0F0F0F0F0F0F0F0F3F0F0F0F4F0F0F1F0F200001FF0F0F0F0F0F0F0F0F5F0F0F6F0F0F1F2F0F0F3F0F1F0F0F000002F") + } } From 9af73e82b52c55a7ab37f3c139bef4f76693be24 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 24 Feb 2026 11:12:21 +0100 Subject: [PATCH 3/3] #822 Extract string to HEX conversion into a utility object ensuring high performance implementation across its usages. 
--- .../asttransform/DebugFieldsAdder.scala | 4 +- .../parser/decoders/DecoderSelector.scala | 3 +- .../parser/decoders/StringDecoders.scala | 21 --------- ...thRecordLengthExprRawRecordExtractor.scala | 9 ++-- .../extractors/record/RecordExtractors.scala | 17 +------ .../cobol/reader/schema/CobolSchema.scala | 2 +- .../absa/cobrix/cobol/utils/StringUtils.scala | 44 +++++++++++++++++++ .../parser/decoders/StringDecodersSpec.scala | 8 ---- .../cobrix/cobol/utils/StringUtilsSuite.scala | 38 ++++++++++++++++ .../spark/cobol/schema/CobolSchema.scala | 2 +- .../cobrix/spark/cobol/RowExtractorSpec.scala | 7 --- 11 files changed, 93 insertions(+), 62 deletions(-) create mode 100644 cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/utils/StringUtils.scala create mode 100644 cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/utils/StringUtilsSuite.scala diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/asttransform/DebugFieldsAdder.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/asttransform/DebugFieldsAdder.scala index 5bfd7477..60815e94 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/asttransform/DebugFieldsAdder.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/asttransform/DebugFieldsAdder.scala @@ -20,10 +20,10 @@ import za.co.absa.cobrix.cobol.parser.CopybookParser.CopybookAST import za.co.absa.cobrix.cobol.parser.ast.datatype.AlphaNumeric import za.co.absa.cobrix.cobol.parser.ast.{Group, Primitive, Statement} import za.co.absa.cobrix.cobol.parser.decoders.StringDecoders -import za.co.absa.cobrix.cobol.parser.decoders.StringDecoders.KeepAll import za.co.absa.cobrix.cobol.parser.encoding._ import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy import za.co.absa.cobrix.cobol.parser.policies.DebugFieldsPolicy.DebugFieldsPolicy +import za.co.absa.cobrix.cobol.utils.StringUtils import scala.collection.mutable.ArrayBuffer @@ -47,7 +47,7 @@ class 
DebugFieldsAdder(debugFieldsPolicy: DebugFieldsPolicy) extends AstTransfor } val debugDecoder = debugFieldsPolicy match { - case DebugFieldsPolicy.HexValue => StringDecoders.decodeHex _ + case DebugFieldsPolicy.HexValue => StringUtils.convertArrayToHex _ case DebugFieldsPolicy.RawValue => StringDecoders.decodeRaw _ case DebugFieldsPolicy.StringValue => (a: Array[Byte]) => new String(a) case _ => throw new IllegalStateException(s"Unexpected debug fields policy: $debugFieldsPolicy.") diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/DecoderSelector.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/DecoderSelector.scala index b8a4c30e..5d7d2584 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/DecoderSelector.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/DecoderSelector.scala @@ -23,6 +23,7 @@ import za.co.absa.cobrix.cobol.parser.decoders.FloatingPointFormat.FloatingPoint import za.co.absa.cobrix.cobol.parser.encoding._ import za.co.absa.cobrix.cobol.parser.encoding.codepage.{CodePage, CodePageCommon} import za.co.absa.cobrix.cobol.parser.position.Position +import za.co.absa.cobrix.cobol.utils.StringUtils import java.nio.charset.{Charset, StandardCharsets} import scala.util.control.NonFatal @@ -94,7 +95,7 @@ object DecoderSelector { case UTF16 => StringDecoders.decodeUtf16String(_, getStringStrimmingType(stringTrimmingPolicy), isUtf16BigEndian, improvedNullDetection) case HEX => - StringDecoders.decodeHex + StringUtils.convertArrayToHex case RAW => StringDecoders.decodeRaw } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecoders.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecoders.scala index 8ca8e651..dcc40710 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecoders.scala +++ 
b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecoders.scala @@ -33,9 +33,6 @@ object StringDecoders { val TrimBoth = 4 val KeepAll = 5 - // Characters used for HEX conversion - private val HEX_ARRAY = "0123456789ABCDEF".toCharArray - /** * A decoder for any EBCDIC string fields (alphabetical or any char) * @@ -125,24 +122,6 @@ object StringDecoders { } } - /** - * A decoder for representing bytes as hex strings - * - * @param bytes A byte array that represents the binary data - * @return A HEX string representation of the binary data - */ - final def decodeHex(bytes: Array[Byte]): String = { - val hexChars = new Array[Char](bytes.length * 2) - var i = 0 - while (i < bytes.length) { - val v = bytes(i) & 0xFF - hexChars(i * 2) = HEX_ARRAY(v >>> 4) - hexChars(i * 2 + 1) = HEX_ARRAY(v & 0x0F) - i += 1 - } - new String(hexChars) - } - /** * A decoder that doesn't decode, but just passes the bytes the way they are. * diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala index fc2a4026..df54983c 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/raw/FixedWithRecordLengthExprRawRecordExtractor.scala @@ -21,6 +21,7 @@ import za.co.absa.cobrix.cobol.parser.ast.Primitive import za.co.absa.cobrix.cobol.reader.iterator.RecordLengthExpression import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters import za.co.absa.cobrix.cobol.reader.validator.ReaderParametersValidator +import za.co.absa.cobrix.cobol.utils.StringUtils import scala.util.Try @@ -127,7 +128,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext, case l: Long => l.toInt case s: String => 
Try{ s.toInt }.getOrElse(throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type, encountered: '$s'.")) case d: BigDecimal => d.toInt - case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).") + case null => throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${StringUtils.convertArrayToHex(binaryDataStart)}).") case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.") } } else { @@ -136,7 +137,7 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext, case l: Long => getRecordLengthFromMapping(l.toString) case d: BigDecimal => getRecordLengthFromMapping(d.toString()) case s: String => getRecordLengthFromMapping(s) - case null => defaultRecordLength.getOrElse(throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${getBytesAsHexString(binaryDataStart)}).")) + case null => defaultRecordLength.getOrElse(throw new IllegalStateException(s"Null encountered as a record length field (offset: $byteIndex, raw value: ${StringUtils.convertArrayToHex(binaryDataStart)}).")) case _ => throw new IllegalStateException(s"Record length value of the field ${lengthAST.name} must be an integral type.") } } @@ -150,10 +151,6 @@ class FixedWithRecordLengthExprRawRecordExtractor(ctx: RawRecordContext, } } - final private def getBytesAsHexString(bytes: Array[Byte]): String = { - bytes.map("%02X" format _).mkString - } - private def fetchRecordUsingRecordLengthFieldExpression(expr: RecordLengthExpression): Option[Array[Byte]] = { val lengthFieldBlock = expr.requiredBytesToread val evaluator = expr.evaluator diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala 
b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala index ed782659..10b5f20a 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/extractors/record/RecordExtractors.scala @@ -23,6 +23,7 @@ import za.co.absa.cobrix.cobol.parser.common.Constants import za.co.absa.cobrix.cobol.parser.encoding.RAW import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaRetentionPolicy +import za.co.absa.cobrix.cobol.utils.StringUtils import scala.collection.mutable import scala.collection.mutable.{ArrayBuffer, ListBuffer} @@ -521,7 +522,7 @@ object RecordExtractors { var i = 0 while (i < len) { val r = if (generateCorruptFieldsAsHex) { - val hex = convertArrayToHex(corruptFields(i).rawValue) + val hex = StringUtils.convertArrayToHex(corruptFields(i).rawValue) handler.create(Array[Any](corruptFields(i).fieldName, hex), corruptFieldsGroup) } else { handler.create(Array[Any](corruptFields(i).fieldName, corruptFields(i).rawValue), corruptFieldsGroup) @@ -536,20 +537,6 @@ object RecordExtractors { outputRecords.toList } - private[cobrix] def convertArrayToHex(a: Array[Byte]): String = { - if (a == null) return "" - val hexChars = new Array[Char](a.length * 2) - val hexArray = Array('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F') - var i = 0 - while (i < a.length) { - val v = a(i) & 0xFF - hexChars(i * 2) = hexArray(v >>> 4) - hexChars(i * 2 + 1) = hexArray(v & 0x0F) - i += 1 - } - new String(hexChars) - } - /** * Constructs a Group object representing corrupt fields. It is only needed for constructing records that require field names, * such as JSON. 
Field sizes and encoding do not really matter diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala index 1fe4a8cc..b6e9e00e 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/schema/CobolSchema.scala @@ -39,7 +39,7 @@ import scala.collection.immutable.HashMap * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. - * @param corruptSchemaPolicy Specifies a policy to handle corrupt fields. By default, null values will be produced and the original value is ignored. If the policy is set the '_corrput_fields' field will be generated. + * @param corruptSchemaPolicy Specifies a policy to handle corrupt fields. By default, null values will be produced and the original value is ignored. If the policy is set the '_corrupt_fields' field will be generated. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. 
* @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/utils/StringUtils.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/utils/StringUtils.scala new file mode 100644 index 00000000..154fa187 --- /dev/null +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/utils/StringUtils.scala @@ -0,0 +1,44 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.utils + +object StringUtils { + // Characters used for HEX conversion + private final val HEX_ARRAY = Array('0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'A', 'B', 'C', 'D', 'E', 'F') + + /** + * Converts an array of bytes into a hexadecimal string representation. + * + * The main goal is the high CPU and memory efficiency of this method. + * + * @param a the input array of bytes to be converted. If the input is null, the method returns null. + * @return a string representing the hexadecimal equivalent of the input byte array, or null if the input is null. 
+ */ + def convertArrayToHex(a: Array[Byte]): String = { + if (a == null) return null + val hexArray = HEX_ARRAY + val hexChars = new Array[Char](a.length * 2) + var i = 0 + while (i < a.length) { + val v = a(i) & 0xFF + hexChars(i * 2) = hexArray(v >>> 4) + hexChars(i * 2 + 1) = hexArray(v & 0x0F) + i += 1 + } + new String(hexChars) + } +} diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala index cc525e72..fc94eb23 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/parser/decoders/StringDecodersSpec.scala @@ -608,14 +608,6 @@ class StringDecodersSpec extends AnyWordSpec { } } - "decodeHex()" should { - "decode bytes as HEX strings" in { - val hex = decodeHex(Array[Byte](0, 3, 16, 127, -1, -127)) - - assert(hex == "0003107FFF81") - } - } - "decodeRaw()" should { "should work on empty arrays" in { val data = Array.empty[Byte] diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/utils/StringUtilsSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/utils/StringUtilsSuite.scala new file mode 100644 index 00000000..30a20d43 --- /dev/null +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/utils/StringUtilsSuite.scala @@ -0,0 +1,38 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.cobrix.cobol.utils + +import org.scalatest.wordspec.AnyWordSpec + +class StringUtilsSuite extends AnyWordSpec { + "StringUtils" should { + "convert byte array to hex string correctly" in { + val byteArray = Array[Byte](0x00, 0x01, 0x02, 0x0A, 0x0F, 0x10, 0x1F, 0x7F, 0x80.toByte, 0xFF.toByte) + val expectedHexString = "0001020A0F101F7F80FF" + assert(StringUtils.convertArrayToHex(byteArray) == expectedHexString) + } + + "return empty string when input is an empty array" in { + assert(StringUtils.convertArrayToHex(Array.empty) == "") + } + + "return null when input is null" in { + assert(StringUtils.convertArrayToHex(null) == null) + } + } + +} diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala index 42dace51..238cc88f 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala @@ -44,7 +44,7 @@ import scala.collection.mutable.ArrayBuffer * @param strictIntegralPrecision If true, Cobrix will not generate short/integer/long Spark data types, and always use decimal(n) with the exact precision that matches the copybook. * @param generateRecordId If true, a record id field will be prepended to the beginning of the schema. * @param generateRecordBytes If true, a record bytes field will be appended to the beginning of the schema. - * @param corruptFieldsPolicy Specifies a policy to handle corrupt fields. By default, null values will be produced and the original value is ignored. If the policy is set the '_corrput_fields' field will be generated. + * @param corruptFieldsPolicy Specifies a policy to handle corrupt fields. By default, null values will be produced and the original value is ignored. 
If the policy is set the '_corrupt_fields' field will be generated. * @param inputFileNameField If non-empty, a source file name will be prepended to the beginning of the schema. * @param generateSegIdFieldsCnt A number of segment ID levels to generate * @param segmentIdProvidedPrefix A prefix for each segment id levels to make segment ids globally unique (by default the current timestamp will be used) diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/RowExtractorSpec.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/RowExtractorSpec.scala index 0c72258e..ec530bad 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/RowExtractorSpec.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/RowExtractorSpec.scala @@ -141,11 +141,4 @@ class RowExtractorSpec extends AnyFunSuite { //account type assert(account(1).asInstanceOf[Int] === 0) } - - test("Byte array to HEX conversion") { - assert(RecordExtractors.convertArrayToHex(null) == "") - assert(RecordExtractors.convertArrayToHex(Array.empty) == "") - assert(RecordExtractors.convertArrayToHex(Array(1)) == "01") - assert(RecordExtractors.convertArrayToHex(bytes) == "0006C5E7C1D4D7D3C5F4404000000F40404040404040404040404040404040404040404040404040003FF0F0F0F0F0F0F0F0F0F0F0F0F0F0F2F0F0F0F4F0F0F0F1F200000FF0F0F0F0F0F0F0F0F0F0F0F0F0F0F3F0F0F0F4F0F0F1F0F200001FF0F0F0F0F0F0F0F0F5F0F0F6F0F0F1F2F0F0F3F0F1F0F0F000002F") - } }