Skip to content

Commit

Permalink
fix dlt parser to allow files > 2GB
Browse files Browse the repository at this point in the history
  • Loading branch information
froks committed Jun 15, 2024
1 parent 3fc6146 commit 32188d7
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ enum class TableColumns(val title: String, val columnClass: KClass<*>, val prefe

class DltTableModel(private var dltTarget: DltTarget, private var internalFilterList: List<DltMessageFilter>?) : AbstractTableModel() {
private val log = LoggerFactory.getLogger(DltTableModel::class.java)
private val CACHED_ENTRIES_COUNT = 1_000
private val READ_ALL = false

var filterList: List<DltMessageFilter>?
get() = internalFilterList
Expand All @@ -34,7 +36,6 @@ class DltTableModel(private var dltTarget: DltTarget, private var internalFilter
fireTableDataChanged()
}

private val CACHED_ENTRIES_COUNT = 1_000

private var tableAccess: DltTableDataAccess = dltTarget.dataAccess
private var rowCount: Int = 0
Expand Down Expand Up @@ -73,13 +74,21 @@ class DltTableModel(private var dltTarget: DltTarget, private var internalFilter
private fun getRow(rowIndex: Int): DltMessageDto {
val isAvailable = rowIndex in listOffset until listOffset + cachedEntries.size
if (!isAvailable) {
val offset = (rowIndex - (CACHED_ENTRIES_COUNT / 2)).coerceAtLeast(0)

val duration = measureTimeMillis {
cachedEntries = tableAccess.readData(filterList.sqlWhere(), offset, CACHED_ENTRIES_COUNT)
if (READ_ALL) {
val duration = measureTimeMillis {
cachedEntries = tableAccess.readData(filterList.sqlWhere(), null, null)
}
log.info("Reading ${cachedEntries.size} entries took $duration ms")
listOffset = 0
} else {
val offset = (rowIndex - (CACHED_ENTRIES_COUNT / 2)).coerceAtLeast(0)

val duration = measureTimeMillis {
cachedEntries = tableAccess.readData(filterList.sqlWhere(), offset, CACHED_ENTRIES_COUNT)
}
log.info("Reading ${cachedEntries.size} entries took $duration ms")
listOffset = offset
}
log.info("Reading ${cachedEntries.size} entries took $duration ms")
listOffset = offset
}

return cachedEntries[rowIndex - listOffset]
Expand Down
129 changes: 112 additions & 17 deletions dlt-core/src/main/kotlin/dltcore/DltParser.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.channels.FileChannel
import java.nio.file.Path
import java.nio.file.StandardOpenOption
import java.util.*
import java.util.stream.Stream
import java.util.stream.StreamSupport
import kotlin.io.path.fileSize
import kotlin.math.min

data class DltReadStatus(
val index: Long,
Expand All @@ -19,6 +22,94 @@ data class DltReadStatus(
val dltMessage: DltMessage?,
)

// Safety margin in bytes below Integer.MAX_VALUE: once the read position of the currently mapped
// buffer is within this distance of that limit, LargeFileBufferChooser maps a new window.
private const val OVERLAP = 10_000_000

/**
 * Serves the contents of [path] through read-only memory-mapped windows of at most
 * `Integer.MAX_VALUE` bytes each, so that files larger than a single [ByteBuffer] can be read.
 *
 * The file channel is opened when the instance is created and closed by [close].
 */
private class LargeFileBufferChooser(val path: Path) : AutoCloseable {
    private val fileSize = path.fileSize()
    private val fileChannel: FileChannel = FileChannel.open(path, StandardOpenOption.READ)

    // Absolute file offset at which the currently mapped window begins.
    private var windowStart = 0L

    // Currently mapped window; null until the buffer is requested for the first time.
    private var currentBuffer: ByteBuffer? = null

    /**
     * The buffer to read from next.
     *
     * Reading this property has a side effect: the first access maps the start of the file, and
     * any later access made while the current buffer's position is within [OVERLAP] bytes of
     * `Integer.MAX_VALUE` maps a new window. The new window starts at the byte the previous
     * buffer's position pointed to, so its own position restarts at 0.
     */
    val buffer: ByteBuffer
        get() {
            val active = currentBuffer ?: return mapWindow(0L)
            val relativePosition = active.position()
            if (relativePosition < Integer.MAX_VALUE - OVERLAP) {
                return active
            }
            return mapWindow(windowStart + relativePosition.toLong())
        }

    /**
     * Maps the window beginning at absolute file offset [start] and makes it the current buffer.
     * State is only updated after the mapping succeeded, so a failed mapping leaves the previous
     * window (if any) in place.
     */
    private fun mapWindow(start: Long): ByteBuffer {
        val size = min(fileSize - start, Integer.MAX_VALUE.toLong())
        val mapped = fileChannel.map(FileChannel.MapMode.READ_ONLY, start, size)
        windowStart = start
        currentBuffer = mapped
        return mapped
    }

    /** Closes the underlying file channel. */
    override fun close() {
        fileChannel.close()
    }
}

/**
 * Iterates over the DLT messages of a file that is read through a [LargeFileBufferChooser].
 *
 * Each call to [next] reads one storage magic, parses the message that follows it and returns a
 * [DltReadStatus] carrying the message together with the running counters.
 */
private class DltMessageIteratorPath(private val largeFile: LargeFileBufferChooser) : Iterator<DltReadStatus> {
    private var index: Long = 0
    private var successCount: Long = 0
    private var errorCount: Long = 0
    private val totalSize = largeFile.path.fileSize()

    // Number of bytes this iterator has read so far, summed over all buffers it was handed.
    // The buffer's own position cannot be used as the file position: the chooser may return a
    // freshly mapped buffer whose position starts again at 0.
    // NOTE(review): this equals the absolute file position only while this iterator is the sole
    // reader of the chooser's buffers and reading starts at offset 0 - confirm for new callers.
    private var consumedBytes: Long = 0

    private fun parseDltMessage(buffer: ByteBuffer, version: DltStorageVersion): DltMessage =
        when (version) {
            DltStorageVersion.V1 -> DltMessageV1.fromByteBuffer(buffer)
            DltStorageVersion.V2 -> throw UnsupportedOperationException("not supported yet")
        }

    override fun hasNext(): Boolean = largeFile.buffer.hasRemaining()

    /**
     * Parses and returns the next message.
     *
     * @throws NoSuchElementException if no data is left
     * @throws RuntimeException if the message cannot be parsed; the original exception is the cause
     */
    override fun next(): DltReadStatus {
        val buffer = largeFile.buffer
        buffer.order(ByteOrder.BIG_ENDIAN)
        if (!buffer.hasRemaining()) {
            throw NoSuchElementException("No more data available, but next() was called on iterator")
        }

        // The same buffer instance is used for the whole call, so the position delta is exactly
        // the number of bytes read for this message.
        val startInBuffer = buffer.position()
        val message = try {
            val magic = buffer.int
            val version = DltStorageVersion.getByMagic(magic)
            parseDltMessage(buffer, version)
        } catch (e: RuntimeException) {
            errorCount++
            consumedBytes += buffer.position() - startInBuffer
            throw RuntimeException(
                "Error while parsing message at file position $consumedBytes: ${e.message}",
                e
            )
        }
        consumedBytes += buffer.position() - startInBuffer
        successCount++

        return DltReadStatus(
            index = index++,
            filePosition = consumedBytes,
            fileSize = totalSize,
            progress = consumedBytes.toFloat() / totalSize.toFloat(),
            progressText = "Parsing file",
            errorCount = errorCount,
            successCount = successCount,
            dltMessage = message
        )
    }
}

private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) : Iterator<DltReadStatus> {
private var index: Long = 0
private var successCount: Long = 0
Expand All @@ -42,7 +133,10 @@ private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) :
parseDltMessage(buffer, version)
} catch (e: RuntimeException) {
errorCount++
throw RuntimeException("Error while parsing message at file position ${buffer.position()}: ${e.message}", e)
throw RuntimeException(
"Error while parsing message at file position ${buffer.position()}: ${e.message}",
e
)
}
successCount++
val progress = if (totalSize != null) {
Expand All @@ -68,23 +162,24 @@ private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) :
class DltMessageParser private constructor() {

companion object {
fun parseWithCallback(buffer: ByteBuffer, totalSize: Long?): Stream<DltReadStatus> =
StreamSupport.stream(Spliterators.spliteratorUnknownSize(
DltMessageIterator(buffer, totalSize),
Spliterator.NONNULL or Spliterator.ORDERED
), false)

fun parseFileWithCallback(path: Path): Stream<DltReadStatus> {
FileChannel.open(path).use { fileChannel ->
val buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size())
return parseWithCallback(buffer, fileChannel.size())
fun parseBuffer(buffer: ByteBuffer, totalSize: Long?): Stream<DltReadStatus> =
StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
DltMessageIterator(buffer, totalSize),
Spliterator.NONNULL or Spliterator.ORDERED
), false
)

fun parseFile(path: Path): Stream<DltReadStatus> {
val fb = LargeFileBufferChooser(path)
return StreamSupport.stream(
Spliterators.spliteratorUnknownSize(
DltMessageIteratorPath(fb),
Spliterator.NONNULL or Spliterator.ORDERED
), false
).onClose {
fb.close()
}
// System.gc() // JDK-4715154
}

fun parseFileAsObjects(path: Path): List<DltMessage> =
parseFileWithCallback(path).filter { it.dltMessage != null }.map { it.dltMessage!! }.toList()

}
}

4 changes: 2 additions & 2 deletions dlt-database/src/main/kotlin/db/DltManager.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ object DltManager {

private fun isExistingFileComplete(dbFile: File, dltFile: File, dataAccess: DltTableDataAccess): Boolean {
val expected = dataAccess.getEntryCount(emptyList())
val count: Long = DltMessageParser.parseFileWithCallback(dltFile.toPath()).count()
val count: Long = DltMessageParser.parseFile(dltFile.toPath()).count()
logger.info("Existing database '${dbFile.name}' has $count entries, expected $expected")
return expected == count

Expand Down Expand Up @@ -75,7 +75,7 @@ object DltManager {
val insertDuration = measureTimeMillis {
dataAccess.createInserter().use { inserter ->

DltMessageParser.parseFileWithCallback(dltFile.toPath()).forEach { progress ->
DltMessageParser.parseFile(dltFile.toPath()).forEach { progress ->
inserter.insertMsg(progress.dltMessage as DltMessageV1)
if (inserter.index % 20_000 == 0) {
inserter.executeBatch()
Expand Down
53 changes: 53 additions & 0 deletions dlt-database/src/main/kotlin/db/DltTableDataAccess.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package db

import dltcore.DltMessageV1
import dltcore.MessageTypeInfo
import org.ktorm.database.Database
import org.ktorm.dsl.*
import org.ktorm.schema.*
import org.slf4j.LoggerFactory
import java.sql.Connection
import java.sql.PreparedStatement
import java.time.Instant
import java.util.concurrent.atomic.AtomicInteger
import javax.sql.DataSource
import kotlin.system.measureTimeMillis
Expand Down Expand Up @@ -74,13 +76,64 @@ class DltTableDataAccess(private val dataSource: DataSource) {
): List<DltMessageDto> {
val list = mutableListOf<DltMessageDto>()

/*
database
.from(DltLog)
.select()
.whereWithOrConditions { list -> list.addAll(sqlClausesOr) }
.orderBy(DltLog.id.asc())
.limit(offset, limit)
.mapTo(list) { row -> DltLog.createEntity(row) }
*/

val sql = database
.from(DltLog)
.select(
DltLog.id,
DltLog.timestampSeconds, DltLog.timestampNanos,
DltLog.ecuId,
DltLog.appId,
DltLog.contextId,
DltLog.timestampHeader,
DltLog.sessionId,
DltLog.messageType,
DltLog.message,
)
.whereWithOrConditions { list -> list.addAll(sqlClausesOr) }
.orderBy(DltLog.id.asc())
.limit(offset, limit)

database.useConnection { connection ->
val expr = database.formatExpression(sql.expression)
connection.prepareStatement(expr.first).use { stmt ->
val params = expr.second
params.forEachIndexed { index, param ->
when (param.sqlType) {
VarcharSqlType -> stmt.setString(index + 1, param.value as String?)
IntSqlType -> stmt.setInt(index + 1, param.value as Int)
LongSqlType -> stmt.setLong(index + 1, param.value as Long)
}
}
stmt.executeQuery().use { rs ->
while (rs.next()) {
list.add(
DltMessageDto(
id = rs.getLong(1),
timestamp = Instant.ofEpochSecond(rs.getLong(2), rs.getLong(3)),
ecuId = rs.getString(4) ?: "",
appId = rs.getString(5) ?: "",
contextId = rs.getString(6) ?: "",
timestampHeader = rs.getInt(7).toUInt() ?: 0u,
sessionId = rs.getInt(8),
messageType = MessageTypeInfo.valueOf(rs.getString(9)!!),
message = rs.getString(10) ?: "",
)
)
}
}
}

}

return list
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class DltFilterApp : JFrame("dlt-filter") {
RandomAccessFile(destinationFile, "rw").use { randomAccessFile ->

val bb = ByteBuffer.allocate(100_000)
DltMessageParser.parseFileWithCallback(file.toPath()).forEach { status ->
DltMessageParser.parseFile(file.toPath()).forEach { status ->
if (status.filePosition != null && status.fileSize != null) {
percent = (status.filePosition!!.toFloat() / status.fileSize!!.toFloat()) * 100
} else if (percent == 0f) {
Expand Down
2 changes: 1 addition & 1 deletion dlt-filter-app/src/main/kotlin/dltfilterapp/MainCli.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ fun main(args: Array<String>) {
val appIds = args[1].split(',', ';').toSet()
val appIdsInts = appIds.map { it.asIntValue() }
var counter = 0
DltMessageParser.parseFileWithCallback(path).forEach { status ->
DltMessageParser.parseFile(path).forEach { status ->
counter++
val msg = status.dltMessage
if (msg is DltMessageV1) {
Expand Down

0 comments on commit 32188d7

Please sign in to comment.