Skip to content

Commit

Permalink
Parameters in cron triggers (#801)
Browse files Browse the repository at this point in the history
* feat(cron): pass along cron trigger parameters to orca

Triggers support a parameters map, but for cron triggers it gets lost along the way and never makes it to orca, meaning that only default parameter values can be used for cron triggers.

To work around this, users have been duplicating pipelines with different default values and different triggers. This does not scale for obvious reasons and is just generally cumbersome.
  • Loading branch information
dreynaud committed Mar 9, 2020
1 parent b0e1fc2 commit 51ebe94
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.springframework.context.ApplicationListener;

/**
* A component that starts doing something when the instance is up in discovery and stops doing that
* A component that starts doing something when the instance is up in Eureka and stops doing that
* thing when it goes down.
*/
public interface DiscoveryActivated extends ApplicationListener<RemoteStatusChangedEvent> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Preconditions;
import com.netflix.spectator.api.Registry;
import com.netflix.spinnaker.echo.artifacts.MessageArtifactTranslator;
import com.netflix.spinnaker.echo.config.AmazonPubsubProperties;
Expand Down Expand Up @@ -78,9 +79,8 @@ public class SQSSubscriberProvider implements DiscoveryActivated {

@PostConstruct
public void start() {
if (properties == null) {
return;
}
Preconditions.checkNotNull(
properties, "Can't initialize SQSSubscriberProvider with null properties");

ExecutorService executorService =
Executors.newFixedThreadPool(properties.getSubscriptions().size());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ public static GooglePubsubPublisher buildPublisher(
.build();
publisher.setPublisher(p);
} catch (IOException ioe) {
log.error("Could not create Google Pubsub Publishers: {}", ioe);
log.error("Could not create Google Pubsub Publishers", ioe);
}

return publisher;
Expand All @@ -95,7 +95,7 @@ public void publishEvent(Event event) {
try {
jsonPayload = mapper.writeValueAsString(event);
} catch (JsonProcessingException jpe) {
log.error("Could not serialize event message: {}", jpe);
log.error("Could not serialize event message", jpe);
return;
}

Expand Down Expand Up @@ -146,7 +146,7 @@ public void publish(Map payload, Map<String, String> attributes) {
try {
jsonPayload = mapper.writeValueAsString(payload);
} catch (JsonProcessingException jpe) {
log.error("Could not serialize event message: {}", jpe);
log.error("Could not serialize event message", jpe);
return;
}
publish(jsonPayload, attributes);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import static org.quartz.TriggerBuilder.newTrigger
class TriggerConverter {
public static final String JOB_ID = "Pipeline Trigger"

static Map<String, String> toParamMap(Trigger trigger, String timeZoneId) {
static Map<String, Object> toParamMap(Trigger trigger, String timeZoneId) {
def params = [
id : trigger.parent.id,
application : trigger.parent.application,
Expand All @@ -41,7 +41,8 @@ class TriggerConverter {
triggerCronExpression: trigger.cronExpression,
triggerTimeZoneId : timeZoneId,
triggerRebake : Boolean.toString(trigger.rebake),
triggerEnabled : "true"
triggerEnabled : "true",
triggerParameters : trigger.parameters
]

if (trigger.runAsUser) {
Expand Down Expand Up @@ -77,22 +78,23 @@ class TriggerConverter {
}

static Pipeline toPipeline(PipelineCache pipelineCache, Map<String, Object> parameters) {
def existingPipeline = pipelineCache.getPipelinesSync().find { it.id == parameters.id }
if (!existingPipeline) {
throw new IllegalStateException("No pipeline found (id: ${parameters.id})")
}

def triggerBuilder = Trigger
.builder()
.enabled(Boolean.parseBoolean(parameters.triggerEnabled))
.rebake(Boolean.parseBoolean(parameters.triggerRebake))
.id(parameters.triggerId)
.enabled(Boolean.parseBoolean(parameters.triggerEnabled as String))
.rebake(Boolean.parseBoolean(parameters.triggerRebake as String))
.id(parameters.triggerId as String)
.type(Trigger.Type.CRON.toString())
.eventId(UUID.randomUUID().toString())
.cronExpression(parameters.triggerCronExpression)
.cronExpression(parameters.triggerCronExpression as String)
.parameters(parameters.triggerParameters as Map)

if (parameters.runAsUser) {
triggerBuilder.runAsUser(parameters.runAsUser)
}

def existingPipeline = pipelineCache.getPipelinesSync().find { it.id == parameters.id }
if (!existingPipeline) {
throw new IllegalStateException("No pipeline found (id: ${parameters.id})")
triggerBuilder.runAsUser(parameters.runAsUser as String)
}

return existingPipeline.withTrigger(triggerBuilder.build())
Expand All @@ -101,15 +103,9 @@ class TriggerConverter {
static boolean isInSync(CronTrigger trigger, Trigger pipelineTrigger, TimeZone timeZoneId) {
CronTrigger other = toQuartzTrigger(pipelineTrigger, timeZoneId) as CronTrigger

boolean cronExpressionMismatch = (trigger.cronExpression != other.cronExpression)
boolean timezoneMismatch = !trigger.timeZone.hasSameRules(other.timeZone)
boolean runAsUserMismatch =
(trigger.jobDataMap.getString("runAsUser") != other.jobDataMap.getString("runAsUser"))

if (cronExpressionMismatch || runAsUserMismatch || timezoneMismatch) {
return false
}

return true
return (trigger.cronExpression == other.cronExpression
&& trigger.timeZone.hasSameRules(other.timeZone)
&& trigger.jobDataMap.getString("runAsUser") == other.jobDataMap.getString("runAsUser")
&& trigger.jobDataMap.getWrappedMap().triggerParameters == other.jobDataMap.getWrappedMap().triggerParameters)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import com.netflix.spinnaker.echo.model.Trigger
import com.netflix.spinnaker.echo.pipelinetriggers.PipelineCache
import org.quartz.CronScheduleBuilder
import org.quartz.CronTrigger
import org.quartz.JobDataMap
import spock.lang.Shared
import spock.lang.Specification
import spock.lang.Unroll
Expand All @@ -47,6 +48,7 @@ class TriggerConverterSpec extends Specification {
.rebake(triggerRebake)
.runAsUser("mr.captain")
.parent(pipeline)
.parameters(triggerParameters)
.build()

when:
Expand All @@ -60,10 +62,12 @@ class TriggerConverterSpec extends Specification {
parameters.triggerTimeZoneId == 'America/New_York'
parameters.triggerRebake == Boolean.toString(trigger.rebake)
parameters.runAsUser == 'mr.captain'
parameters.triggerParameters == triggerParameters

where:
triggerId << ['123-456', null]
triggerRebake << [true, false]
triggerParameters << [null, [param1: 'value1', param2: 42]]
}

@Unroll
Expand All @@ -79,7 +83,8 @@ class TriggerConverterSpec extends Specification {
triggerType: 'cron',
triggerCronExpression: '* 0/30 * * * ? *',
triggerEnabled: "true",
triggerRebake: triggerRebake
triggerRebake: triggerRebake,
triggerParameters: ['param1': 42]
]

when:
Expand All @@ -94,6 +99,7 @@ class TriggerConverterSpec extends Specification {
pipelineWithTrigger.trigger.cronExpression == parameters.triggerCronExpression
pipelineWithTrigger.trigger.enabled == Boolean.valueOf(parameters.triggerEnabled)
pipelineWithTrigger.trigger.rebake == Boolean.valueOf(parameters.triggerRebake)
pipelineWithTrigger.trigger.parameters == parameters.triggerParameters

where:
triggerRebake << ['true', 'false']
Expand Down Expand Up @@ -131,30 +137,38 @@ class TriggerConverterSpec extends Specification {
}

@Unroll
void 'isInSync() should return true if cronExpression, timezone of the trigger, and runAsUser match the ActionInstance'() {
void 'isInSync() should return true if cronExpression, timezone of the trigger, runAsUser and trigger parameters match the ActionInstance'() {
setup:
Trigger pipelineTrigger = Trigger.builder()
.id("id1")
.parent(pipeline)
.type(Trigger.Type.CRON.toString())
.cronExpression('* 0/30 * * * ? *')
.runAsUser("batman")
.parameters([param: 'value'])
.build()

org.quartz.Trigger scheduleTrigger = org.quartz.TriggerBuilder.newTrigger()
.withIdentity("ignored", null)
.withSchedule(CronScheduleBuilder.cronSchedule(pipelineTrigger.cronExpression)
.inTimeZone(TimeZone.getTimeZone(actionInstanceTimeZoneId)))
.usingJobData("runAsUser", runAsUser)
.usingJobData(new JobDataMap([
runAsUser: runAsUser,
triggerParameters: parameters
]))
.build()

expect:
isInSync(scheduleTrigger, pipelineTrigger, TimeZone.getTimeZone(currentTimeZoneId)) == expectedInSync
isInSync(scheduleTrigger, pipelineTrigger, TimeZone.getTimeZone('America/New_York')) == expectedInSync

where:
actionInstanceTimeZoneId | currentTimeZoneId | runAsUser | expectedInSync
'America/New_York' | 'America/New_York' | 'batman' | true
'America/Los_Angeles' | 'America/New_York' | 'batman' | false
'' | 'America/New_York' | 'batman' | false
'America/New_York' | 'America/New_York' | 'robin' | false
actionInstanceTimeZoneId | runAsUser | parameters | expectedInSync
'America/New_York' | 'batman' | [param: 'value'] | true
'America/New_York' | 'batman' | [:] | false
'America/New_York' | 'batman' | null | false
'America/New_York' | 'batman' | [param: 'other'] | false
'America/Los_Angeles' | 'batman' | [param: 'value'] | false
'' | 'batman' | [param: 'value'] | false
'America/New_York' | 'robin' | [param: 'value'] | false
}
}

0 comments on commit 51ebe94

Please sign in to comment.