diff --git a/README.md b/README.md
index 490d9f56e..4318c8bf5 100644
--- a/README.md
+++ b/README.md
@@ -1612,6 +1612,7 @@ The output looks like this:
 | Option (usage example)                             | Description |
 |----------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
 | .option("pedantic", "false")                       | If 'true' Cobrix will throw an exception is an unknown option is encountered. If 'false' (default), unknown options will be logged as an error without failing Spark Application. |
+| .option("debug_layout_positions", "true")          | If 'true' Cobrix will generate and log the layout positions table when reading data. |
 | .option("debug_ignore_file_size", "true")          | If 'true' no exception will be thrown if record size does not match file size. Useful for debugging copybooks to make them match a data file. |
 | .option("ascii_charset", "US-ASCII")               | Specifies a charset to use to decode ASCII data. The value can be any charset supported by `java.nio.charset`: `US-ASCII` (default), `UTF-8`, `ISO-8859-1`, etc. |
 | .option("field_code_page:cp825", "field1, field2") | Specifies the code page for selected fields. You can add mo than 1 such option for multiple code page overrides. |
diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
index e9efdcc2c..8a241dd5e 100644
--- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
+++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/parameters/CobolParameters.scala
@@ -60,6 +60,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
  * @param nonTerminals         A list of non-terminals (GROUPS) to combine and parse as primitive fields
  * @param debugFieldsPolicy    Specifies if debugging fields need to be added and what should they contain (false, hex, raw).
  * @param debugIgnoreFileSize  If true the fixed length file reader won't check file size divisibility. Useful for debugging binary file / copybook mismatches.
+ * @param debugLayoutPositions If true, layout positions for input files will be logged (false by default)
  * @param metadataPolicy       Specifies the policy of metadat fields to be added to the Spark schema
  */
 case class CobolParameters(
@@ -100,5 +101,6 @@ case class CobolParameters(
                             occursMappings: Map[String, Map[String, Int]],
                             debugFieldsPolicy: DebugFieldsPolicy,
                             debugIgnoreFileSize: Boolean,
+                            debugLayoutPositions: Boolean,
                             metadataPolicy: MetadataPolicy
                            )
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala
index f80425829..70e7b86f6 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/parameters/CobolParametersParser.scala
@@ -126,6 +126,7 @@ object CobolParametersParser extends Logging {
   val PARAM_IMPROVE_LOCALITY          = "improve_locality"
 
   // Parameters for debugging
+  val PARAM_DEBUG_LAYOUT_POSITIONS    = "debug_layout_positions"
   val PARAM_DEBUG_IGNORE_FILE_SIZE    = "debug_ignore_file_size"
 
   private def getSchemaRetentionPolicy(params: Parameters): SchemaRetentionPolicy = {
@@ -282,6 +283,7 @@ object CobolParametersParser extends Logging {
       getOccursMappings(params.getOrElse(PARAM_OCCURS_MAPPINGS, "{}")),
       getDebuggingFieldsPolicy(recordFormat, params),
       params.getOrElse(PARAM_DEBUG_IGNORE_FILE_SIZE, "false").toBoolean,
+      params.getOrElse(PARAM_DEBUG_LAYOUT_POSITIONS, "false").toBoolean,
       MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic"))
     )
     validateSparkCobolOptions(params, recordFormat)
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
index 8af8daa21..681a9badc 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/schema/CobolSchema.scala
@@ -68,7 +68,6 @@ class CobolSchema(copybook: Copybook,
 
   @throws(classOf[IllegalStateException])
   private[this] lazy val sparkFlatSchema = {
-    logger.info("Layout positions:\n" + copybook.generateRecordLayoutPositions())
     val arraySchema = copybook.ast.children.toArray
     val records = arraySchema.flatMap(record => {
       parseGroupFlat(record.asInstanceOf[Group], s"${record.name}_")
@@ -86,8 +85,6 @@ class CobolSchema(copybook: Copybook,
 
   @throws(classOf[IllegalStateException])
   private def createSparkSchema(): StructType = {
-    logger.info("Layout positions:\n" + copybook.generateRecordLayoutPositions())
-
     val records = for (record <- copybook.getRootRecords) yield {
       val group = record.asInstanceOf[Group]
       val redefines = copybook.getAllSegmentRedefines
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala
index 5994763bf..71c22d61b 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/DefaultSource.scala
@@ -16,6 +16,7 @@
 
 package za.co.absa.cobrix.spark.cobol.source
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, SchemaRelationProvider}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -67,7 +68,7 @@ class DefaultSource
     * This method will probably be removed once the correct hierarchy for [[FixedLenReader]] is put in place.
     */
   private def buildEitherReader(spark: SparkSession, cobolParameters: CobolParameters): Reader = {
-    if (cobolParameters.isText && cobolParameters.variableLengthParams.isEmpty) {
+    val reader = if (cobolParameters.isText && cobolParameters.variableLengthParams.isEmpty) {
       createTextReader(cobolParameters, spark)
     } else if (cobolParameters.variableLengthParams.isEmpty) {
       createFixedLengthReader(cobolParameters, spark)
@@ -75,6 +76,10 @@ class DefaultSource
     else {
       createVariableLengthReader(cobolParameters, spark)
     }
+
+    if (cobolParameters.debugLayoutPositions)
+      logger.info(s"Layout positions:\n${reader.getCobolSchema.copybook.generateRecordLayoutPositions()}")
+    reader
   }
 
   /**
@@ -82,7 +87,7 @@ class DefaultSource
     */
   private def createTextReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
-    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark)
+    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
     new FixedLenTextReader(copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
     )
   }
@@ -93,7 +98,7 @@ class DefaultSource
   private def createFixedLengthReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
 
     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
-    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark)
+    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
     new FixedLenNestedReader(copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
     )
   }
@@ -107,7 +112,7 @@ class DefaultSource
 
     val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
 
-    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark)
+    val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
     new VarLenNestedReader(
       copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
     )
diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala
index 2d501db70..dd7efdbcd 100644
--- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala
+++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/utils/SparkUtils.scala
@@ -17,7 +17,7 @@
 package za.co.absa.cobrix.spark.cobol.utils
 
 import com.fasterxml.jackson.databind.ObjectMapper
-import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.functions.{array, col, expr, max, struct}
 import za.co.absa.cobrix.spark.cobol.utils.impl.HofsWrapper.transform
@@ -464,13 +464,18 @@ object SparkUtils extends Logging {
     }.getOrElse(None)
   }
 
-  def getDefaultHdfsBlockSize(spark: SparkSession): Option[Int] = {
+  def getDefaultHdfsBlockSize(spark: SparkSession, pathOpt: Option[String]): Option[Int] = {
     val conf = spark.sparkContext.hadoopConfiguration
-    val fileSystem = FileSystem.get(conf)
+
+    val fileSystem = pathOpt match {
+      case Some(pathStr) => new Path(pathStr).getFileSystem(conf)
+      case None => FileSystem.get(conf)
+    }
+
     val hdfsBlockSize = HDFSUtils.getHDFSDefaultBlockSizeMB(fileSystem)
     hdfsBlockSize match {
-      case None => logger.info(s"Unable to get HDFS default block size.")
-      case Some(size) => logger.info(s"HDFS default block size = $size MB.")
+      case None => logger.info(s"Unable to get default block size for '${fileSystem.getScheme}://'.")
+      case Some(size) => logger.info(s"Default block size for '${fileSystem.getScheme}://' is $size MB.")
     }
     hdfsBlockSize
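
For reference, a minimal usage sketch of the new option from the Spark side (the copybook and data paths below are placeholders, not part of this change; `format("cobol")` and the `copybook` option are the usual Cobrix reader settings documented in the README):

```scala
// Hypothetical example: enable logging of the layout positions table while reading a mainframe file.
// The paths are placeholders; only "debug_layout_positions" is introduced by this change.
val df = spark.read
  .format("cobol")                                 // Cobrix data source
  .option("copybook", "/path/to/copybook.cpy")     // copybook location (placeholder)
  .option("debug_layout_positions", "true")        // log the layout positions table when the reader is built
  .load("/path/to/data")                           // data location (placeholder)
```

With `debug_layout_positions` left at its default of `false`, the layout positions table is no longer logged unconditionally from `CobolSchema`; it is logged once in `DefaultSource.buildEitherReader` only when the option is enabled. Independently, `getDefaultHdfsBlockSize` now resolves the block size against the file system of the first source path (e.g. `hdfs://` or `s3a://`) instead of the default file system.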