fix: properly serialize unicode pipeline names in API output
Jackson's JSON serializer leaks the JRuby-internal byte structure of Symbols,
which matches the byte structure of the symbol's actual string only when that
string is composed entirely of lower-ASCII characters.

By pre-converting Symbols to Strings, we ensure that the result is readable
and useful.
yaauie committed Feb 22, 2024
1 parent 11ffc57 commit 96842dd
Showing 3 changed files with 176 additions and 110 deletions.
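
The core of the change is a recursive pre-conversion step in the API layer. As a standalone sketch of that approach (plain-Ruby JSON stands in for LogStash::Json here, and the sample data is illustrative):

require 'json'

# Recursively convert Symbol keys and values to Strings, mirroring the
# helper this commit adds, so the serializer only ever sees Strings.
def stringify_symbols(data)
  case data
  when Hash   then data.map { |k, v| [stringify_symbols(k), stringify_symbols(v)] }.to_h
  when Array  then data.map { |v| stringify_symbols(v) }
  when Symbol then data.to_s
  else data
  end
end

stats = { :"변환-pipeline" => { :events => [5, :ok] } }
puts JSON.generate(stringify_symbols(stats))
# => {"변환-pipeline":{"events":[5,"ok"]}}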
12 changes: 11 additions & 1 deletion logstash-core/lib/logstash/api/app_helpers.rb
@@ -47,7 +47,7 @@ def respond_with(data, options = {})
end

content_type "application/json"
LogStash::Json.dump(data, {:pretty => pretty?})
LogStash::Json.dump(stringify_symbols(data), {:pretty => pretty?})
else
content_type "text/plain"
data.to_s
@@ -81,6 +81,16 @@ def pretty?
params.has_key?("pretty")
end

# Recursively stringify symbols in the provided data structure
def stringify_symbols(data)
case data
when Hash then Hash[data.each_pair.map { |k,v| [stringify_symbols(k), stringify_symbols(v)] }]
when Array then data.map { |v| stringify_symbols(v) }
when Symbol then data.to_s
else data
end
end

def generate_error_hash(error)
{
:path => request.path,
16 changes: 13 additions & 3 deletions qa/integration/services/logstash_service.rb
@@ -112,12 +112,22 @@ def start_with_config_string(config)

# Can start LS in stdin and can send messages to stdin
# Useful to test metrics and such
def start_with_stdin(pipeline_config = STDIN_CONFIG)
puts "Starting Logstash #{@logstash_bin} -e #{pipeline_config}"
# @overload start_with_stdin(options)
# @param [Hash{String=>String}] options
# @option options [String] '--config.string': the pipeline to run (default: STDIN_CONFIG)
# @option options [String] '-e': alias for `--config.string`
# @overload start_with_stdin(pipeline_config_string)
# @param [String] pipeline_config_string: the pipeline to run (default: STDIN_CONFIG)
def start_with_stdin(options={})
params = options.kind_of?(Hash) ? options.dup : Hash['--config.string', options]
params['--config.string'] ||= params.delete('-e') || STDIN_CONFIG

puts "Starting Logstash #{@logstash_bin} #{params.to_a.flatten}"
Bundler.with_unbundled_env do
out = Tempfile.new("duplex")
out.sync = true
@process = build_child_process("-e", pipeline_config)
@process = build_child_process(*params.to_a.flatten(1))
# pipe STDOUT and STDERR to a file
@process.io.stdout = @process.io.stderr = out
@process.duplex = true
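
For orientation, the reworked helper accepts either the legacy positional config string or an options hash of extra CLI flags. A hedged usage sketch (the pipeline strings are illustrative):

# Legacy positional form, still supported:
logstash_service.start_with_stdin("input { stdin {} } output { stdout {} }")

# Options-hash form used by the updated specs, which allows passing
# a custom pipeline id alongside the config string:
logstash_service.start_with_stdin(
  '--pipeline.id'   => '변환-verändern-変ずる',
  '--config.string' => 'input { stdin {} } output { stdout {} }'
)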
258 changes: 152 additions & 106 deletions qa/integration/specs/monitoring_api_spec.rb
@@ -37,7 +37,6 @@

let(:number_of_events) { 5 }
let(:max_retry) { 120 }
let(:plugins_config) { "input { stdin {} } filter { mutate { add_tag => 'integration test adding tag' } } output { stdout {} }" }

it "can retrieve event stats" do
logstash_service = @fixture.get_service("logstash")
@@ -167,34 +166,161 @@
end
end

it "can retrieve queue stats" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin
logstash_service.wait_for_logstash
shared_examples "pipeline metrics" do
# let(:pipeline_id) { defined?(super()) or fail NotImplementedError }

Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
expect(result).not_to be_nil
# we use fetch here since we want failed fetches to raise an exception
# and trigger the retry block
queue_stats = result.fetch("pipelines").fetch("main").fetch("queue")
expect(queue_stats).not_to be_nil
if logstash_service.settings.feature_flag == "persistent_queues"
expect(queue_stats["type"]).to eq "persisted"
queue_data_stats = queue_stats.fetch("data")
expect(queue_data_stats["free_space_in_bytes"]).not_to be_nil
expect(queue_data_stats["storage_type"]).not_to be_nil
expect(queue_data_stats["path"]).not_to be_nil
expect(queue_stats["events"]).not_to be_nil
queue_capacity_stats = queue_stats.fetch("capacity")
expect(queue_capacity_stats["page_capacity_in_bytes"]).not_to be_nil
expect(queue_capacity_stats["max_queue_size_in_bytes"]).not_to be_nil
expect(queue_capacity_stats["max_unread_events"]).not_to be_nil
else
expect(queue_stats["type"]).to eq("memory")
it "can retrieve queue stats" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin('--pipeline.id' => pipeline_id)
logstash_service.wait_for_logstash

Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
expect(result).not_to be_nil
# we use fetch here since we want failed fetches to raise an exception
# and trigger the retry block
queue_stats = result.fetch("pipelines").fetch(pipeline_id).fetch("queue")
expect(queue_stats).not_to be_nil
if logstash_service.settings.feature_flag == "persistent_queues"
expect(queue_stats["type"]).to eq "persisted"
queue_data_stats = queue_stats.fetch("data")
expect(queue_data_stats["free_space_in_bytes"]).not_to be_nil
expect(queue_data_stats["storage_type"]).not_to be_nil
expect(queue_data_stats["path"]).not_to be_nil
expect(queue_stats["events"]).not_to be_nil
queue_capacity_stats = queue_stats.fetch("capacity")
expect(queue_capacity_stats["page_capacity_in_bytes"]).not_to be_nil
expect(queue_capacity_stats["max_queue_size_in_bytes"]).not_to be_nil
expect(queue_capacity_stats["max_unread_events"]).not_to be_nil
else
expect(queue_stats["type"]).to eq("memory")
end
end
end

it "retrieves the pipeline flow statuses" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin('--pipeline.id' => pipeline_id)
logstash_service.wait_for_logstash
number_of_events.times {
logstash_service.write_to_stdin("Testing flow metrics")
sleep(1)
}

Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
expect(result).not_to be_nil
# we use fetch here since we want failed fetches to raise an exception
# and trigger the retry block
expect(result).to include('pipelines' => hash_including(pipeline_id => hash_including('flow')))
flow_status = result.dig("pipelines", pipeline_id, "flow")
expect(flow_status).to_not be_nil
expect(flow_status).to include(
# due to three-decimal-place rounding, it is easy for our worker_concurrency and queue_backpressure
# to be zero, so we are just looking for these to be _populated_
'worker_concurrency' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
'worker_utilization' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
'queue_backpressure' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
# depending on flow capture interval, our current rate can easily be zero, but our lifetime rates
# should be non-zero so long as pipeline uptime is less than ~10 minutes.
'input_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0),
'filter_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0),
'output_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0)
)
if logstash_service.settings.feature_flag == "persistent_queues"
expect(flow_status).to include(
'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)),
'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric))
)
else
expect(flow_status).to_not include('queue_persisted_growth_bytes')
expect(flow_status).to_not include('queue_persisted_growth_events')
end
end
end

shared_examples "plugin-level flow metrics" do
it "retrieves plugin level flow metrics" do
plugins_config = <<~EOPIPELINE.gsub(/[[:space:]]+/, " ")
input { stdin { id => '#{plugin_id_input}' } }
filter { mutate { id => '#{plugin_id_filter}' add_tag => 'integration test adding tag' } }
output { stdout { id => '#{plugin_id_output}' } }
EOPIPELINE

logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin('--pipeline.id' => pipeline_id, '--config.string' => plugins_config)
logstash_service.wait_for_logstash
number_of_events.times {
logstash_service.write_to_stdin("Testing plugin-level flow metrics")
sleep(1)
}

Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
# if the result is nil, we probably aren't ready yet
# our assertion failure will cause Stud to retry
expect(result).not_to be_nil

expect(result).to include('pipelines' => hash_including(pipeline_id => hash_including('plugins' => hash_including('inputs', 'filters', 'outputs'))))

input_plugins = result.dig("pipelines", pipeline_id, "plugins", "inputs")
filter_plugins = result.dig("pipelines", pipeline_id, "plugins", "filters")
output_plugins = result.dig("pipelines", pipeline_id, "plugins", "outputs")
expect(input_plugins[0]).to_not be_nil # plugin stats not reported yet; retry

expect(input_plugins).to include(a_hash_including(
'id' => plugin_id_input,
'flow' => a_hash_including(
'throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0)
)
))

expect(filter_plugins).to include(a_hash_including(
'id' => plugin_id_filter,
'flow' => a_hash_including(
'worker_utilization' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
'worker_millis_per_event' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
)
))

expect(output_plugins).to include(a_hash_including(
'id' => plugin_id_output,
'flow' => a_hash_including(
'worker_utilization' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
'worker_millis_per_event' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
)
))
end
end
end
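
The squiggly heredoc above, combined with gsub, collapses a readable multi-line pipeline definition into the single-line config string the CLI flag expects. As a standalone sketch:

config = <<~EOPIPELINE.gsub(/[[:space:]]+/, " ")
  input  { stdin  { id => 'in' } }
  output { stdout { id => 'out' } }
EOPIPELINE
# => "input { stdin { id => 'in' } } output { stdout { id => 'out' } } "
# (every whitespace run, newlines included, is squeezed to a single space)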

context "with lower-ASCII plugin id's" do
let(:plugin_id_input) { "standard-input" }
let(:plugin_id_filter) { "Mutations" }
let(:plugin_id_output) { "StandardOutput" }
include_examples "plugin-level flow metrics"
end

context "with unicode plugin id's" do
let(:plugin_id_input) { "입력" }
let(:plugin_id_filter) { "変じる" }
let(:plugin_id_output) { "le-résultat" }
include_examples "plugin-level flow metrics"
end

end
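
Both examples above poll through Stud.try so that a not-yet-ready stats subsystem triggers a retry instead of a failure. A minimal sketch of that idiom (the monitoring_api probe is hypothetical):

require 'stud/try'

# Retry up to 120 times; StandardError and failed RSpec expectations
# both count as retryable, exactly as in the specs above.
result = Stud.try(120.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
  response = monitoring_api.node_stats rescue nil # hypothetical probe
  expect(response).not_to be_nil # a failed expectation here means "try again"
  response
end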

context "with lower-ASCII pipeline id" do
let(:pipeline_id) { "main" }
include_examples "pipeline metrics"
end

context "with unicode pipeline id" do
let(:pipeline_id) { "변환-verändern-変ずる" }
include_examples "pipeline metrics"
end
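
The pattern above parameterizes one suite over several pipeline ids: each context supplies pipeline_id via let, then pulls the suite in with include_examples. Distilled to a self-contained sketch:

RSpec.describe "pipeline id round-trip" do
  shared_examples "id handling" do
    it "keeps the id intact as a string key" do
      serialized = { pipeline_id => { "events" => 0 } } # stand-in for API output
      expect(serialized).to include(pipeline_id)
    end
  end

  context "with a lower-ASCII id" do
    let(:pipeline_id) { "main" }
    include_examples "id handling"
  end

  context "with a unicode id" do
    let(:pipeline_id) { "변환-verändern-変ずる" }
    include_examples "id handling"
  end
end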

it "can configure logging" do
@@ -246,86 +372,6 @@
logging_get_assert logstash_service, "INFO", "TRACE"
end

it "should retrieve the pipeline flow statuses" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin
logstash_service.wait_for_logstash
number_of_events.times {
logstash_service.write_to_stdin("Testing flow metrics")
sleep(1)
}

Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
expect(result).not_to be_nil
# we use fetch here since we want failed fetches to raise an exception
# and trigger the retry block
expect(result).to include('pipelines' => hash_including('main' => hash_including('flow')))
flow_status = result.dig("pipelines", "main", "flow")
expect(flow_status).to_not be_nil
expect(flow_status).to include(
# due to three-decimal-place rounding, it is easy for our worker_concurrency and queue_backpressure
# to be zero, so we are just looking for these to be _populated_
'worker_concurrency' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
'worker_utilization' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
'queue_backpressure' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
# depending on flow capture interval, our current rate can easily be zero, but our lifetime rates
# should be non-zero so long as pipeline uptime is less than ~10 minutes.
'input_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0),
'filter_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0),
'output_throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0)
)
if logstash_service.settings.feature_flag == "persistent_queues"
expect(flow_status).to include(
'queue_persisted_growth_bytes' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric)),
'queue_persisted_growth_events' => hash_including('current' => a_kind_of(Numeric), 'lifetime' => a_kind_of(Numeric))
)
else
expect(flow_status).to_not include('queue_persisted_growth_bytes')
expect(flow_status).to_not include('queue_persisted_growth_events')
end
end
end

it "should retrieve plugin level flow metrics" do
logstash_service = @fixture.get_service("logstash")
logstash_service.start_with_stdin(plugins_config)
logstash_service.wait_for_logstash
number_of_events.times {
logstash_service.write_to_stdin("Testing plugin-level flow metrics")
sleep(1)
}

Stud.try(max_retry.times, [StandardError, RSpec::Expectations::ExpectationNotMetError]) do
# node_stats can fail if the stats subsystem isn't ready
result = logstash_service.monitoring_api.node_stats rescue nil
# if the result is nil, we probably aren't ready yet
# our assertion failure will cause Stud to retry
expect(result).not_to be_nil

expect(result).to include('pipelines' => hash_including('main' => hash_including('plugins' => hash_including('inputs', 'filters', 'outputs'))))

input_plugins = result.dig("pipelines", "main", "plugins", "inputs")
filter_plugins = result.dig("pipelines", "main", "plugins", "filters")
output_plugins = result.dig("pipelines", "main", "plugins", "outputs")
expect(input_plugins[0]).to_not be_nil

input_plugin_flow_status = input_plugins[0].dig("flow")
filter_plugin_flow_status = filter_plugins[0].dig("flow")
output_plugin_flow_status = output_plugins[0].dig("flow")

expect(input_plugin_flow_status).to include('throughput' => hash_including('current' => a_value >= 0, 'lifetime' => a_value > 0))
expect(filter_plugin_flow_status).to include(
'worker_utilization' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
'worker_millis_per_event' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
)
expect(output_plugin_flow_status).to include(
'worker_utilization' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
'worker_millis_per_event' => hash_including('current' => a_value >= 0, 'lifetime' => a_value >= 0),
)
end
end

private

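To spot-check the fix by hand against a running Logstash, one could fetch node stats over the monitoring API; a hedged sketch assuming the stock local endpoint on port 9600:

require 'net/http'
require 'json'

# Assumes a local Logstash exposing the default HTTP API.
body = Net::HTTP.get(URI("http://localhost:9600/_node/stats"))
pipeline_ids = JSON.parse(body).fetch("pipelines", {}).keys
puts pipeline_ids
# With this fix, a unicode id such as "변환-verändern-変ずる" prints verbatim
# instead of leaking a mangled internal byte representation.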
