From 5e746a66118934237c93272c9e4bea1c7d349530 Mon Sep 17 00:00:00 2001 From: Kirill Batalin Date: Mon, 11 Mar 2024 21:59:09 +0000 Subject: [PATCH] fix(queue): Fix `ZombieExecutionCheckingAgent` to handle queues with more than 100 items (#4648) `SqlQueue#doContainsMessage` doesn't process more than 1 batch because of an incorrect loop condition. When the last element in the batch is processed (`ResultSet#next` returns `false`), the following invocation of `ResultSet#getRow` returns 0, no matter how many rows were processed before, so the `rs.row == batchSize` check always fails and the loop exits after the first batch. This PR is essentially a copy of https://github.com/spinnaker/orca/pull/4184 with the review comments addressed; https://github.com/spinnaker/orca/pull/4184 has been abandoned, so this one was opened in its place. Kudos to Ivor for the original PR. Co-authored-by: Jason (cherry picked from commit 0a52909dd408017a16331ffbf83550e7c5f3e0f1) --- .../com/netflix/spinnaker/q/sql/SqlQueue.kt | 26 ++++----- .../netflix/spinnaker/q/sql/SqlQueueTest.kt | 58 ++++++++++++++++++- 2 files changed, 69 insertions(+), 15 deletions(-) diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index b65e8b16f2..614bebfd75 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -77,7 +77,8 @@ class SqlQueue( override val publisher: EventPublisher, private val sqlRetryProperties: SqlRetryProperties, private val ULID: ULID = ULID(), - private val poolName: String = "default" + private val poolName: String = "default", + private val containsMessageBatchSize: Int = 100, ) : MonitorableQueue { companion object { @@ -187,33 +188,32 @@ class SqlQueue( } private fun doContainsMessage(predicate: (Message) -> Boolean): Boolean { - val batchSize = 100 + val batchSize = containsMessageBatchSize var found = false var lastId = "0" do { - val rs: ResultSet = withRetry(READ) { + val rs = withRetry(READ) { 
jooq.select(idField, fingerprintField, bodyField) .from(messagesTable) .where(idField.gt(lastId)) + .orderBy(idField.asc()) .limit(batchSize) .fetch() - .intoResultSet() } - while (!found && rs.next()) { + val rsIterator = rs.iterator() + while (!found && rsIterator.hasNext()) { + val record = rsIterator.next() + val body = record[bodyField, String::class.java] try { - found = predicate.invoke(mapper.readValue(rs.getString("body"))) + found = predicate.invoke(mapper.readValue(body)) } catch (e: Exception) { - log.error( - "Failed reading message with fingerprint: ${rs.getString("fingerprint")} " + - "message: ${rs.getString("body")}", - e - ) + log.error("Failed reading message with fingerprint: ${record[fingerprintField, String::class.java]} message: $body", e) } - lastId = rs.getString("id") + lastId = record[idField, String::class.java] } - } while (!found && rs.row == batchSize) + } while (!found && rs.isNotEmpty) return found } diff --git a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt index 2d5f3235c0..58564687fb 100644 --- a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt +++ b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt @@ -15,6 +15,12 @@ import com.netflix.spinnaker.q.TestMessage import com.netflix.spinnaker.q.metrics.EventPublisher import com.netflix.spinnaker.q.metrics.MonitorableQueueTest import com.netflix.spinnaker.q.metrics.QueueEvent +import com.netflix.spinnaker.time.MutableClock +import com.nhaarman.mockito_kotlin.mock +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.BeforeEach import java.time.Clock import java.time.Duration import java.util.Optional @@ -37,7 +43,8 @@ private val createQueueNoPublisher = { clock: Clock, private fun createQueue(clock: Clock, deadLetterCallback: DeadMessageCallback, - 
publisher: EventPublisher?): SqlQueue { + publisher: EventPublisher?, + containsMessageBatchSize: Int = 5): SqlQueue { return SqlQueue( queueName = "test", schemaVersion = 1, @@ -66,7 +73,8 @@ private fun createQueue(clock: Clock, sqlRetryProperties = SqlRetryProperties( transactions = retryPolicy, reads = retryPolicy - ) + ), + containsMessageBatchSize = containsMessageBatchSize, ) } @@ -78,3 +86,49 @@ private val retryPolicy: RetryProperties = RetryProperties( maxRetries = 1, backoffMs = 10 // minimum allowed ) + +class SqlQueueSpecificTests { + private val batchSize = 5 + private val clock = MutableClock() + private val deadMessageHandler: DeadMessageCallback = mock() + private val publisher: EventPublisher = mock() + private var queue: SqlQueue? = null + + @BeforeEach + fun setup() { + queue = createQueue(clock, deadMessageHandler, publisher, batchSize) + } + + @AfterEach + fun cleanup() { + cleanupCallback() + } + + @Test + fun `doContainsMessage works with no messages present`() { + assertThat(doContainsMessagePayload("test")).isFalse + } + + @Test + fun `doContainsMessage works with a single batch`() { + pushTestMessages(batchSize) + assertThat(doContainsMessagePayload("${batchSize-1}")).isTrue + assertThat(doContainsMessagePayload("")).isFalse + } + + @Test + fun `doContainsMessage handles multiple batches during search`() { + pushTestMessages(batchSize * 2) + assertThat(doContainsMessagePayload("${batchSize+1}")).isTrue + assertThat(doContainsMessagePayload("")).isFalse + } + + private fun pushTestMessages(numberOfMessages: Int) { + for (i in 1 .. numberOfMessages) { + queue?.push(TestMessage(i.toString())) + } + } + + private fun doContainsMessagePayload(payload: String): Boolean? = + queue?.containsMessage { message -> message is TestMessage && message.payload == payload } +}