Skip to content

Commit

Permalink
[health] bootstrap HealthObserver from agent to API (#16141)
Browse files Browse the repository at this point in the history
* [health] bootstrap HealthObserver from agent to API

* specs: mocked agent needs health observer

* add license headers
  • Loading branch information
yaauie committed May 8, 2024
1 parent 9e452d2 commit 1f5ba9c
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 2 deletions.
5 changes: 5 additions & 0 deletions logstash-core/lib/logstash/agent.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class LogStash::Agent
attr_reader :metric, :name, :settings, :dispatcher, :ephemeral_id, :pipeline_bus
attr_accessor :logger

attr_reader :health_observer

# initialize method for LogStash::Agent
# @param params [Hash] potential parameters are:
# :name [String] - identifier for the agent
Expand All @@ -51,6 +53,9 @@ def initialize(settings = LogStash::SETTINGS, source_loader = nil)
@auto_reload = setting("config.reload.automatic")
@ephemeral_id = SecureRandom.uuid

java_import("org.logstash.health.HealthObserver")
@health_observer = HealthObserver.new

# Mutex to synchronize in the exclusive method
# Initial usage for the Ruby pipeline initialization which is not thread safe
@webserver_control_lock = Mutex.new
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def all
:id => service.agent.id,
:name => service.agent.name,
:ephemeral_id => service.agent.ephemeral_id,
:status => "green", # This is hard-coded to mirror x-pack behavior
:status => service.agent.health_observer.status,
:snapshot => ::BUILD_INFO["build_snapshot"],
:pipeline => {
:workers => LogStash::SETTINGS.get("pipeline.workers"),
Expand Down
8 changes: 7 additions & 1 deletion logstash-core/spec/logstash/webserver_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,13 @@ def free_ports(servers)
end

let(:logger) { LogStash::Logging::Logger.new("testing") }
let(:agent) { OpenStruct.new({:webserver => webserver_block, :http_address => "127.0.0.1", :id => "myid", :name => "myname"}) }
let(:agent) { OpenStruct.new({
webserver: webserver_block,
http_address: "127.0.0.1",
id: "myid",
name: "myname",
health_observer: org.logstash.health.HealthObserver.new,
}) }
let(:webserver_block) { OpenStruct.new({}) }

subject(:webserver) { LogStash::WebServer.new(logger, agent, webserver_options) }
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.health;

import com.google.common.collect.Iterables;

import java.util.EnumSet;

public class HealthObserver {
public final Status getStatus() {
// INTERNAL-ONLY Proof-of-concept to show flow-through to API results
switch (System.getProperty("logstash.apiStatus", "green")) {
case "green": return Status.GREEN;
case "yellow": return Status.YELLOW;
case "red": return Status.RED;
case "random":
final EnumSet<Status> statuses = EnumSet.allOf(Status.class);
return Iterables.get(statuses, new java.util.Random().nextInt(statuses.size()));
default:
return Status.UNKNOWN;
}
}
}
50 changes: 50 additions & 0 deletions logstash-core/src/main/java/org/logstash/health/Status.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.logstash.health;

import com.fasterxml.jackson.annotation.JsonValue;

public enum Status {
UNKNOWN,
GREEN,
YELLOW,
RED,
;

private final String externalValue = name().toLowerCase();

@JsonValue
public String externalValue() {
return externalValue;
}

/**
* Combine this status with another status.
* This method is commutative.
* @param status the other status
* @return the more-degraded of the two statuses.
*/
public Status reduce(Status status) {
if (compareTo(status) >= 0) {
return this;
} else {
return status;
}
}
}
106 changes: 106 additions & 0 deletions logstash-core/src/test/java/org/logstash/health/StatusTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package org.logstash.health;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.Test;
import org.junit.experimental.runners.Enclosed;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.stream.Collectors;

import static com.google.common.collect.Collections2.orderedPermutations;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.logstash.health.Status.*;

@RunWith(Enclosed.class)
public class StatusTest {

public static class Tests {
@Test
public void testReduceUnknown() {
assertThat(UNKNOWN.reduce(UNKNOWN), is(UNKNOWN));
assertThat(UNKNOWN.reduce(GREEN), is(GREEN));
assertThat(UNKNOWN.reduce(YELLOW), is(YELLOW));
assertThat(UNKNOWN.reduce(RED), is(RED));
}

@Test
public void testReduceGreen() {
assertThat(GREEN.reduce(UNKNOWN), is(GREEN));
assertThat(GREEN.reduce(GREEN), is(GREEN));
assertThat(GREEN.reduce(YELLOW), is(YELLOW));
assertThat(GREEN.reduce(RED), is(RED));
}

@Test
public void testReduceYellow() {
assertThat(YELLOW.reduce(UNKNOWN), is(YELLOW));
assertThat(YELLOW.reduce(GREEN), is(YELLOW));
assertThat(YELLOW.reduce(YELLOW), is(YELLOW));
assertThat(YELLOW.reduce(RED), is(RED));
}

@Test
public void testReduceRed() {
assertThat(RED.reduce(UNKNOWN), is(RED));
assertThat(RED.reduce(GREEN), is(RED));
assertThat(RED.reduce(YELLOW), is(RED));
assertThat(RED.reduce(RED), is(RED));
}
}

@RunWith(Parameterized.class)
public static class JacksonSerialization {
@Parameters(name = "{0}")
public static Iterable<?> data() {
return EnumSet.allOf(Status.class);
}

@Parameter
public Status status;

private final ObjectMapper mapper = new ObjectMapper();

@Test
public void testSerialization() throws Exception {
assertThat(mapper.writeValueAsString(status), is(equalTo('"' + status.name().toLowerCase() + '"')));
}
}

@RunWith(Parameterized.class)
public static class ReduceCommutativeSpecification {
@Parameters(name = "{0}<=>{1}")
public static Collection<Object[]> data() {
return getCombinations(EnumSet.allOf(Status.class), 2);
}

@Parameter(0)
public Status statusA;
@Parameter(1)
public Status statusB;

@Test
public void testReduceCommutative() {
assertThat(statusA.reduce(statusB), is(statusB.reduce(statusA)));
}

private static <T extends Comparable<T>> List<Object[]> getCombinations(Collection<T> source, int count) {
return orderedPermutations(source).stream()
.map((l) -> l.subList(0, count))
.map(ArrayList::new).peek(Collections::sort)
.collect(Collectors.toSet())
.stream()
.map(List::toArray)
.collect(Collectors.toList());
}
}
}

0 comments on commit 1f5ba9c

Please sign in to comment.