From 30c101d96bd2617ca6d4fd3a4e5857d9a5026cb9 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Wed, 13 May 2026 09:56:54 +0200 Subject: [PATCH 1/3] #845 Retain start and end file offsets when converting mainframe files in-place using 'CobolProcessor'. --- .../impl/CobolProcessorInPlace.scala | 7 ++- .../cobrix/cobol/reader/stream/FSStream.scala | 46 ++++++++++++++++--- .../cobol/reader/stream/SimpleStream.scala | 4 ++ .../cobrix/cobol/mock/ByteStreamMock.scala | 4 ++ .../CobolProcessorBuilderSuite.scala | 2 +- .../reader/memorystream/TestByteStream.scala | 5 ++ .../memorystream/TestStringStream.scala | 4 ++ .../cobol/source/streaming/FileStreamer.scala | 7 ++- 8 files changed, 69 insertions(+), 10 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala index 59867282..52eed1d2 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala @@ -21,7 +21,7 @@ import za.co.absa.cobrix.cobol.processor.{CobolProcessor, RawRecordProcessor} import za.co.absa.cobrix.cobol.reader.VarLenNestedReader import za.co.absa.cobrix.cobol.reader.extractors.raw.RawRecordExtractor import za.co.absa.cobrix.cobol.reader.parameters.ReaderParameters -import za.co.absa.cobrix.cobol.reader.stream.SimpleStream +import za.co.absa.cobrix.cobol.reader.stream.{FSStream, SimpleStream} import java.io.OutputStream @@ -47,12 +47,15 @@ class CobolProcessorInPlace(readerParameters: ReaderParameters, val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None) try { - StreamProcessor.processStreamInPlace(copybook, + outputStream.write(inputStream.getSkippedStartBytes) + val recordsProcessed = StreamProcessor.processStreamInPlace(copybook, options, inputStream, recordExtractor, rawRecordProcessor, outputStream) + outputStream.write(inputStream.getSkippedEndBytes) + recordsProcessed } finally { inputStream.close() } diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala index 542fffde..564e1718 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala @@ -25,11 +25,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon private val fileSize: Long = new File(fileName).length() private val effectiveSize: Long = math.max(0L, fileSize - fileStartOffset - fileEndOffset) private var byteIndex = 0L - - // Skip the start offset if specified - if (fileStartOffset > 0) { - skipFully(fileStartOffset) - } + private var skipped: Boolean = false override def size: Long = effectiveSize @@ -39,13 +35,50 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon override def inputFileName: String = fileName + override def getSkippedStartBytes: Array[Byte] = { + if (skipped || fileStartOffset <= 0) + Array.empty[Byte] + else { + skipped = true + val b = new Array[Byte](fileStartOffset.toInt) + val actual = bytesStream.read(b, 0, fileStartOffset.toInt) + if (actual <= 0) { + Array.empty[Byte] + } else { + b.take(actual) + } + } + } + + override def getSkippedEndBytes: Array[Byte] = { + if (byteIndex >= effectiveSize && !isClosed) { + val b = new Array[Byte](fileEndOffset.toInt) + val actual = bytesStream.read(b, 0, fileEndOffset.toInt) + if (actual <= 0) { + Array.empty[Byte] + } else { + b.take(actual) + } + } else { + close() + Array.empty[Byte] + } + } + + @throws(classOf[IOException]) override def next(numberOfBytes: Int): Array[Byte] = { if (numberOfBytes <= 0) throw new IllegalArgumentException("Value of numberOfBytes should be greater than zero.") + if (!skipped && fileStartOffset > 0) { + // Skip the start offset if specified + skipFully(fileStartOffset) + } + // Check if we've reached the effective end of the stream if (byteIndex >= effectiveSize) { - close() + if (fileEndOffset <= 0) + close() return new Array[Byte](0) } @@ -77,6 +110,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon } private def skipFully(bytesToSkip: Long): Unit = { + skipped = true var remaining = math.min(bytesToSkip, fileSize) while (remaining > 0) { val skipped = bytesStream.skip(remaining) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala index ce1ab6c8..1d923115 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala @@ -35,4 +35,8 @@ trait SimpleStream extends AutoCloseable { def copyStream(): SimpleStream @throws(classOf[Exception]) def next(numberOfBytes: Int): Array[Byte] + + @throws(classOf[Exception]) def getSkippedStartBytes: Array[Byte] + + @throws(classOf[Exception]) def getSkippedEndBytes: Array[Byte] } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala index e7533dcc..eef4eaf2 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala @@ -47,6 +47,10 @@ class ByteStreamMock(bytes: Array[Byte]) extends SimpleStream{ } } + override def getSkippedStartBytes: Array[Byte] = Array.empty[Byte] + + override def getSkippedEndBytes: Array[Byte] = Array.empty[Byte] + override def close(): Unit = position = sz override def copyStream(): SimpleStream = { diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala index 63cef3dd..c4233cf7 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/processor/CobolProcessorBuilderSuite.scala @@ -137,7 +137,7 @@ class CobolProcessorBuilderSuite extends AnyWordSpec with BinaryFileFixture { assert(count == 4) assert(outputData.sameElements( - Array(-16,-15,-14,-13).map(_.toByte) + Array(7, 7, 7, -16,-15,-14,-13, 8, 8).map(_.toByte) )) } } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala index 5353756c..fee143ad 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala @@ -47,6 +47,11 @@ class TestByteStream(bytes: Array[Byte]) extends SimpleStream{ } } + override def getSkippedStartBytes: Array[Byte] = Array.empty[Byte] + + override def getSkippedEndBytes: Array[Byte] = Array.empty[Byte] + + override def close(): Unit = position = sz override def copyStream(): SimpleStream = { diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala index eaedebcb..b516b29f 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala @@ -47,6 +47,10 @@ class TestStringStream(str: String) extends SimpleStream{ } } + override def getSkippedStartBytes: Array[Byte] = Array.empty[Byte] + + override def getSkippedEndBytes: Array[Byte] = Array.empty[Byte] + override def close(): Unit = position = sz override def copyStream(): SimpleStream = { diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala index 6f6e39a0..cefdb5dd 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala @@ -38,7 +38,6 @@ import java.io.IOException * @note This class is not thread-safe and should only be accessed from a single thread */ class FileStreamer(filePath: String, hadoopConfig: Configuration, startOffset: Long = 0L, maximumBytes: Long = 0L) extends SimpleStream { - private val logger: Logger = LoggerFactory.getLogger(this.getClass) private val hadoopPath = new Path(filePath) @@ -125,6 +124,12 @@ class FileStreamer(filePath: String, hadoopConfig: Configuration, startOffset: L } } + @throws[IOException] + override def getSkippedStartBytes: Array[Byte] = Array.empty[Byte] + + @throws[IOException] + override def getSkippedEndBytes: Array[Byte] = Array.empty[Byte] + @throws[IOException] override def close(): Unit = { wasOpened = true From 0b6596739455280cc0a92db2a498764517375c9b Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 14 May 2026 09:03:02 +0200 Subject: [PATCH 2/3] #845 Retain start and end file offsets when converting mainframe files in-place using 'SparkCobolProcessor'. --- .../impl/CobolProcessorInPlace.scala | 14 +++++++++++-- .../cobrix/cobol/reader/stream/FSStream.scala | 4 ++-- .../cobol/reader/stream/SimpleStream.scala | 4 ---- .../cobrix/cobol/mock/ByteStreamMock.scala | 4 ---- .../reader/memorystream/TestByteStream.scala | 5 ----- .../memorystream/TestStringStream.scala | 4 ---- .../spark/cobol/SparkCobolProcessor.scala | 20 ++++++++++++++++--- .../cobol/source/streaming/FileStreamer.scala | 6 ------ .../cobol/SparkCobolProcessorSuite.scala | 4 +++- 9 files changed, 34 insertions(+), 31 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala index 52eed1d2..24ae4955 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/processor/impl/CobolProcessorInPlace.scala @@ -47,14 +47,24 @@ class CobolProcessorInPlace(readerParameters: ReaderParameters, val recordExtractor = CobolProcessorBase.getRecordExtractor(readerParameters, copybookContents, inputStream, None) try { - outputStream.write(inputStream.getSkippedStartBytes) + inputStream match { + case stream: FSStream => + outputStream.write(stream.getSkippedStartBytes) + case _ => + } + val recordsProcessed = StreamProcessor.processStreamInPlace(copybook, options, inputStream, recordExtractor, rawRecordProcessor, outputStream) - outputStream.write(inputStream.getSkippedEndBytes) + + inputStream match { + case stream: FSStream => + outputStream.write(stream.getSkippedEndBytes) + case _ => + } recordsProcessed } finally { inputStream.close() diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala index 564e1718..fc799fb8 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala @@ -35,7 +35,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon override def inputFileName: String = fileName - override def getSkippedStartBytes: Array[Byte] = { + def getSkippedStartBytes: Array[Byte] = { if (skipped || fileStartOffset <= 0) Array.empty[Byte] else { @@ -50,7 +50,7 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon } } - override def getSkippedEndBytes: Array[Byte] = { + def getSkippedEndBytes: Array[Byte] = { if (byteIndex >= effectiveSize && !isClosed) { val b = new Array[Byte](fileEndOffset.toInt) val actual = bytesStream.read(b, 0, fileEndOffset.toInt) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala index 1d923115..ce1ab6c8 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/SimpleStream.scala @@ -35,8 +35,4 @@ trait SimpleStream extends AutoCloseable { def copyStream(): SimpleStream @throws(classOf[Exception]) def next(numberOfBytes: Int): Array[Byte] - - @throws(classOf[Exception]) def getSkippedStartBytes: Array[Byte] - - @throws(classOf[Exception]) def getSkippedEndBytes: Array[Byte] } diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala index eef4eaf2..e7533dcc 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/mock/ByteStreamMock.scala @@ -47,10 +47,6 @@ class ByteStreamMock(bytes: Array[Byte]) extends SimpleStream{ } } - override def getSkippedStartBytes: Array[Byte] = Array.empty[Byte] - - override def getSkippedEndBytes: Array[Byte] = Array.empty[Byte] - override def close(): Unit = position = sz override def copyStream(): SimpleStream = { diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala index fee143ad..5353756c 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestByteStream.scala @@ -47,11 +47,6 @@ class TestByteStream(bytes: Array[Byte]) extends SimpleStream{ } } - override def getSkippedStartBytes: Array[Byte] = Array.empty[Byte] - - override def getSkippedEndBytes: Array[Byte] = Array.empty[Byte] - - override def close(): Unit = position = sz override def copyStream(): SimpleStream = { diff --git a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala index b516b29f..eaedebcb 100644 --- a/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala +++ b/cobol-parser/src/test/scala/za/co/absa/cobrix/cobol/reader/memorystream/TestStringStream.scala @@ -47,10 +47,6 @@ class TestStringStream(str: String) extends SimpleStream{ } } - override def getSkippedStartBytes: Array[Byte] = Array.empty[Byte] - - override def getSkippedEndBytes: Array[Byte] = Array.empty[Byte] - override def close(): Unit = position = sz override def copyStream(): SimpleStream = { diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala index 96e327e2..9f2aed98 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessor.scala @@ -154,12 +154,13 @@ object SparkCobolProcessor { val readerParameters = cobolProcessorBuilder.getReaderParameters val cobolProcessor = cobolProcessorBuilder.build() + val retainStartAndEndOffsets = cobolProcessingStrategy == CobolProcessingStrategy.InPlace val processor = new SparkCobolProcessor { private val sconf = new SerializableConfiguration(spark.sparkContext.hadoopConfiguration) override def process(listOfFiles: Seq[String], outputPath: String): Long = { - getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, readerParameters, rawRecordProcessorOpt.get, sconf, numberOfThreads) + getFileProcessorRdd(listOfFiles, outputPath, cobolProcessor, readerParameters, rawRecordProcessorOpt.get, sconf, retainStartAndEndOffsets, numberOfThreads) .reduce(_ + _) } } @@ -190,12 +191,13 @@ object SparkCobolProcessor { readerParameters: ReaderParameters, rawRecordProcessor: SerializableRawRecordProcessor, sconf: SerializableConfiguration, + retainStartAndEndOffsets: Boolean, numberOfThreads: Int )(implicit spark: SparkSession): RDD[Long] = { val groupedFiles = listOfFiles.grouped(numberOfThreads).toSeq val rdd = spark.sparkContext.parallelize(groupedFiles) rdd.map(group => { - processListOfFiles(group, outputPath, cobolProcessor, readerParameters, rawRecordProcessor, sconf, numberOfThreads) + processListOfFiles(group, outputPath, cobolProcessor, readerParameters, rawRecordProcessor, sconf, retainStartAndEndOffsets, numberOfThreads) }) } @@ -254,6 +256,7 @@ object SparkCobolProcessor { readerParameters: ReaderParameters, rawRecordProcessor: SerializableRawRecordProcessor, sconf: SerializableConfiguration, + retainStartAndEndOffsets: Boolean, numberOfThreads: Int ): Long = { val threadPool: ExecutorService = Executors.newFixedThreadPool(numberOfThreads) @@ -281,7 +284,18 @@ object SparkCobolProcessor { val recordCount = UsingUtils.using(new FileStreamer(inputFile, sconf.value, fileStartOffset, maximumBytes)) { ifs => UsingUtils.using(new BufferedOutputStream(outputFs.create(outputFile, true))) { ofs => - cobolProcessor.process(ifs, ofs)(rawRecordProcessor) + if (fileStartOffset > 0 && retainStartAndEndOffsets) { + val tempStream = new FileStreamer(inputFile, sconf.value) + ofs.write(tempStream.next(fileStartOffset)) + tempStream.close() + } + val recordsProcessed = cobolProcessor.process(ifs, ofs)(rawRecordProcessor) + if (fileEndOffset > 0 && retainStartAndEndOffsets) { + val tempStream = new FileStreamer(inputFile, sconf.value, maximumBytes + fileStartOffset) + ofs.write(tempStream.next(fileEndOffset)) + tempStream.close() + } + recordsProcessed } } diff --git a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala index cefdb5dd..7caa2593 100644 --- a/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala +++ b/spark-cobol/src/main/scala/za/co/absa/cobrix/spark/cobol/source/streaming/FileStreamer.scala @@ -124,12 +124,6 @@ class FileStreamer(filePath: String, hadoopConfig: Configuration, startOffset: L } } - @throws[IOException] - override def getSkippedStartBytes: Array[Byte] = Array.empty[Byte] - - @throws[IOException] - override def getSkippedEndBytes: Array[Byte] = Array.empty[Byte] - @throws[IOException] override def close(): Unit = { wasOpened = true diff --git a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala index 44d2d2ac..9d48345e 100644 --- a/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala +++ b/spark-cobol/src/test/scala/za/co/absa/cobrix/spark/cobol/SparkCobolProcessorSuite.scala @@ -176,13 +176,15 @@ class SparkCobolProcessorSuite extends AnyWordSpec with SparkTestBase with Binar val outputData = readBinaryFile(outputFile) assert(outputData.sameElements( - Array(-16, -15, -14, -13).map(_.toByte) + Array(7, 7, 7, -16, -15, -14, -13, 8, 8).map(_.toByte) )) val actual = spark.read .format("cobol") .option("copybook_contents", copybook) .option("record_format", "F") + .option("file_start_offset", "3") + .option("file_end_offset", "2") .option("pedantic", "true") .load(outputFile) .toJSON From 78b66608b81053f1e081c9bd74c8f2dc6fcc564e Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Thu, 14 May 2026 09:29:59 +0200 Subject: [PATCH 3/3] #845 Fix PR suggestions. --- .../cobrix/cobol/reader/stream/FSStream.scala | 51 +++++++++++++++---- 1 file changed, 42 insertions(+), 9 deletions(-) diff --git a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala index fc799fb8..b936fa48 100644 --- a/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala +++ b/cobol-parser/src/main/scala/za/co/absa/cobrix/cobol/reader/stream/FSStream.scala @@ -35,13 +35,27 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon override def inputFileName: String = fileName + /** + * Reads and returns the bytes at the beginning of the file that precede the effective stream content. + * + * This method attempts to read the leading bytes (defined by the file start offset) that appear + * before the effective content of the stream. It only performs the read on the first invocation + * and if the file start offset is greater than zero. Subsequent calls will return an empty array + * since the skipped flag is set after the first successful read. + * + * @return an array of bytes containing the skipped start bytes, or an empty array if the bytes + * have already been read, the file start offset is zero or negative, or no bytes could be read. + */ def getSkippedStartBytes: Array[Byte] = { - if (skipped || fileStartOffset <= 0) + if (fileStartOffset > Int.MaxValue) + throw new IllegalArgumentException(s"fileStartOffset ($fileStartOffset) exceeds maximum supported value (${Int.MaxValue})") + val fileStartOffsetInt = fileStartOffset.toInt + if (skipped || fileStartOffsetInt <= 0) Array.empty[Byte] else { skipped = true - val b = new Array[Byte](fileStartOffset.toInt) - val actual = bytesStream.read(b, 0, fileStartOffset.toInt) + val b = new Array[Byte](fileStartOffsetInt) + val actual = bytesStream.read(b, 0, fileStartOffsetInt) if (actual <= 0) { Array.empty[Byte] } else { @@ -50,17 +64,36 @@ class FSStream (fileName: String, fileStartOffset: Long = 0L, fileEndOffset: Lon } } + /** + * Reads and returns the bytes at the end of the file that follow the effective stream content. + * + * This method attempts to read the trailing bytes (defined by the file end offset) that appear + * after the effective content of the stream. It only performs the read when the byte index has + * reached or exceeded the effective size and the stream has not yet been closed. If the file end + * offset is zero or negative, the stream is closed and an empty array is returned. + * + * @return an array of bytes containing the skipped end bytes, or an empty array if the stream + * has not yet reached the end of the effective content, the stream is already closed, + * the file end offset is zero or negative, or no bytes could be read. + */ def getSkippedEndBytes: Array[Byte] = { + if (fileEndOffset > Int.MaxValue) + throw new IllegalArgumentException(s"fileEndOffset ($fileEndOffset) exceeds maximum supported value (${Int.MaxValue})") + val fileEndOffsetInt = fileEndOffset.toInt if (byteIndex >= effectiveSize && !isClosed) { - val b = new Array[Byte](fileEndOffset.toInt) - val actual = bytesStream.read(b, 0, fileEndOffset.toInt) - if (actual <= 0) { - Array.empty[Byte] + if (fileEndOffsetInt > 0) { + val b = new Array[Byte](fileEndOffsetInt) + val actual = bytesStream.read(b, 0, fileEndOffsetInt) + if (actual <= 0) { + Array.empty[Byte] + } else { + b.take(actual) + } } else { - b.take(actual) + close() + Array.empty[Byte] } } else { - close() Array.empty[Byte] } }