fix(queue): fixes bug in SqlQueue doContainsMessage to handle multiple batches #4184

Open
wants to merge 1 commit into
base: master
@@ -186,8 +186,7 @@ class SqlQueue(
}
}

- private fun doContainsMessage(predicate: (Message) -> Boolean): Boolean {
- val batchSize = 100
+ fun doContainsMessage(predicate: (Message) -> Boolean, batchSize: Int = 100): Boolean {
var found = false
var lastId = "0"

@@ -201,7 +200,7 @@ class SqlQueue(
.intoResultSet()
}

- while (!found && rs.next()) {
+ while (!found && rs.row < batchSize && rs.next()) {
Author

The underlying problem here is a little subtle. The bug appears to be caused by ResultSet.next() moving the cursor past the last row once a batch has been searched without a match.

As a result, the ResultSet.getRow() check below (rs.row in Kotlin) never equals the expected batchSize, so no further batches are queried when they should be.
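
The overshoot described above can be reproduced without a database. The following is a minimal sketch, not the SqlQueue code: `FakeCursor` is a hypothetical stand-in that only mimics the documented JDBC contract (`next()` returns false once the cursor moves past the last row, and `getRow()` reports 0 when there is no current row). The result set that jOOQ hands back in the real code may behave slightly differently, but the loop shapes are the ones shown in the diff.

```kotlin
// Hypothetical stand-in for a JDBC-style cursor over one fetched batch.
class FakeCursor(private val size: Int) {
  // 0 = before the first row, size + 1 = after the last row
  private var position = 0

  // Mirrors ResultSet.next(): advances the cursor and returns false
  // once it has moved past the last row.
  fun next(): Boolean {
    if (position <= size) position++
    return position <= size
  }

  // Mirrors ResultSet.getRow(): 0 when there is no current row.
  val row: Int
    get() = if (position in 1..size) position else 0
}

// Old loop shape: keep calling next() until it returns false.
fun rowAfterOldLoop(batchSize: Int): Int {
  val rs = FakeCursor(batchSize)
  while (rs.next()) { /* predicate never matches */ }
  return rs.row
}

// Patched loop shape: stop before calling next() past the end of the batch.
fun rowAfterPatchedLoop(batchSize: Int): Int {
  val rs = FakeCursor(batchSize)
  while (rs.row < batchSize && rs.next()) { /* predicate never matches */ }
  return rs.row
}

fun main() {
  println(rowAfterOldLoop(5))     // 0, so a "row == batchSize" check fails
  println(rowAfterPatchedLoop(5)) // 5, so the next batch would be queried
}
```

With the old loop the cursor ends up after the last row, so a later comparison of the row number against batchSize cannot succeed; the extra `rs.row < batchSize` guard keeps the cursor on the last row of a full batch.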

Contributor

This whole bit of code seems like it could do with some restructuring. It sits halfway between native JDBC and jOOQ, and mixing the two concepts makes it hard to reason about.

Given that jOOQ fetches the entire result set into memory anyway, I don't see a reason not to use its Result object directly and break out of the search as soon as a matching message is found or the result set is empty.

Maybe something like this?

    do {
      val rs: Result<Record3<Any, Any, Any>> = withRetry(READ) {
        jooq.select(idField, fingerprintField, bodyField)
          .from(messagesTable)
          .where(idField.gt(lastId))
          .limit(batchSize)
          .fetch()
      }

      rs.forEach { record ->
        val body = record.getValue("body", String::class.java)
        try {
          if (predicate.invoke(mapper.readValue(body))) return true
        } catch (e: Exception) {
          log.error("Failed reading message with fingerprint: ${record.getValue("fingerprint", String::class.java)} message: $body", e)
        }
        lastId = record.getValue("id", String::class.java)
      }
    } while (rs.isNotEmpty)

    return false
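
The control flow of that suggestion (fetch a batch after the last seen id, return early on a match, stop on an empty batch) can be exercised in isolation. This is a rough sketch under stated assumptions: `Row`, `InMemoryTable`, `fetchAfter` and `containsMessage` are hypothetical names, ids are modelled as `Long` rather than the `String` used in the PR, and the in-memory "query" sorts by id explicitly, which this style of paging relies on. Whether the real query needs an explicit ordering is not settled in this thread.

```kotlin
// Hypothetical row type; the real table also carries a fingerprint column.
data class Row(val id: Long, val body: String)

// Hypothetical in-memory replacement for the messages table.
class InMemoryTable(private val rows: List<Row>) {
  var queries = 0
    private set

  // Emulates: SELECT ... WHERE id > lastId ORDER BY id LIMIT batchSize
  fun fetchAfter(lastId: Long, batchSize: Int): List<Row> {
    queries++
    return rows.filter { it.id > lastId }.sortedBy { it.id }.take(batchSize)
  }
}

// Pages through the table until a match is found or a batch comes back empty.
fun containsMessage(
  table: InMemoryTable,
  batchSize: Int,
  predicate: (String) -> Boolean
): Boolean {
  var lastId = 0L
  while (true) {
    val batch = table.fetchAfter(lastId, batchSize)
    if (batch.isEmpty()) return false
    for (row in batch) {
      if (predicate(row.body)) return true
    }
    lastId = batch.last().id
  }
}

fun main() {
  val table = InMemoryTable((1L..10L).map { Row(it, it.toString()) })
  println(containsMessage(table, 5) { it == "6" }) // true (found in the second batch)
  println(containsMessage(table, 5) { it == "" })  // false (third query returns no rows)
}
```

Terminating on an empty batch costs one extra query when the row count is an exact multiple of batchSize, but it removes any dependence on cursor position.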

Contributor

That could be tidied up further by adding types to the field declarations (idField, etc.), but that would be a bigger change.

try {
found = predicate.invoke(mapper.readValue(rs.getString("body")))
} catch (e: Exception) {
@@ -15,6 +15,12 @@ import com.netflix.spinnaker.q.TestMessage
import com.netflix.spinnaker.q.metrics.EventPublisher
import com.netflix.spinnaker.q.metrics.MonitorableQueueTest
import com.netflix.spinnaker.q.metrics.QueueEvent
import com.netflix.spinnaker.time.MutableClock
import com.nhaarman.mockito_kotlin.mock
import org.assertj.core.api.Assertions.assertThat
import org.junit.jupiter.api.AfterEach
import org.junit.jupiter.api.Test
import org.junit.jupiter.api.BeforeEach
import java.time.Clock
import java.time.Duration
import java.util.Optional
@@ -78,3 +84,50 @@ private val retryPolicy: RetryProperties = RetryProperties(
maxRetries = 1,
backoffMs = 10 // minimum allowed
)

class SqlQueueSpecificTests {
Author

I didn't want to come empty-handed, so I added tests. However, I could not find a good place for SqlQueue-specific tests.

I'm open to suggestions on what to do with these, or I can simply remove them if this fix doesn't warrant the additional testing overhead.

Contributor

Given that the tests identify an edge case I think they definitely need to be added. Is there a way to add them without changing the visibility of the doContainsMessage() function?
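
One possible way to keep the helper private is to make the batch size configurable at construction time and drive the tests through a public entry point. The sketch below only illustrates that pattern; `BatchedStore`, `contains` and `doContains` are hypothetical names, and nothing here is taken from SqlQueue itself.

```kotlin
// Hypothetical class: batch size is injected, the batched search stays private.
class BatchedStore(
  private val items: List<String>,
  private val batchSize: Int = 100
) {
  init {
    require(batchSize > 0) { "batchSize must be positive" }
  }

  // Public entry point that tests can call.
  fun contains(predicate: (String) -> Boolean): Boolean = doContains(predicate)

  // Private helper: scans the items one batch at a time.
  private fun doContains(predicate: (String) -> Boolean): Boolean {
    var offset = 0
    while (offset < items.size) {
      val end = minOf(offset + batchSize, items.size)
      val batch = items.subList(offset, end)
      if (batch.any(predicate)) return true
      offset = end
    }
    return false
  }
}

fun main() {
  // A small batch size forces the multi-batch path with only ten items.
  val store = BatchedStore((1..10).map { it.toString() }, batchSize = 5)
  println(store.contains { it == "6" }) // true
  println(store.contains { it == "" })  // false
}
```

A test can then pick a small batch size to hit the multi-batch edge case without widening the visibility of the helper.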

private val clock = MutableClock()
private val deadMessageHandler: DeadMessageCallback = mock()
private val publisher: EventPublisher = mock()
private var queue: SqlQueue? = null

@BeforeEach
fun setup() {
queue = createQueue(clock, deadMessageHandler, publisher)
}

@AfterEach
fun cleanup() {
cleanupCallback()
}

@Test
fun `doContainsMessage works with no messages present`() {
assertThat(doContainsMessagePayload("test")).isFalse
}

@Test
fun `doContainsMessage works with a single batch`() {
val batchSize = 5
pushTestMessages(batchSize)
assertThat(doContainsMessagePayload("${batchSize-1}", batchSize)).isTrue
assertThat(doContainsMessagePayload("", batchSize)).isFalse
}

@Test
fun `doContainsMessage handles multiple batches during search`() {
val batchSize = 5
pushTestMessages(batchSize * 2)
assertThat(doContainsMessagePayload("${batchSize+1}", batchSize)).isTrue
assertThat(doContainsMessagePayload("", batchSize)).isFalse
}

private fun pushTestMessages(numberOfMessages: Int) {
for (i in 1 .. numberOfMessages) {
queue?.push(TestMessage(i.toString()))
}
}

private fun doContainsMessagePayload(payload: String, batchSize: Int = 5): Boolean? = queue?.doContainsMessage(
{ message -> message is TestMessage && message.payload == payload }, batchSize)
}