diff --git a/dlt-analyzer-app/src/main/kotlin/analyzer/ui/table/DltTableModel.kt b/dlt-analyzer-app/src/main/kotlin/analyzer/ui/table/DltTableModel.kt index 513aacb..3049850 100644 --- a/dlt-analyzer-app/src/main/kotlin/analyzer/ui/table/DltTableModel.kt +++ b/dlt-analyzer-app/src/main/kotlin/analyzer/ui/table/DltTableModel.kt @@ -23,6 +23,8 @@ enum class TableColumns(val title: String, val columnClass: KClass<*>, val prefe class DltTableModel(private var dltTarget: DltTarget, private var internalFilterList: List?) : AbstractTableModel() { private val log = LoggerFactory.getLogger(DltTableModel::class.java) + private val CACHED_ENTRIES_COUNT = 1_000 + private val READ_ALL = false var filterList: List? get() = internalFilterList @@ -34,7 +36,6 @@ class DltTableModel(private var dltTarget: DltTarget, private var internalFilter fireTableDataChanged() } - private val CACHED_ENTRIES_COUNT = 1_000 private var tableAccess: DltTableDataAccess = dltTarget.dataAccess private var rowCount: Int = 0 @@ -73,13 +74,21 @@ class DltTableModel(private var dltTarget: DltTarget, private var internalFilter private fun getRow(rowIndex: Int): DltMessageDto { val isAvailable = rowIndex in listOffset until listOffset + cachedEntries.size if (!isAvailable) { - val offset = (rowIndex - (CACHED_ENTRIES_COUNT / 2)).coerceAtLeast(0) - - val duration = measureTimeMillis { - cachedEntries = tableAccess.readData(filterList.sqlWhere(), offset, CACHED_ENTRIES_COUNT) + if (READ_ALL) { + val duration = measureTimeMillis { + cachedEntries = tableAccess.readData(filterList.sqlWhere(), null, null) + } + log.info("Reading ${cachedEntries.size} entries took $duration ms") + listOffset = 0 + } else { + val offset = (rowIndex - (CACHED_ENTRIES_COUNT / 2)).coerceAtLeast(0) + + val duration = measureTimeMillis { + cachedEntries = tableAccess.readData(filterList.sqlWhere(), offset, CACHED_ENTRIES_COUNT) + } + log.info("Reading ${cachedEntries.size} entries took $duration ms") + listOffset = offset } - log.info("Reading ${cachedEntries.size} entries took $duration ms") - listOffset = offset } return cachedEntries[rowIndex - listOffset] diff --git a/dlt-core/src/main/kotlin/dltcore/DltParser.kt b/dlt-core/src/main/kotlin/dltcore/DltParser.kt index 07369ac..5820d28 100644 --- a/dlt-core/src/main/kotlin/dltcore/DltParser.kt +++ b/dlt-core/src/main/kotlin/dltcore/DltParser.kt @@ -4,9 +4,12 @@ import java.nio.ByteBuffer import java.nio.ByteOrder import java.nio.channels.FileChannel import java.nio.file.Path +import java.nio.file.StandardOpenOption import java.util.* import java.util.stream.Stream import java.util.stream.StreamSupport +import kotlin.io.path.fileSize +import kotlin.math.min data class DltReadStatus( val index: Long, @@ -19,6 +22,94 @@ data class DltReadStatus( val dltMessage: DltMessage?, ) +private const val OVERLAP = 10_000_000 + +private class LargeFileBufferChooser(val path: Path) : AutoCloseable { + private lateinit var currentBuffer: ByteBuffer + + private val fileSize = path.fileSize(); + private var fileChannel: FileChannel = FileChannel.open(path, StandardOpenOption.READ) + private var absolutePosition = -1L + private var bufferIndex = 0 + + val buffer: ByteBuffer + get() { + if (absolutePosition == -1L) { + absolutePosition = 0 + currentBuffer = fileChannel.map( + FileChannel.MapMode.READ_ONLY, + absolutePosition, + min(fileSize, Integer.MAX_VALUE.toLong()) + ) + bufferIndex = 0 + return currentBuffer + } + val relativePosition = currentBuffer.position() + if (relativePosition >= (Integer.MAX_VALUE - OVERLAP)) { + absolutePosition += relativePosition.toLong() + currentBuffer = fileChannel.map( + FileChannel.MapMode.READ_ONLY, + absolutePosition, + min(fileSize - absolutePosition, Integer.MAX_VALUE.toLong()) + ) + } + return currentBuffer + } + + override fun close() { + fileChannel.close() + } +} + +private class DltMessageIteratorPath(private val largeFile: LargeFileBufferChooser) : Iterator { + private var index: Long = 0 + private var successCount: Long = 0 + private var errorCount: Long = 0 + private val totalSize = largeFile.path.fileSize() + + private fun parseDltMessage(buffer: ByteBuffer, version: DltStorageVersion): DltMessage = + when (version) { + DltStorageVersion.V1 -> DltMessageV1.fromByteBuffer(buffer) + DltStorageVersion.V2 -> throw UnsupportedOperationException("not supported yet") + } + + override fun hasNext(): Boolean { + val buffer = largeFile.buffer + return buffer.hasRemaining() + } + + override fun next(): DltReadStatus { + val buffer = largeFile.buffer + buffer.order(ByteOrder.BIG_ENDIAN) + if (buffer.hasRemaining()) { + val message = try { + val magic = buffer.int + val version = DltStorageVersion.getByMagic(magic) + parseDltMessage(buffer, version) + } catch (e: RuntimeException) { + errorCount++ + throw RuntimeException( + "Error while parsing message at file position ${buffer.position()}: ${e.message}", + e + ) + } + successCount++ + val progress = buffer.position().toFloat() / totalSize.toFloat() + return DltReadStatus( + index = index++, + filePosition = buffer.position().toLong(), + fileSize = totalSize, + progress = progress, + progressText = "Parsing file", + errorCount = errorCount, + successCount = successCount, + dltMessage = message + ) + } + throw RuntimeException("No more data available, but next() was called on iterator") + } +} + private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) : Iterator { private var index: Long = 0 private var successCount: Long = 0 @@ -42,7 +133,10 @@ private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) : parseDltMessage(buffer, version) } catch (e: RuntimeException) { errorCount++ - throw RuntimeException("Error while parsing message at file position ${buffer.position()}: ${e.message}", e) + throw RuntimeException( + "Error while parsing message at file position ${buffer.position()}: ${e.message}", + e + ) } successCount++ val progress = if (totalSize != null) { @@ -68,23 +162,24 @@ private class DltMessageIterator(val buffer: ByteBuffer, val totalSize: Long?) : class DltMessageParser private constructor() { companion object { - fun parseWithCallback(buffer: ByteBuffer, totalSize: Long?): Stream = - StreamSupport.stream(Spliterators.spliteratorUnknownSize( - DltMessageIterator(buffer, totalSize), - Spliterator.NONNULL or Spliterator.ORDERED - ), false) - - fun parseFileWithCallback(path: Path): Stream { - FileChannel.open(path).use { fileChannel -> - val buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size()) - return parseWithCallback(buffer, fileChannel.size()) + fun parseBuffer(buffer: ByteBuffer, totalSize: Long?): Stream = + StreamSupport.stream( + Spliterators.spliteratorUnknownSize( + DltMessageIterator(buffer, totalSize), + Spliterator.NONNULL or Spliterator.ORDERED + ), false + ) + + fun parseFile(path: Path): Stream { + val fb = LargeFileBufferChooser(path) + return StreamSupport.stream( + Spliterators.spliteratorUnknownSize( + DltMessageIteratorPath(fb), + Spliterator.NONNULL or Spliterator.ORDERED + ), false + ).onClose { + fb.close() } -// System.gc() // JDK-4715154 } - - fun parseFileAsObjects(path: Path): List = - parseFileWithCallback(path).filter { it.dltMessage != null }.map { it.dltMessage!! }.toList() - } } - diff --git a/dlt-database/src/main/kotlin/db/DltManager.kt b/dlt-database/src/main/kotlin/db/DltManager.kt index 23343b8..ccb812e 100644 --- a/dlt-database/src/main/kotlin/db/DltManager.kt +++ b/dlt-database/src/main/kotlin/db/DltManager.kt @@ -18,7 +18,7 @@ object DltManager { private fun isExistingFileComplete(dbFile: File, dltFile: File, dataAccess: DltTableDataAccess): Boolean { val expected = dataAccess.getEntryCount(emptyList()) - val count: Long = DltMessageParser.parseFileWithCallback(dltFile.toPath()).count() + val count: Long = DltMessageParser.parseFile(dltFile.toPath()).count() logger.info("Existing database '${dbFile.name}' has $count entries, expected $expected") return expected == count @@ -75,7 +75,7 @@ object DltManager { val insertDuration = measureTimeMillis { dataAccess.createInserter().use { inserter -> - DltMessageParser.parseFileWithCallback(dltFile.toPath()).forEach { progress -> + DltMessageParser.parseFile(dltFile.toPath()).forEach { progress -> inserter.insertMsg(progress.dltMessage as DltMessageV1) if (inserter.index % 20_000 == 0) { inserter.executeBatch() diff --git a/dlt-database/src/main/kotlin/db/DltTableDataAccess.kt b/dlt-database/src/main/kotlin/db/DltTableDataAccess.kt index b17d6c2..2a165ec 100644 --- a/dlt-database/src/main/kotlin/db/DltTableDataAccess.kt +++ b/dlt-database/src/main/kotlin/db/DltTableDataAccess.kt @@ -1,12 +1,14 @@ package db import dltcore.DltMessageV1 +import dltcore.MessageTypeInfo import org.ktorm.database.Database import org.ktorm.dsl.* import org.ktorm.schema.* import org.slf4j.LoggerFactory import java.sql.Connection import java.sql.PreparedStatement +import java.time.Instant import java.util.concurrent.atomic.AtomicInteger import javax.sql.DataSource import kotlin.system.measureTimeMillis @@ -74,6 +76,7 @@ class DltTableDataAccess(private val dataSource: DataSource) { ): List { val list = mutableListOf() +/* database .from(DltLog) .select() @@ -81,6 +84,56 @@ class DltTableDataAccess(private val dataSource: DataSource) { .orderBy(DltLog.id.asc()) .limit(offset, limit) .mapTo(list) { row -> DltLog.createEntity(row) } +*/ + + val sql = database + .from(DltLog) + .select( + DltLog.id, + DltLog.timestampSeconds, DltLog.timestampNanos, + DltLog.ecuId, + DltLog.appId, + DltLog.contextId, + DltLog.timestampHeader, + DltLog.sessionId, + DltLog.messageType, + DltLog.message, + ) + .whereWithOrConditions { list -> list.addAll(sqlClausesOr) } + .orderBy(DltLog.id.asc()) + .limit(offset, limit) + + database.useConnection { connection -> + val expr = database.formatExpression(sql.expression) + connection.prepareStatement(expr.first).use { stmt -> + val params = expr.second + params.forEachIndexed { index, param -> + when (param.sqlType) { + VarcharSqlType -> stmt.setString(index + 1, param.value as String?) + IntSqlType -> stmt.setInt(index + 1, param.value as Int) + LongSqlType -> stmt.setLong(index + 1, param.value as Long) + } + } + stmt.executeQuery().use { rs -> + while (rs.next()) { + list.add( + DltMessageDto( + id = rs.getLong(1), + timestamp = Instant.ofEpochSecond(rs.getLong(2), rs.getLong(3)), + ecuId = rs.getString(4) ?: "", + appId = rs.getString(5) ?: "", + contextId = rs.getString(6) ?: "", + timestampHeader = rs.getInt(7).toUInt() ?: 0u, + sessionId = rs.getInt(8), + messageType = MessageTypeInfo.valueOf(rs.getString(9)!!), + message = rs.getString(10) ?: "", + ) + ) + } + } + } + + } return list } diff --git a/dlt-filter-app/src/main/kotlin/dltfilterapp/DltFilterApp.kt b/dlt-filter-app/src/main/kotlin/dltfilterapp/DltFilterApp.kt index d8f3c9c..377a3cc 100644 --- a/dlt-filter-app/src/main/kotlin/dltfilterapp/DltFilterApp.kt +++ b/dlt-filter-app/src/main/kotlin/dltfilterapp/DltFilterApp.kt @@ -236,7 +236,7 @@ class DltFilterApp : JFrame("dlt-filter") { RandomAccessFile(destinationFile, "rw").use { randomAccessFile -> val bb = ByteBuffer.allocate(100_000) - DltMessageParser.parseFileWithCallback(file.toPath()).forEach { status -> + DltMessageParser.parseFile(file.toPath()).forEach { status -> if (status.filePosition != null && status.fileSize != null) { percent = (status.filePosition!!.toFloat() / status.fileSize!!.toFloat()) * 100 } else if (percent == 0f) { diff --git a/dlt-filter-app/src/main/kotlin/dltfilterapp/MainCli.kt b/dlt-filter-app/src/main/kotlin/dltfilterapp/MainCli.kt index 726ef22..90098f5 100644 --- a/dlt-filter-app/src/main/kotlin/dltfilterapp/MainCli.kt +++ b/dlt-filter-app/src/main/kotlin/dltfilterapp/MainCli.kt @@ -19,7 +19,7 @@ fun main(args: Array) { val appIds = args[1].split(',', ';').toSet() val appIdsInts = appIds.map { it.asIntValue() } var counter = 0 - DltMessageParser.parseFileWithCallback(path).forEach { status -> + DltMessageParser.parseFile(path).forEach { status -> counter++ val msg = status.dltMessage if (msg is DltMessageV1) {