Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ensure_log_order config #14

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 17 additions & 14 deletions lib/fluent/plugin/out_scalyr.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ScalyrOut < Fluent::Plugin::Output
config_param :replace_invalid_utf8, :bool, :default => false
config_param :compression_type, :string, :default => nil #Valid options are bz2, deflate or None. Defaults to None.
config_param :compression_level, :integer, :default => 9 #An int containing the compression level of compression to use, from 1-9. Defaults to 9 (max)
config_param :ensure_log_order, :bool, :default => true #expect in-order logs to be incoming. Ensure logs to be ordered by timestamp, need to be disabled when accepting multiple input source

config_section :buffer do
config_set_default :retry_max_times, 40 #try a maximum of 40 times before discarding
Expand Down Expand Up @@ -336,20 +337,22 @@ def build_add_events_body( chunk )

thread_id = 0

@sync.synchronize {
#ensure timestamp is at least 1 nanosecond greater than the last one
timestamp = [timestamp, @last_timestamp + 1].max
@last_timestamp = timestamp

#get thread id or add a new one if we haven't seen this tag before
if @thread_ids.key? tag
thread_id = @thread_ids[tag]
else
thread_id = @next_id
@thread_ids[tag] = thread_id
@next_id += 1
end
}
if @ensure_log_order then
@sync.synchronize {
#ensure timestamp is at least 1 nanosecond greater than the last one
timestamp = [timestamp, @last_timestamp + 1].max
@last_timestamp = timestamp

#get thread id or add a new one if we haven't seen this tag before
if @thread_ids.key? tag
thread_id = @thread_ids[tag]
else
thread_id = @next_id
@thread_ids[tag] = thread_id
@next_id += 1
end
}
end

#then update the map of threads for this chunk
current_threads[tag] = thread_id
Expand Down