Skip to content

Commit

Permalink
refactoring read/write code to be more generic
Browse files Browse the repository at this point in the history
  • Loading branch information
froks committed Aug 11, 2024
1 parent a6b0d58 commit 76faf2d
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 207 deletions.
166 changes: 81 additions & 85 deletions src/main/kotlin/dltcore/DltObjects.kt

Large diffs are not rendered by default.

131 changes: 9 additions & 122 deletions src/main/kotlin/dltcore/DltParser.kt
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
package dltcore

import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.channels.FileChannel
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.*
import java.util.stream.Stream
import java.util.stream.StreamSupport
import kotlin.io.path.fileSize
import kotlin.math.min
import library.BinaryInputStream
import library.ByteOrder

public data class DltReadStatus(
val index: Long,
Expand All @@ -22,102 +14,14 @@ public data class DltReadStatus(
val dltMessage: DltMessage?,
)

// Margin in bytes: LargeFileBufferChooser maps a new window once the read position
// in the current window reaches Integer.MAX_VALUE minus this value.
private const val OVERLAP = 10_000_000

/**
 * Hands out read-only memory-mapped windows over the file at [path].
 *
 * A single mapping is limited to Integer.MAX_VALUE bytes, so [buffer] maps a new
 * window starting at the current read position once that position gets within
 * OVERLAP bytes of the limit.
 */
private class LargeFileBufferChooser(val path: Path) : AutoCloseable {
private lateinit var currentBuffer: ByteBuffer

private val fileSize = path.fileSize()
private var fileChannel: FileChannel = FileChannel.open(path, StandardOpenOption.READ)
// File offset at which currentBuffer starts; -1 until the first window has been mapped.
private var absolutePosition = -1L
// Only assigned when the first window is mapped; never read inside this class.
private var bufferIndex = 0

// Returns the window to read from next, mapping a new one when required.
val buffer: ByteBuffer
get() {
// First access: map from the start of the file, capped at Integer.MAX_VALUE bytes.
if (absolutePosition == -1L) {
absolutePosition = 0
currentBuffer = fileChannel.map(
FileChannel.MapMode.READ_ONLY,
absolutePosition,
min(fileSize, Integer.MAX_VALUE.toLong())
)
bufferIndex = 0
return currentBuffer
}
val relativePosition = currentBuffer.position()
// Close to the end of the window: remap so the new window begins at the current read position.
if (relativePosition >= (Integer.MAX_VALUE - OVERLAP)) {
absolutePosition += relativePosition.toLong()
currentBuffer = fileChannel.map(
FileChannel.MapMode.READ_ONLY,
absolutePosition,
min(fileSize - absolutePosition, Integer.MAX_VALUE.toLong())
)
}
return currentBuffer
}

// Closes the underlying file channel.
override fun close() {
fileChannel.close()
}
}

/**
 * Iterator that parses one DLT message per [next] call from the windows supplied by
 * [largeFile] and reports it together with running counters as a [DltReadStatus].
 */
private class DltMessageIteratorPath(private val largeFile: LargeFileBufferChooser) : Iterator<DltReadStatus> {
private var index: Long = 0
private var successCount: Long = 0
private var errorCount: Long = 0
private val totalSize = largeFile.path.fileSize()

// Dispatches to the parser for the given storage version; V2 is rejected as unsupported.
private fun parseDltMessage(buffer: ByteBuffer, version: DltStorageVersion): DltMessage =
when (version) {
DltStorageVersion.V1 -> DltMessageV1.fromByteBuffer(buffer)
DltStorageVersion.V2 -> throw UnsupportedOperationException("not supported yet")
}

// True while the window currently returned by largeFile has unread bytes.
override fun hasNext(): Boolean {
val buffer = largeFile.buffer
return buffer.hasRemaining()
}

// Parses the next message. Throws RuntimeException when parsing fails or when no data is left.
override fun next(): DltReadStatus {
val buffer = largeFile.buffer
buffer.order(ByteOrder.BIG_ENDIAN)
if (buffer.hasRemaining()) {
val message = try {
// The leading Int (read big-endian) is the magic value that selects the storage version.
val magic = buffer.int
val version = DltStorageVersion.getByMagic(magic)
parseDltMessage(buffer, version)
} catch (e: RuntimeException) {
// Count the failure, then rethrow with the read position added to the message.
errorCount++
throw RuntimeException(
"Error while parsing message at file position ${buffer.position()}: ${e.message}",
e
)
}
successCount++
// NOTE(review): buffer.position() is relative to the currently mapped window, so once
// LargeFileBufferChooser has remapped, progress and filePosition are not absolute file offsets.
val progress = buffer.position().toFloat() / totalSize.toFloat()
return DltReadStatus(
index = index++,
filePosition = buffer.position().toLong(),
fileSize = totalSize,
progress = progress,
progressText = "Parsing file",
errorCount = errorCount,
successCount = successCount,
dltMessage = message
)
}
throw RuntimeException("No more data available, but next() was called on iterator")
}
}

private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) : Iterator<DltReadStatus> {
private class DltMessageIterator(val buffer: BinaryInputStream, val totalSize: Long?) : Iterator<DltReadStatus> {
private var index: Long = 0
private var successCount: Long = 0
private var errorCount: Long = 0

private fun parseDltMessage(buffer: ByteBuffer, version: DltStorageVersion): DltMessage =
private fun parseDltMessage(buffer: BinaryInputStream, version: DltStorageVersion): DltMessage =
when (version) {
DltStorageVersion.V1 -> DltMessageV1.fromByteBuffer(buffer)
DltStorageVersion.V1 -> DltMessageV1.read(buffer)
DltStorageVersion.V2 -> throw UnsupportedOperationException("not supported yet")
}

Expand All @@ -128,7 +32,7 @@ private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) :
buffer.order(ByteOrder.BIG_ENDIAN)
if (buffer.hasRemaining()) {
val message = try {
val magic = buffer.int
val magic = buffer.readInt()
val version = DltStorageVersion.getByMagic(magic)
parseDltMessage(buffer, version)
} catch (e: RuntimeException) {
Expand All @@ -146,7 +50,7 @@ private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) :
}
return DltReadStatus(
index = index++,
filePosition = buffer.position().toLong(),
filePosition = buffer.position(),
fileSize = totalSize,
progress = progress,
progressText = "Parsing file",
Expand All @@ -162,24 +66,7 @@ private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) :
public class DltMessageParser private constructor() {

public companion object {
public fun parseBuffer(buffer: ByteBuffer, totalSize: Long?): Stream<DltReadStatus> =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
DltMessageIterator(buffer, totalSize),
Spliterator.NONNULL or Spliterator.ORDERED
), false
)

public fun parseFile(path: Path): Stream<DltReadStatus> {
val fb = LargeFileBufferChooser(path)
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
DltMessageIteratorPath(fb),
Spliterator.NONNULL or Spliterator.ORDERED
), false
).onClose {
fb.close()
}
}
public fun parseBuffer(buffer: BinaryInputStream, totalSize: Long?): Sequence<DltReadStatus> =
DltMessageIterator(buffer, totalSize).asSequence()
}
}
22 changes: 22 additions & 0 deletions src/main/kotlin/library/BinaryInputStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package library

import library.jvm.ByteBufferBinaryInputStream
import java.nio.ByteBuffer

/**
 * Read-side abstraction over a source of binary data with a selectable byte order.
 */
public interface BinaryInputStream {
    /** Selects the byte order of this stream. */
    public fun order(order: ByteOrder)

    /** Reports whether unread data is left. */
    public fun hasRemaining(): Boolean

    /** Returns the current read position. */
    public fun position(): Long

    /** Reads one [Byte]. */
    public fun readByte(): Byte

    /** Reads one [Short]. */
    public fun readShort(): Short

    /** Reads one [Int]. */
    public fun readInt(): Int

    /** Reads one [Long]. */
    public fun readLong(): Long

    /** Reads an array of [len] bytes. */
    public fun readArray(len: Int): ByteArray

    public companion object {
        /** Creates a [BinaryInputStream] that reads from [array] through a wrapping [ByteBuffer]. */
        public fun wrap(array: ByteArray): BinaryInputStream {
            val backing = ByteBuffer.wrap(array)
            return ByteBufferBinaryInputStream(backing)
        }
    }
}
11 changes: 11 additions & 0 deletions src/main/kotlin/library/BinaryOutputStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package library

/**
 * Write-side abstraction over a sink of binary data with a selectable byte order.
 */
public interface BinaryOutputStream {
    /** Selects the byte order of this stream. */
    public fun order(order: ByteOrder)

    /** Writes one [Byte]. */
    public fun writeByte(value: Byte)

    /** Writes one [Short]. */
    public fun writeShort(value: Short)

    /** Writes one [Int]. */
    public fun writeInt(value: Int)

    /** Writes one [Long]. */
    public fun writeLong(value: Long)

    /** Writes the bytes of [data]. */
    public fun put(data: ByteArray)
}
6 changes: 6 additions & 0 deletions src/main/kotlin/library/ByteOrder.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package library

/**
 * Byte order selectable on the binary streams of this library.
 */
public enum class ByteOrder {
    /** Counterpart of `java.nio.ByteOrder.BIG_ENDIAN`. */
    BIG_ENDIAN,

    /** Counterpart of `java.nio.ByteOrder.LITTLE_ENDIAN`. */
    LITTLE_ENDIAN,
}
35 changes: 35 additions & 0 deletions src/main/kotlin/library/jvm/ByteBufferBinaryInputStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package library.jvm

import library.BinaryInputStream
import library.ByteOrder
import java.nio.ByteBuffer

/**
 * [BinaryInputStream] that delegates every operation to the wrapped [ByteBuffer].
 */
public class ByteBufferBinaryInputStream(private val buffer: ByteBuffer) : BinaryInputStream {
    /** Translates [order] to its `java.nio` counterpart and applies it to the buffer. */
    override fun order(order: ByteOrder) {
        val nioOrder = order.asByteBufferByteOrder()
        buffer.order(nioOrder)
    }

    /** True while the buffer has bytes between its position and its limit. */
    override fun hasRemaining(): Boolean {
        return buffer.hasRemaining()
    }

    /** The buffer's position, widened to [Long]. */
    override fun position(): Long {
        val current = buffer.position()
        return current.toLong()
    }

    override fun readByte(): Byte {
        return buffer.get()
    }

    override fun readShort(): Short {
        return buffer.short
    }

    override fun readInt(): Int {
        return buffer.int
    }

    override fun readLong(): Long {
        return buffer.long
    }

    /** Copies the next [len] bytes of the buffer into a freshly allocated array. */
    override fun readArray(len: Int): ByteArray =
        ByteArray(len).also { target -> buffer.get(target) }
}
31 changes: 31 additions & 0 deletions src/main/kotlin/library/jvm/ByteBufferBinaryOutputStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package library.jvm

import library.BinaryOutputStream
import library.ByteOrder
import java.nio.ByteBuffer

/**
 * [BinaryOutputStream] that delegates every operation to the wrapped [ByteBuffer].
 */
public class ByteBufferBinaryOutputStream(private val buffer: ByteBuffer) : BinaryOutputStream {
    /** Translates [order] to its `java.nio` counterpart and applies it to the buffer. */
    override fun order(order: ByteOrder) {
        val nioOrder = order.asByteBufferByteOrder()
        buffer.order(nioOrder)
    }

    /** Appends all bytes of [data] to the buffer. */
    override fun put(data: ByteArray) { buffer.put(data) }

    /** Appends a single byte to the buffer. */
    override fun writeByte(value: Byte) { buffer.put(value) }

    /** Appends [value] as a `Short` to the buffer. */
    override fun writeShort(value: Short) { buffer.putShort(value) }

    /** Appends [value] as an `Int` to the buffer. */
    override fun writeInt(value: Int) { buffer.putInt(value) }

    /** Appends [value] as a `Long` to the buffer. */
    override fun writeLong(value: Long) { buffer.putLong(value) }
}
8 changes: 8 additions & 0 deletions src/main/kotlin/library/jvm/ByteBufferStreamExtensions.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package library.jvm

import library.ByteOrder

/**
 * Maps this library-level [ByteOrder] to the matching `java.nio.ByteOrder` constant.
 */
public fun ByteOrder.asByteBufferByteOrder(): java.nio.ByteOrder {
    // Exhaustive over the enum on purpose: no else branch, so a new entry fails compilation here.
    return when (this) {
        ByteOrder.BIG_ENDIAN -> java.nio.ByteOrder.BIG_ENDIAN
        ByteOrder.LITTLE_ENDIAN -> java.nio.ByteOrder.LITTLE_ENDIAN
    }
}
83 changes: 83 additions & 0 deletions src/main/kotlin/library/jvm/LargeFileBinaryInputStream.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package library.jvm

import dltcore.DltMessageParser
import dltcore.DltReadStatus
import library.BinaryInputStream
import library.ByteOrder
import java.nio.channels.FileChannel
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import kotlin.io.path.fileSize
import kotlin.math.min

/**
 * Parses the DLT file at [path] as a sequence of [DltReadStatus] entries.
 *
 * The file is opened through a [LargeFileByteBufferInputStream], and its size is passed to
 * `parseBuffer` as the total size.
 */
public fun DltMessageParser.Companion.parseFile(path: Path): Sequence<DltReadStatus> =
    parseBuffer(LargeFileByteBufferInputStream(path), path.fileSize())

// Margin in bytes: LargeFileByteBufferInputStream maps a new window once the read position
// in the current window reaches Integer.MAX_VALUE minus this value.
private const val OVERLAP = 10_000_000

/**
 * [BinaryInputStream] over a file that may be larger than a single memory mapping.
 *
 * A mapping is limited to `Integer.MAX_VALUE` bytes, so the file is read through a series of
 * read-only mapped windows. A new window, starting at the current read position, is mapped
 * once that position gets within [OVERLAP] bytes of the mapping limit.
 *
 * The byte order chosen via [order] is remembered and re-applied to every newly mapped window;
 * a fresh `ByteBuffer` is always big-endian, so the setting would otherwise be lost whenever
 * a remap happens between two reads.
 *
 * The file channel is closed as soon as [hasRemaining] reports that no data is left. A consumer
 * that stops reading earlier leaves the channel open.
 */
private class LargeFileByteBufferInputStream(path: Path) : BinaryInputStream {
    private val fileSize = path.fileSize()
    private val fileChannel: FileChannel = FileChannel.open(path, StandardOpenOption.READ)

    // Window currently being read; null until the first access maps one.
    private var currentInputStream: BinaryInputStream? = null

    // File offset at which the current window starts.
    private var absolutePosition = 0L

    // Last byte order requested by the caller; big-endian matches the default of a new ByteBuffer.
    private var currentOrder = ByteOrder.BIG_ENDIAN

    // Returns the window to read from, mapping the first or the next one when required.
    private val buffer: BinaryInputStream
        get() {
            val current = currentInputStream ?: return mapWindow(0L)
            val relativePosition = current.position()
            return if (relativePosition >= (Integer.MAX_VALUE - OVERLAP)) {
                mapWindow(absolutePosition + relativePosition)
            } else {
                current
            }
        }

    // Maps a read-only window beginning at file offset `start` and makes it the current one.
    private fun mapWindow(start: Long): BinaryInputStream {
        val mapped = fileChannel.map(
            FileChannel.MapMode.READ_ONLY,
            start,
            min(fileSize - start, Integer.MAX_VALUE.toLong()),
        )
        val stream = ByteBufferBinaryInputStream(mapped)
        // Carry the caller's byte order over to the new window.
        stream.order(currentOrder)
        absolutePosition = start
        currentInputStream = stream
        return stream
    }

    override fun order(order: ByteOrder) {
        currentOrder = order
        buffer.order(order)
    }

    override fun hasRemaining(): Boolean {
        val remaining = buffer.hasRemaining()
        if (!remaining) {
            // End of data reached: release the channel, nothing else will close it.
            fileChannel.close()
        }
        return remaining
    }

    /** Absolute offset in the file of the next byte to read; 0 before anything has been read. */
    override fun position(): Long =
        absolutePosition + (currentInputStream?.position() ?: 0L)

    override fun readByte(): Byte =
        buffer.readByte()

    override fun readShort(): Short =
        buffer.readShort()

    override fun readInt(): Int =
        buffer.readInt()

    override fun readLong(): Long =
        buffer.readLong()

    override fun readArray(len: Int): ByteArray =
        buffer.readArray(len)
}

0 comments on commit 76faf2d

Please sign in to comment.