Skip to content

Commit

Permalink
Fix/manual judgment concurrent execution (#4410)
Browse files Browse the repository at this point in the history
* fix(manual_judgment): added distributed locking to ensure sequential updates to execution's stages

* fix(manual_judgement): fixed test

* fix(blue-green-deploy): added docs and reformatted files

* fix(blue-green-deploy): bumped shedlock dependency version to 4.44.0

---------

Co-authored-by: Cameron Motevasselani <[email protected]>
Co-authored-by: ovidiupopa07 <[email protected]>
  • Loading branch information
3 people committed Mar 28, 2023
1 parent 9a5878e commit 706ffb4
Show file tree
Hide file tree
Showing 20 changed files with 1,214 additions and 243 deletions.
2 changes: 2 additions & 0 deletions orca-core/orca-core.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ dependencies {
implementation("com.jayway.jsonpath:json-path:2.2.0")
implementation("org.yaml:snakeyaml")
implementation("org.codehaus.groovy:groovy")
implementation("net.javacrumbs.shedlock:shedlock-spring:4.44.0")
implementation("net.javacrumbs.shedlock:shedlock-provider-jdbc-template:4.44.0")

compileOnly("org.projectlombok:lombok")
annotationProcessor("org.projectlombok:lombok")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.netflix.spinnaker.orca.libdiffs.ComparableLooseVersion;
import com.netflix.spinnaker.orca.libdiffs.DefaultComparableLooseVersion;
import com.netflix.spinnaker.orca.listeners.*;
import com.netflix.spinnaker.orca.lock.RetriableLock;
import com.netflix.spinnaker.orca.pipeline.CompoundExecutionOperator;
import com.netflix.spinnaker.orca.pipeline.DefaultStageDefinitionBuilderFactory;
import com.netflix.spinnaker.orca.pipeline.ExecutionRunner;
Expand Down Expand Up @@ -83,6 +84,7 @@
"com.netflix.spinnaker.orca.preprocessors",
"com.netflix.spinnaker.orca.telemetry",
"com.netflix.spinnaker.orca.notifications.scheduling",
"com.netflix.spinnaker.orca.lock"
})
@Import({
PreprocessorConfiguration.class,
Expand Down Expand Up @@ -262,7 +264,10 @@ public ForceExecutionCancellationCommand forceExecutionCancellationCommand(

@Bean
public CompoundExecutionOperator compoundExecutionOperator(
ExecutionRepository repository, ExecutionRunner runner, RetrySupport retrySupport) {
return new CompoundExecutionOperator(repository, runner, retrySupport);
ExecutionRepository repository,
ExecutionRunner runner,
RetrySupport retrySupport,
RetriableLock retriableLock) {
return new CompoundExecutionOperator(repository, runner, retrySupport, retriableLock);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
/*
* Copyright 2023 Armory, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.lock

import com.netflix.spinnaker.kork.lock.LockManager
import com.netflix.spinnaker.kork.lock.LockManager.LockStatus.ACQUIRED
import net.javacrumbs.shedlock.core.LockConfiguration
import net.javacrumbs.shedlock.core.LockProvider
import net.javacrumbs.shedlock.core.SimpleLock
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Instant
import java.time.temporal.ChronoUnit
import java.util.*
import java.util.concurrent.Callable

/**
* Postpones the execution of an action until an external lock has been obtained
*/
interface RunOnLockAcquired {

/**
* Executes an action after lock identified by {@code keyName} was obtained
*
* @param action action to execute once lock is acquired
* @param keyName name of a lock
*
* @return result of attempting to acquire a lock
*/
fun execute(action: Runnable, keyName: String): RunOnLockResult<Void>

/**
* Executes an action after lock identified by {@code keyName} was obtained
*
* @param action action to execute once lock is acquired
* @param keyName name of a lock
*
* @return result of attempting to acquire a lock and result of action execution
*/
fun <R> execute(action: Callable<R>, keyName: String): RunOnLockResult<R?>

}

/**
* This is a container object that stores the result of attempting to acquire a lock and executing an action if the lock is obtained.
*/
data class RunOnLockResult<R>(
val lockAcquired: Boolean = false,
val actionExecuted: Boolean = false,
val exception: Exception? = null,
val result: R? = null
)

/**
* Implementation of {@code RunOnLockAcquired}. Delegates the locking attempt to LockProvider.
* Executes action until shedlock has been obtained
*/
class RunOnShedLockAcquired(
private val shedLockProvider: LockProvider
) : RunOnLockAcquired {

private val log = LoggerFactory.getLogger(javaClass)
override fun execute(action: Runnable, keyName: String): RunOnLockResult<Void> {
val lockOpt = this.getLock(keyName)
if (lockOpt.isEmpty) {
log.error("Failed to acquire shedlock for key: {}", keyName)
return RunOnLockResult(lockAcquired = false)
}

return try {
log.debug("Executing action with a lock for key: {}", keyName)
action.run()
log.debug("Finished action execution with a lock for key: {}", keyName)
RunOnLockResult(lockAcquired = true, actionExecuted = true)
} catch (e: Exception) {
log.error("An exception occurred while executing action with a lock for key: {}", keyName)
RunOnLockResult(lockAcquired = true, exception = e)
} finally {
lockOpt.get().unlock()
log.debug("Released shedlock for key {}", keyName)
}
}

override fun <R> execute(action: Callable<R>, keyName: String): RunOnLockResult<R?> {
val lockOpt = this.getLock(keyName)
if (lockOpt.isEmpty) {
log.error("Failed to acquire shedlock for key: {}", keyName)
return RunOnLockResult(lockAcquired = false)
}

return try {
log.debug("Executing action with a lock for key: {}", keyName)
val callableResult = action.call()
log.debug("Finished action execution with a lock for key: {}", keyName)
RunOnLockResult(lockAcquired = true, actionExecuted = true, result = callableResult)

} catch (e: Exception) {
log.error("An exception occurred while executing action with a lock for key: {}", keyName)
RunOnLockResult(lockAcquired = true, exception = e)
} finally {
lockOpt.get().unlock()
log.debug("Released shedlock for key {}", keyName)
}
}

private fun getLock(keyName: String): Optional<SimpleLock> {
try {
log.debug("Attempt to acquire shedlock for key: {}", keyName)
return shedLockProvider.lock(LockConfiguration(Instant.now(), keyName, Duration.ofSeconds(1), Duration.ofMillis(200)))
} catch (e: Exception) {
log.error("An exception occurred during an attempt to acquire shedlock for key: {}", keyName)
log.error(e.message)
throw e
}
}

}


/**
* Implementation of {@code RunOnLockAcquired}. Delegates the locking attempt to LockManager.
* Executes action until redis lock has been obtained
*/
class RunOnRedisLockAcquired(
private val lockManager: LockManager
) : RunOnLockAcquired {

private fun lockOptions(name: String) = LockManager.LockOptions()
.withLockName(name)
.withMaximumLockDuration(Duration.ofSeconds(1L))

override fun execute(action: Runnable, keyName: String): RunOnLockResult<Void> {
return try {
val acquireLock = lockManager.acquireLock(lockOptions(keyName), action)
if (!acquireLock.lockStatus.equals(ACQUIRED)) {
return RunOnLockResult(lockAcquired = false)
}

RunOnLockResult(
lockAcquired = true,
actionExecuted = true,
result = acquireLock.onLockAcquiredCallbackResult
)
} catch (e: Exception) {
RunOnLockResult(lockAcquired = true, exception = e)
}
}

override fun <R> execute(action: Callable<R>, keyName: String): RunOnLockResult<R?> {
return try {
val acquireLock = lockManager.acquireLock(lockOptions(keyName), action)
if (!acquireLock.lockStatus.equals(ACQUIRED)) {
return RunOnLockResult(lockAcquired = false)
}

RunOnLockResult(
lockAcquired = true,
actionExecuted = true,
result = acquireLock.onLockAcquiredCallbackResult
)
} catch (e: Exception) {
RunOnLockResult(lockAcquired = true, exception = e)
}
}

}

/**
* Implementation of {@code RunOnLockAcquired}. Doesn't try to obtain any lock, executes action right away
*/
class NoOpRunOnLockAcquired : RunOnLockAcquired {

private val log = LoggerFactory.getLogger(javaClass)
override fun execute(action: Runnable, keyName: String): RunOnLockResult<Void> {
return try {
log.debug("Executing action with no locking for key: {}", keyName)
action.run()
log.debug("Execution with no locking for key: {} successful", keyName)
RunOnLockResult(lockAcquired = true, actionExecuted = true)
} catch (e: Exception) {
log.error("An exception was thrown while executing action with no locking for key: {}", keyName)
log.error(e.message)
RunOnLockResult(exception = e)
}
}

override fun <R> execute(action: Callable<R>, keyName: String): RunOnLockResult<R?> {
return try {
RunOnLockResult(lockAcquired = true, actionExecuted = true, result = action.call())
} catch (e: Exception) {
RunOnLockResult(exception = e)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright 2023 Armory, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.netflix.spinnaker.orca.lock

import com.netflix.spinnaker.kork.core.RetrySupport
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import java.util.*

@Configuration
class LockConfig {

@Bean
@ConditionalOnMissingBean(RunOnLockAcquired::class)
fun noOpLocking(): RunOnLockAcquired {
return NoOpRunOnLockAcquired()
}

@Bean
fun retriableLock(
runOnLockAcquired: RunOnLockAcquired,
retrySupport: RetrySupport
): RetriableLock {
return RetriableLock(runOnLockAcquired, retrySupport)
}


}

0 comments on commit 706ffb4

Please sign in to comment.