Skip to content

Commit

Permalink
fix(queue): fix ability to cancel a zombied execution (#4473)
Browse files Browse the repository at this point in the history
* fix(queue): fix ability to cancel a zombied execution

* fix(queue): undo unintentional change

* fix(queue): add more logging

---------

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
mattgogerly and mergify[bot] committed Jun 20, 2023
1 parent 8294241 commit 56c7206
Show file tree
Hide file tree
Showing 8 changed files with 88 additions and 10 deletions.
7 changes: 6 additions & 1 deletion keiko-core/src/main/kotlin/com/netflix/spinnaker/q/Queue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ interface Queue {
fun push(message: Message, delay: TemporalAmount): Unit

/**
* Update [message] if it exists for immediate delivery.
* Update [message] if it exists for immediate delivery. No-op if the [message] does not exist.
*/
fun reschedule(message: Message): Unit = reschedule(message, ZERO)

Expand All @@ -87,6 +87,11 @@ interface Queue {
*/
fun retry() {}

/**
* Used for testing zombie executions. Wipes all messages from the queue.
*/
fun clear() {}

/**
* The expired time after which un-acknowledged messages will be retried.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,10 @@ class InMemoryQueue(
}
}

override fun clear() {
queue.removeAll { true }
}

override fun readState() =
QueueState(
depth = queue.size,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,8 +154,13 @@ class RedisQueue(
log.debug("Re-scheduling message: $message, fingerprint: $fingerprint to deliver in $delay")
val status: Long = redis.zadd(queueKey, score(delay), fingerprint, zAddParams().xx())
if (status.toInt() == 1) {
log.debug("Rescheduled message: $message, fingerprint: $fingerprint to deliver in $delay")
fire(MessageRescheduled(message))
} else {
log.warn(
"Failed to reschedule message: $message, fingerprint: $fingerprint, not found " +
"on queue"
)
fire(MessageNotFound(message))
}
}
Expand Down Expand Up @@ -229,6 +234,12 @@ class RedisQueue(
}
}

override fun clear() {
pool.resource.use { redis ->
redis.del(messagesKey)
}
}

override fun readState(): QueueState =
pool.resource.use { redis ->
redis.multi {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -764,6 +764,14 @@ class SqlQueue(
fire(RetryPolled)
}

override fun clear() {
withPool(poolName) {
withRetry(WRITE) {
jooq.deleteFrom(messagesTable).execute()
}
}
}

@Scheduled(fixedDelayString = "\${queue.cleanup.frequency.ms:2000}")
fun cleanupMessages() {
withPool(poolName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ class ExecutionLatch(private val predicate: Predicate<ExecutionComplete>) :
fun await() = latch.await(10, TimeUnit.SECONDS)
}

fun ConfigurableApplicationContext.run(execution: PipelineExecution, launcher: (PipelineExecution) -> Unit) {
val latch = ExecutionLatch(
Predicate {
it.executionId == execution.id
}
)
addApplicationListener(latch)
launcher.invoke(execution)

Thread.sleep(500)
}

fun ConfigurableApplicationContext.runToCompletion(execution: PipelineExecution, launcher: (PipelineExecution) -> Unit, repository: ExecutionRepository) {
val latch = ExecutionLatch(
Predicate {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ import org.springframework.context.event.ApplicationEventMulticaster
import org.springframework.context.event.SimpleApplicationEventMulticaster
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor
import org.springframework.test.context.junit4.SpringRunner
import java.util.concurrent.TimeUnit
import java.util.function.Predicate

@SpringBootTest(
classes = [TestConfig::class],
Expand Down Expand Up @@ -849,6 +851,35 @@ abstract class QueueIntegrationTest {
}
}
}

@Test
fun `cancelling a zombied execution sets task, stage and execution statuses to CANCELED`() {
val pipeline = pipeline {
application = "spinnaker"
stage {
refId = "1"
type = "dummy"
}
}
repository.store(pipeline)

whenever(dummyTask.execute(any())) doAnswer {
TaskResult.RUNNING
}

context.run(pipeline, runner::start)

// simulate a zombie by clearing the queue
queue.clear()

context.runToCompletion(pipeline, { runner.cancel(it, "anonymous", null) }, repository)

repository.retrieve(PIPELINE, pipeline.id).apply {
assertThat(status).isEqualTo(CANCELED)
assertThat(stageByRef("1").status).isEqualTo(CANCELED)
assertThat(stageByRef("1").tasks.all { it.status == CANCELED })
}
}
}

@Configuration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import com.netflix.spinnaker.orca.q.RescheduleExecution
import com.netflix.spinnaker.orca.q.RunTask
import com.netflix.spinnaker.q.Queue
import org.springframework.stereotype.Component
import java.time.Duration

@Component
class RescheduleExecutionHandler(
Expand All @@ -43,14 +44,14 @@ class RescheduleExecutionHandler(
stage.tasks
.filter { it.status == ExecutionStatus.RUNNING }
.forEach {
queue.reschedule(
RunTask(
message,
stage.id,
it.id,
taskResolver.getTaskClass(it.implementingClass)
)
val taskMessage = RunTask(
message,
stage.id,
it.id,
taskResolver.getTaskClass(it.implementingClass)
)
queue.ensure(taskMessage, Duration.ZERO)
queue.reschedule(taskMessage)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.jetbrains.spek.api.dsl.describe
import org.jetbrains.spek.api.dsl.it
import org.jetbrains.spek.api.lifecycle.CachingMode
import org.jetbrains.spek.subject.SubjectSpek
import java.time.Duration

object RescheduleExecutionHandlerTest : SubjectSpek<RescheduleExecutionHandler>({

Expand Down Expand Up @@ -101,8 +102,13 @@ object RescheduleExecutionHandlerTest : SubjectSpek<RescheduleExecutionHandler>(
val task4 = stage2a.taskById("4")
val task5 = stage2b.taskById("5")

verify(queue).reschedule(RunTask(message, stage2a.id, task4.id, Class.forName(task4.implementingClass) as Class<out Task>))
verify(queue).reschedule(RunTask(message, stage2b.id, task5.id, Class.forName(task5.implementingClass) as Class<out Task>))
val messageTask4 = RunTask(message, stage2a.id, task4.id, Class.forName(task4.implementingClass) as Class<out Task>)
val messageTask5 = RunTask(message, stage2b.id, task5.id, Class.forName(task5.implementingClass) as Class<out Task>)

verify(queue).ensure(messageTask4, Duration.ZERO)
verify(queue).reschedule(messageTask4)
verify(queue).ensure(messageTask5, Duration.ZERO)
verify(queue).reschedule(messageTask5)
verifyNoMoreInteractions(queue)
}
}
Expand Down

0 comments on commit 56c7206

Please sign in to comment.