Skip to content

Commit

Permalink
Merge pull request #54 from SumoLogic/rmiller-fluentd-output-fix-meta…
Browse files Browse the repository at this point in the history
…data-key

Fix fields handling to properly batch HTTP request payloads
  • Loading branch information
Ryan Miller authored Oct 16, 2019
2 parents 3050558 + 5a00dff commit 7fc9169
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 23 deletions.
29 changes: 6 additions & 23 deletions lib/fluent/plugin/out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -195,36 +195,20 @@ def sumo_key(sumo_metadata, chunk)
source_host = sumo_metadata['host'] || @source_host
source_host = extract_placeholders(source_host, chunk) unless source_host.nil?

"#{source_name}:#{source_category}:#{source_host}"
fields = sumo_metadata['fields'] || ""
fields = extract_placeholders(fields, chunk) unless fields.nil?

"#{source_name}:#{source_category}:#{source_host}:#{fields}"
end

# Convert timestamp to 13 digit epoch if necessary
def sumo_timestamp(time)
time.to_s.length == 13 ? time : time * 1000
end

def sumo_fields(sumo_metadata)
fields = sumo_metadata['fields'] || ""
Hash[
fields.split(',').map do |pair|
k, v = pair.split('=', 2)
[k, v]
end
]
end

def dump_collected_fields(log_fields)
if log_fields.nil?
log_fields
else
log_fields.map{|k,v| "#{k}=#{v}"}.join(',')
end
end

# This method is called every flush interval. Write the buffer chunk
def write(chunk)
messages_list = {}
log_fields = nil

# Sort messages
chunk.msgpack_each do |time, record|
Expand Down Expand Up @@ -252,7 +236,6 @@ def write(chunk)
end
log = dump_log(merge_json(record))
when 'fields'
log_fields = sumo_fields(sumo_metadata)
if @add_timestamp
record = { @timestamp_key => sumo_timestamp(time) }.merge(record)
end
Expand Down Expand Up @@ -282,15 +265,15 @@ def write(chunk)

# Push logs to sumo
messages_list.each do |key, messages|
source_name, source_category, source_host = key.split(':')
source_name, source_category, source_host, fields = key.split(':')
@sumo_conn.publish(
messages.join("\n"),
source_host =source_host,
source_category =source_category,
source_name =source_name,
data_type =@data_type,
metric_data_format =@metric_data_format,
collected_fields =dump_collected_fields(log_fields)
collected_fields =fields
)
end

Expand Down
81 changes: 81 additions & 0 deletions test/plugin/test_out_sumologic.rb
Original file line number Diff line number Diff line change
Expand Up @@ -394,4 +394,85 @@ def test_emit_prometheus
times:1
end

def test_batching_same_headers
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format json
source_category test
source_host test
source_name test
}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
driver.run do
driver.feed("output.test", time, {'message' => 'test1'})
driver.feed("output.test", time, {'message' => 'test2'})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'},
body: /\A{"timestamp":\d+.,"message":"test1"}\n{"timestamp":\d+.,"message":"test2"}\z/,
times:1
end

def test_batching_different_headers
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format json
source_category test
source_host test
source_name test
}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
driver.run do
driver.feed("output.test", time, {'message' => 'test1', '_sumo_metadata' => {"category": "cat1"}})
driver.feed("output.test", time, {'message' => 'test2', '_sumo_metadata' => {"category": "cat2"}})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'cat1', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'},
body: /\A{"timestamp":\d+.,"message":"test1"}\z/,
times:1
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'cat2', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'},
body: /\A{"timestamp":\d+.,"message":"test2"}\z/,
times:1
end

def test_batching_different_fields
config = %{
endpoint https://collectors.sumologic.com/v1/receivers/http/1234
log_format fields
source_category test
source_host test
source_name test
}
driver = create_driver(config)
time = event_time
stub_request(:post, 'https://collectors.sumologic.com/v1/receivers/http/1234')
driver.run do
driver.feed("output.test", time, {'message' => 'test1'})
driver.feed("output.test", time, {'message' => 'test2', '_sumo_metadata' => {"fields": "foo=bar"}})
driver.feed("output.test", time, {'message' => 'test3', '_sumo_metadata' => {"fields": "foo=bar,sumo=logic"}})
driver.feed("output.test", time, {'message' => 'test4', '_sumo_metadata' => {"fields": "foo=bar,abc=123"}})
end
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test'},
body: /\A{"timestamp":\d+.,"message":"test1"}\z/,
times:1
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'X-Sumo-Fields' => 'foo=bar'},
body: /\A{"timestamp":\d+.,"message":"test2"}\z/,
times:1
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'X-Sumo-Fields' => 'foo=bar,sumo=logic'},
body: /\A{"timestamp":\d+.,"message":"test3"}\z/,
times:1
assert_requested :post, "https://collectors.sumologic.com/v1/receivers/http/1234",
headers: {'X-Sumo-Category'=>'test', 'X-Sumo-Client'=>'fluentd-output', 'X-Sumo-Host'=>'test', 'X-Sumo-Name'=>'test', 'X-Sumo-Fields' => 'foo=bar,abc=123'},
body: /\A{"timestamp":\d+.,"message":"test4"}\z/,
times:1
end

end

0 comments on commit 7fc9169

Please sign in to comment.