diff --git a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt index b8aa7512e0..39c8d3e84d 100644 --- a/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt +++ b/keiko-sql/src/main/kotlin/com/netflix/spinnaker/q/sql/SqlQueue.kt @@ -186,8 +186,7 @@ class SqlQueue( } } - private fun doContainsMessage(predicate: (Message) -> Boolean): Boolean { - val batchSize = 100 + fun doContainsMessage(predicate: (Message) -> Boolean, batchSize: Int = 100): Boolean { var found = false var lastId = "0" @@ -201,7 +200,7 @@ class SqlQueue( .intoResultSet() } - while (!found && rs.next()) { + while (!found && rs.row < batchSize && rs.next()) { try { found = predicate.invoke(mapper.readValue(rs.getString("body"))) } catch (e: Exception) { diff --git a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt index 346a270171..f9377892e9 100644 --- a/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt +++ b/keiko-sql/src/test/kotlin/com/netflix/spinnaker/q/sql/SqlQueueTest.kt @@ -15,6 +15,12 @@ import com.netflix.spinnaker.q.TestMessage import com.netflix.spinnaker.q.metrics.EventPublisher import com.netflix.spinnaker.q.metrics.MonitorableQueueTest import com.netflix.spinnaker.q.metrics.QueueEvent +import com.netflix.spinnaker.time.MutableClock +import com.nhaarman.mockito_kotlin.mock +import org.assertj.core.api.Assertions.assertThat +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.BeforeEach import java.time.Clock import java.time.Duration import java.util.Optional @@ -78,3 +84,50 @@ private val retryPolicy: RetryProperties = RetryProperties( maxRetries = 1, backoffMs = 10 // minimum allowed ) + +class SqlQueueSpecificTests { + private val clock = MutableClock() + private val deadMessageHandler: DeadMessageCallback = mock() + private val publisher: EventPublisher = mock() + private var queue: SqlQueue? = null + + @BeforeEach + fun setup() { + queue = createQueue(clock, deadMessageHandler, publisher) + } + + @AfterEach + fun cleanup() { + cleanupCallback() + } + + @Test + fun `doContainsMessage works with no messages present`() { + assertThat(doContainsMessagePayload("test")).isFalse + } + + @Test + fun `doContainsMessage works with a single batch`() { + val batchSize = 5 + pushTestMessages(batchSize) + assertThat(doContainsMessagePayload("${batchSize-1}", batchSize)).isTrue + assertThat(doContainsMessagePayload("", batchSize)).isFalse + } + + @Test + fun `doContainsMessage handles multiple batches during search`() { + val batchSize = 5 + pushTestMessages(batchSize * 2) + assertThat(doContainsMessagePayload("${batchSize+1}", batchSize)).isTrue + assertThat(doContainsMessagePayload("", batchSize)).isFalse + } + + private fun pushTestMessages(numberOfMessages: Int) { + for (i in 1 .. numberOfMessages) { + queue?.push(TestMessage(i.toString())) + } + } + + private fun doContainsMessagePayload(payload: String, batchSize: Int = 5): Boolean? = queue?.doContainsMessage( + { message -> message is TestMessage && message.payload == payload }, batchSize) +}