Skip to content

Commit

Permalink
Implement agent reuse, toggled by a cluster profile config option
Browse files Browse the repository at this point in the history
  • Loading branch information
brandonvin authored and chadlwilson committed Jan 6, 2024
1 parent c737e63 commit a7e1761
Show file tree
Hide file tree
Showing 36 changed files with 867 additions and 399 deletions.
15 changes: 14 additions & 1 deletion src/main/java/cd/go/contrib/elasticagent/AgentInstances.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import cd.go.contrib.elasticagent.executors.ServerPingRequestExecutor;
import cd.go.contrib.elasticagent.requests.CreateAgentRequest;

import java.util.Optional;
import java.util.function.Function;

/**
* Plugin implementors should implement these methods to interface to your cloud.
Expand All @@ -36,7 +38,7 @@ public interface AgentInstances<T> {
* @param pluginRequest the plugin request object
* @param consoleLogAppender
*/
T create(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) throws Exception;
Optional<T> requestCreateAgent(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) throws Exception;

/**
* This message is sent when the plugin needs to terminate the agent instance.
Expand Down Expand Up @@ -84,5 +86,16 @@ public interface AgentInstances<T> {
* @param agentId the elastic agent id
*/
T find(String agentId);

/**
* Atomically update the agent instance for the given <code>agentId</code>.
* <code>computeFn</code> is called with the current agent instance if it exists,
* or null if it doesn't exist. <code>computeFn</code> should return a new agent instance
* that represents its new state.
* @param agentId
* @param computeFn
* @return
*/
T updateAgent(String agentId, Function<T, T> computeFn);
}

181 changes: 137 additions & 44 deletions src/main/java/cd/go/contrib/elasticagent/KubernetesAgentInstances.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,25 @@

import cd.go.contrib.elasticagent.model.JobIdentifier;
import cd.go.contrib.elasticagent.requests.CreateAgentRequest;
import cd.go.contrib.elasticagent.KubernetesInstance.AgentState;
import cd.go.contrib.elasticagent.utils.Util;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodList;
import io.fabric8.kubernetes.client.KubernetesClient;

import java.net.SocketTimeoutException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.stream.Collectors;

import static cd.go.contrib.elasticagent.KubernetesPlugin.LOG;
import static java.text.MessageFormat.format;

public class KubernetesAgentInstances implements AgentInstances<KubernetesInstance> {
private final ConcurrentHashMap<String, KubernetesInstance> instances = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, KubernetesInstance> instances;
public Clock clock = Clock.DEFAULT;
final Semaphore semaphore = new Semaphore(0, true);

private KubernetesClientFactory factory;
private KubernetesInstanceFactory kubernetesInstanceFactory;
Expand All @@ -50,55 +50,127 @@ public KubernetesAgentInstances(KubernetesClientFactory factory) {
}

public KubernetesAgentInstances(KubernetesClientFactory factory, KubernetesInstanceFactory kubernetesInstanceFactory) {
this(factory, kubernetesInstanceFactory, Collections.emptyMap());
}

public KubernetesAgentInstances(KubernetesClientFactory factory, KubernetesInstanceFactory kubernetesInstanceFactory, Map<String, KubernetesInstance> initialInstances) {
this.factory = factory;
this.kubernetesInstanceFactory = kubernetesInstanceFactory;
this.instances = new ConcurrentHashMap<>(initialInstances);
}

@Override
public KubernetesInstance create(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {
final Integer maxAllowedContainers = settings.getMaxPendingPods();
public Optional<KubernetesInstance> requestCreateAgent(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {
final Integer maxAllowedPods = settings.getMaxPendingPods();
synchronized (instances) {
refreshAll(settings);
doWithLockOnSemaphore(new SetupSemaphore(maxAllowedContainers, instances, semaphore));
consoleLogAppender.accept("Waiting to create agent pod.");
if (semaphore.tryAcquire()) {
return createKubernetesInstance(request, settings, pluginRequest, consoleLogAppender);
if (instances.size() < maxAllowedPods) {
return requestCreateAgentHelper(request, settings, pluginRequest, consoleLogAppender);
} else {
String message = format("[Create Agent Request] The number of pending kubernetes pods is currently at the maximum permissible limit ({0}). Total kubernetes pods ({1}). Not creating any more containers.", maxAllowedContainers, instances.size());
String message = String.format("[Create Agent Request] The number of pending kubernetes pods is currently at the maximum permissible limit (%s). Total kubernetes pods (%s). Not creating any more pods.",
maxAllowedPods,
instances.size());
LOG.warn(message);
consoleLogAppender.accept(message);
return null;
return Optional.empty();
}
}
}

private void doWithLockOnSemaphore(Runnable runnable) {
synchronized (semaphore) {
runnable.run();
private List<KubernetesInstance> findPodsEligibleForReuse(CreateAgentRequest request) {
Long jobId = request.jobIdentifier().getJobId();
String jobElasticConfigHash = KubernetesInstanceFactory.agentConfigHash(
request.clusterProfileProperties(), request.elasticProfileProperties());

List<KubernetesInstance> eligiblePods = new ArrayList<>();

for (KubernetesInstance instance : instances.values()) {
if (instance.getJobId().equals(jobId)) {
eligiblePods.add(instance);
continue;
}

String podElasticConfigHash = instance.getPodAnnotations().get(KubernetesInstance.ELASTIC_CONFIG_HASH);
boolean sameElasticConfig = Objects.equals(podElasticConfigHash, jobElasticConfigHash);
boolean instanceIsIdle = instance.getAgentState().equals(KubernetesInstance.AgentState.Idle);
boolean podIsRunning = instance.getPodState().equals(PodState.Running);
boolean isReusable = sameElasticConfig && instanceIsIdle && podIsRunning;

LOG.info(
"[reuse] Is pod {} reusable for job {}? {}. Job has {}={}; pod has {}={}, agentState={}, podState={}",
instance.getPodName(),
jobId,
isReusable,
KubernetesInstance.ELASTIC_CONFIG_HASH,
jobElasticConfigHash,
KubernetesInstance.ELASTIC_CONFIG_HASH,
podElasticConfigHash,
instance.getAgentState(),
instance.getPodState()
);

if (isReusable) {
eligiblePods.add(instance);
}
}

return eligiblePods;
}

private KubernetesInstance createKubernetesInstance(CreateAgentRequest request, PluginSettings settings, PluginRequest pluginRequest, ConsoleLogAppender consoleLogAppender) {

private Optional<KubernetesInstance> requestCreateAgentHelper(
CreateAgentRequest request,
PluginSettings settings,
PluginRequest pluginRequest,
ConsoleLogAppender consoleLogAppender) {
JobIdentifier jobIdentifier = request.jobIdentifier();
if (isAgentCreatedForJob(jobIdentifier.getJobId())) {
String message = format("[Create Agent Request] Request for creating an agent for Job Identifier [{0}] has already been scheduled. Skipping current request.", jobIdentifier);
LOG.warn(message);
consoleLogAppender.accept(message);
return null;
Long jobId = jobIdentifier.getJobId();

// Agent reuse disabled - create a new pod only if one hasn't already been created for this job ID.
if (!settings.getEnableAgentReuse()) {
// Already created a pod for this job ID.
if (isAgentCreatedForJob(jobId)) {
String message = format("[Create Agent Request] Request for creating an agent for Job Identifier [{0}] has already been scheduled. Skipping current request.", jobIdentifier);
LOG.warn(message);
consoleLogAppender.accept(message);
return Optional.empty();
}
// No pod created yet for this job ID. Create one.
KubernetesClient client = factory.client(settings);
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
consoleLogAppender.accept(String.format("Created pod: %s", instance.getPodName()));
instance = instance.toBuilder().agentState(AgentState.Building).build();
register(instance);
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.getPodName()));
return Optional.of(instance);
}

KubernetesClient client = factory.client(settings);
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
consoleLogAppender.accept(String.format("Creating pod: %s", instance.name()));
register(instance);
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.name()));
// Agent reuse enabled - look for any extant pods that match this job,
// and create a new one only if there are none.
List<KubernetesInstance> reusablePods = findPodsEligibleForReuse(request);
LOG.info("[reuse] Found {} pods eligible for reuse for CreateAgentRequest for job {}: {}",
reusablePods.size(),
jobId,
reusablePods.stream().map(pod -> pod.getPodName()).collect(Collectors.toList()));

return instance;
if (reusablePods.isEmpty()) {
KubernetesClient client = factory.client(settings);
KubernetesInstance instance = kubernetesInstanceFactory.create(request, settings, client, pluginRequest);
consoleLogAppender.accept(String.format("Created pod: %s", instance.getPodName()));
instance = instance.toBuilder().agentState(AgentState.Building).build();
register(instance);
consoleLogAppender.accept(String.format("Agent pod %s created. Waiting for it to register to the GoCD server.", instance.getPodName()));
return Optional.of(instance);
} else {
String message = String.format("[reuse] Not creating a new pod - found %s eligible for reuse.", reusablePods.size());
consoleLogAppender.accept(message);
LOG.info(message);
return Optional.empty();
}
}

private boolean isAgentCreatedForJob(Long jobId) {
for (KubernetesInstance instance : instances.values()) {
if (instance.jobId().equals(jobId)) {
if (instance.getJobId().equals(jobId)) {
return true;
}
}
Expand All @@ -111,7 +183,7 @@ public void terminate(String agentId, PluginSettings settings) {
KubernetesInstance instance = instances.get(agentId);
if (instance != null) {
KubernetesClient client = factory.client(settings);
instance.terminate(client);
client.pods().withName(instance.getPodName()).delete();
} else {
LOG.warn(format("Requested to terminate an instance that does not exist {0}.", agentId));
}
Expand Down Expand Up @@ -140,56 +212,77 @@ public Agents instancesCreatedAfterTimeout(PluginSettings settings, Agents agent
continue;
}

if (clock.now().isAfter(instance.createdAt().plus(settings.getAutoRegisterPeriod()))) {
if (clock.now().isAfter(instance.getCreatedAt().plus(settings.getAutoRegisterPeriod()))) {
oldAgents.add(agent);
}
}
return new Agents(oldAgents);
}

public List<Pod> listAgentPods(KubernetesClient client) {
if (client == null) {
throw new IllegalArgumentException("client is null");
}
return client.pods()
.withLabel(Constants.KUBERNETES_POD_KIND_LABEL_KEY, Constants.KUBERNETES_POD_KIND_LABEL_VALUE)
.list()
.getItems();
}

@Override
public void refreshAll(PluginSettings properties) {
LOG.debug("[Refresh Instances] Syncing k8s elastic agent pod information for cluster {}.", properties);
PodList list = null;
List<Pod> pods = null;
try {
KubernetesClient client = factory.client(properties);
list = client.pods().list();
pods = listAgentPods(client);
} catch (Exception e) {
LOG.error("Error occurred while trying to list kubernetes pods:", e);

if (e.getCause() instanceof SocketTimeoutException) {
LOG.error("Error caused due to SocketTimeoutException. This generally happens due to stale kubernetes client. Clearing out existing kubernetes client and creating a new one!");
factory.clearOutExistingClient();
KubernetesClient client = factory.client(properties);
list = client.pods().list();
pods = listAgentPods(client);
}
}

if (list == null) {
if (pods == null) {
LOG.info("Did not find any running kubernetes pods.");
return;
}

Map<String, KubernetesInstance> oldInstances = Map.copyOf(instances);
instances.clear();
for (Pod pod : list.getItems()) {
Map<String, String> podLabels = pod.getMetadata().getLabels();
if (podLabels != null) {
if (Constants.KUBERNETES_POD_KIND_LABEL_VALUE.equals(podLabels.get(Constants.KUBERNETES_POD_KIND_LABEL_KEY))) {
register(kubernetesInstanceFactory.fromKubernetesPod(pod));
}

for (Pod pod : pods) {
String podName = pod.getMetadata().getName();
// preserve pod's agent state
KubernetesInstance newInstance = kubernetesInstanceFactory.fromKubernetesPod(pod);
KubernetesInstance oldInstance = oldInstances.get(podName);
if (oldInstance != null) {
AgentState oldAgentState = oldInstances.get(podName).getAgentState();
newInstance = newInstance.toBuilder().agentState(oldAgentState).build();
LOG.debug("[reuse] Preserved AgentState {} upon refresh of pod {}", oldAgentState, podName);
}
register(newInstance);
}

LOG.info(String.format("[refresh-pod-state] Pod information successfully synced. All(Running/Pending) pod count is %d.", instances.size()));
}

@Override
public KubernetesInstance updateAgent(String agentId, Function<KubernetesInstance, KubernetesInstance> updateFn) {
return instances.compute(agentId, (_agentId, instance) -> updateFn.apply(instance));
}

@Override
public KubernetesInstance find(String agentId) {
return instances.get(agentId);
}

public void register(KubernetesInstance instance) {
instances.put(instance.name(), instance);
instances.put(instance.getPodName(), instance);
}

private KubernetesAgentInstances unregisteredAfterTimeout(PluginSettings settings, Agents knownAgents) throws Exception {
Expand Down

0 comments on commit a7e1761

Please sign in to comment.