1 change: 1 addition & 0 deletions README.md
@@ -1612,6 +1612,7 @@ The output looks like this:
| Option (usage example) | Description |
|----------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| .option("pedantic", "false") | If 'true' Cobrix will throw an exception is an unknown option is encountered. If 'false' (default), unknown options will be logged as an error without failing Spark Application. |
| .option("debug_layout_positions", "true") | If 'true' Cobrix will generate and log layout positions table when reading data. |
| .option("debug_ignore_file_size", "true") | If 'true' no exception will be thrown if record size does not match file size. Useful for debugging copybooks to make them match a data file. |
| .option("ascii_charset", "US-ASCII") | Specifies a charset to use to decode ASCII data. The value can be any charset supported by `java.nio.charset`: `US-ASCII` (default), `UTF-8`, `ISO-8859-1`, etc. |
| .option("field_code_page:cp825", "field1, field2") | Specifies the code page for selected fields. You can add mo than 1 such option for multiple code page overrides. |
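For illustration, a minimal read that enables the new option might look like the sketch below (the paths are placeholders; the `cobol` format and the `copybook` option follow the usage shown elsewhere in this README):

```scala
// Minimal sketch: log the layout positions table while reading a data file.
// Paths are placeholders; "cobol" and "copybook" are the standard Cobrix read options.
val df = spark.read
  .format("cobol")
  .option("copybook", "/path/to/copybook.cpy")
  .option("debug_layout_positions", "true") // emits the layout positions table to the logs
  .load("/path/to/data")
```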
@@ -60,6 +60,7 @@ import za.co.absa.cobrix.cobol.reader.policies.SchemaRetentionPolicy.SchemaReten
* @param nonTerminals A list of non-terminals (GROUPS) to combine and parse as primitive fields
* @param debugFieldsPolicy Specifies if debugging fields need to be added and what they should contain (false, hex, raw).
* @param debugIgnoreFileSize If true the fixed length file reader won't check file size divisibility. Useful for debugging binary file / copybook mismatches.
* @param debugLayoutPositions If true, layout positions for input files will be logged (false by default)
* @param metadataPolicy Specifies the policy for metadata fields to be added to the Spark schema
*/
case class CobolParameters(
@@ -100,5 +101,6 @@ case class CobolParameters(
occursMappings: Map[String, Map[String, Int]],
debugFieldsPolicy: DebugFieldsPolicy,
debugIgnoreFileSize: Boolean,
debugLayoutPositions: Boolean,
metadataPolicy: MetadataPolicy
)
@@ -126,6 +126,7 @@ object CobolParametersParser extends Logging {
val PARAM_IMPROVE_LOCALITY = "improve_locality"

// Parameters for debugging
val PARAM_DEBUG_LAYOUT_POSITIONS = "debug_layout_positions"
val PARAM_DEBUG_IGNORE_FILE_SIZE = "debug_ignore_file_size"

private def getSchemaRetentionPolicy(params: Parameters): SchemaRetentionPolicy = {
@@ -282,6 +283,7 @@ object CobolParametersParser extends Logging {
getOccursMappings(params.getOrElse(PARAM_OCCURS_MAPPINGS, "{}")),
getDebuggingFieldsPolicy(recordFormat, params),
params.getOrElse(PARAM_DEBUG_IGNORE_FILE_SIZE, "false").toBoolean,
params.getOrElse(PARAM_DEBUG_LAYOUT_POSITIONS, "false").toBoolean,
MetadataPolicy(params.getOrElse(PARAM_METADATA, "basic"))
)
validateSparkCobolOptions(params, recordFormat)
@@ -68,7 +68,6 @@ class CobolSchema(copybook: Copybook,

@throws(classOf[IllegalStateException])
private[this] lazy val sparkFlatSchema = {
logger.info("Layout positions:\n" + copybook.generateRecordLayoutPositions())
val arraySchema = copybook.ast.children.toArray
val records = arraySchema.flatMap(record => {
parseGroupFlat(record.asInstanceOf[Group], s"${record.name}_")
@@ -86,8 +85,6 @@

@throws(classOf[IllegalStateException])
private def createSparkSchema(): StructType = {
logger.info("Layout positions:\n" + copybook.generateRecordLayoutPositions())

val records = for (record <- copybook.getRootRecords) yield {
val group = record.asInstanceOf[Group]
val redefines = copybook.getAllSegmentRedefines
@@ -16,6 +16,7 @@

package za.co.absa.cobrix.spark.cobol.source

import org.apache.hadoop.fs.Path
import org.apache.spark.sql.sources.{BaseRelation, DataSourceRegister, RelationProvider, SchemaRelationProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{SQLContext, SparkSession}
@@ -67,22 +68,26 @@ class DefaultSource
* This method will probably be removed once the correct hierarchy for [[FixedLenReader]] is put in place.
*/
private def buildEitherReader(spark: SparkSession, cobolParameters: CobolParameters): Reader = {
if (cobolParameters.isText && cobolParameters.variableLengthParams.isEmpty) {
val reader = if (cobolParameters.isText && cobolParameters.variableLengthParams.isEmpty) {
createTextReader(cobolParameters, spark)
} else if (cobolParameters.variableLengthParams.isEmpty) {
createFixedLengthReader(cobolParameters, spark)
}
else {
createVariableLengthReader(cobolParameters, spark)
}

if (cobolParameters.debugLayoutPositions)
logger.info(s"Layout positions:\n${reader.getCobolSchema.copybook.generateRecordLayoutPositions()}")
reader
}

/**
* Creates a Reader that knows how to consume text Cobol records.
*/
private def createTextReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {
val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark)
val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
new FixedLenTextReader(copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
)
}
@@ -93,7 +98,7 @@ class DefaultSource
private def createFixedLengthReader(parameters: CobolParameters, spark: SparkSession): FixedLenReader = {

val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark)
val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
new FixedLenNestedReader(copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
)
}
@@ -107,7 +112,7 @@ class DefaultSource


val copybookContent = CopybookContentLoader.load(parameters, spark.sparkContext.hadoopConfiguration)
val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark)
val defaultHdfsBlockSize = SparkUtils.getDefaultHdfsBlockSize(spark, parameters.sourcePaths.headOption)
new VarLenNestedReader(
copybookContent, getReaderProperties(parameters, defaultHdfsBlockSize)
)
@@ -17,7 +17,7 @@
package za.co.absa.cobrix.spark.cobol.utils

import com.fasterxml.jackson.databind.ObjectMapper
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.SparkContext
import org.apache.spark.sql.functions.{array, col, expr, max, struct}
import za.co.absa.cobrix.spark.cobol.utils.impl.HofsWrapper.transform
@@ -464,13 +464,18 @@
}.getOrElse(None)
}

def getDefaultHdfsBlockSize(spark: SparkSession): Option[Int] = {
def getDefaultHdfsBlockSize(spark: SparkSession, pathOpt: Option[String]): Option[Int] = {
val conf = spark.sparkContext.hadoopConfiguration
val fileSystem = FileSystem.get(conf)

val fileSystem = pathOpt match {
case Some(pathStr) => new Path(pathStr).getFileSystem(conf)
case None => FileSystem.get(conf)
}

val hdfsBlockSize = HDFSUtils.getHDFSDefaultBlockSizeMB(fileSystem)
hdfsBlockSize match {
case None => logger.info(s"Unable to get HDFS default block size.")
case Some(size) => logger.info(s"HDFS default block size = $size MB.")
case None => logger.info(s"Unable to get default block size for '${fileSystem.getScheme}://'.")
case Some(size) => logger.info(s"Default block size for '${fileSystem.getScheme}://' is $size MB.")
}

hdfsBlockSize
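A minimal sketch of the difference this makes (assuming a `SparkSession` named `spark` and a hypothetical S3 input path; not taken from the change itself): `Path.getFileSystem(conf)` resolves the filesystem that actually backs the given path, while `FileSystem.get(conf)` always returns the cluster's default filesystem.

```scala
// Minimal sketch, assuming a SparkSession `spark` and a hypothetical s3a:// input path.
import org.apache.hadoop.fs.{FileSystem, Path}

val conf = spark.sparkContext.hadoopConfiguration

// Always the default filesystem (e.g. hdfs://), regardless of where the data lives.
val defaultFs = FileSystem.get(conf)

// The filesystem that backs the input path (e.g. s3a://), whose default block size may differ.
val pathFs = new Path("s3a://bucket/data/file.dat").getFileSystem(conf)
```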