Skip to content

Commit 76cbbc0

Browse files
authored
fix: remove deadlock possibility by adding resend_queue queue (#73)
* fix: remove deadlock possibility by adding resend_queue queue Signed-off-by: Dominik Rosiek <[email protected]> * fix: fix queues management due to tests Signed-off-by: Dominik Rosiek <[email protected]> * refactor: restrict rescue scope Signed-off-by: Dominik Rosiek <[email protected]> * chore: release 1.4.1 Signed-off-by: Dominik Rosiek <[email protected]> * refactor: restrict rescue scope Signed-off-by: Dominik Rosiek <[email protected]>
1 parent b8cc6fb commit 76cbbc0

File tree

5 files changed

+82
-34
lines changed

5 files changed

+82
-34
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Change Log
22

3+
## 1.4.1 (2022-03-09)
4+
5+
- [#73](https://github.com/SumoLogic/logstash-output-sumologic/pull/73) fix: remove deadlock possibility by adding resend_queue queue
6+
37
## 1.4.0 (2021-09-27)
48

59
- [#68](https://github.com/SumoLogic/logstash-output-sumologic/pull/68) feat: retry on 502 error code

Gemfile.lock

Lines changed: 45 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: .
33
specs:
4-
logstash-output-sumologic (1.4.0)
4+
logstash-output-sumologic (1.4.1)
55
logstash-core-plugin-api (>= 1.60, <= 2.99)
66
logstash-mixin-http_client (>= 6, < 8)
77
manticore (>= 0.5.4, < 1.0.0)
@@ -13,8 +13,8 @@ GEM
1313
numerizer (~> 0.1.1)
1414
clamp (0.6.5)
1515
coderay (1.1.3)
16-
concurrent-ruby (1.1.7)
17-
diff-lcs (1.4.4)
16+
concurrent-ruby (1.1.9)
17+
diff-lcs (1.5.0)
1818
elasticsearch (5.0.5)
1919
elasticsearch-api (= 5.0.5)
2020
elasticsearch-transport (= 5.0.5)
@@ -23,18 +23,38 @@ GEM
2323
elasticsearch-transport (5.0.5)
2424
faraday
2525
multi_json
26-
faraday (1.1.0)
26+
faraday (1.10.0)
27+
faraday-em_http (~> 1.0)
28+
faraday-em_synchrony (~> 1.0)
29+
faraday-excon (~> 1.1)
30+
faraday-httpclient (~> 1.0)
31+
faraday-multipart (~> 1.0)
32+
faraday-net_http (~> 1.0)
33+
faraday-net_http_persistent (~> 1.0)
34+
faraday-patron (~> 1.0)
35+
faraday-rack (~> 1.0)
36+
faraday-retry (~> 1.0)
37+
ruby2_keywords (>= 0.0.4)
38+
faraday-em_http (1.0.0)
39+
faraday-em_synchrony (1.0.0)
40+
faraday-excon (1.1.0)
41+
faraday-httpclient (1.0.1)
42+
faraday-multipart (1.0.3)
2743
multipart-post (>= 1.2, < 3)
28-
ruby2_keywords
29-
ffi (1.13.1-java)
44+
faraday-net_http (1.0.1)
45+
faraday-net_http_persistent (1.2.0)
46+
faraday-patron (1.0.0)
47+
faraday-rack (1.0.0)
48+
faraday-retry (1.0.3)
49+
ffi (1.15.5-java)
3050
filesize (0.0.4)
3151
fivemat (1.3.7)
3252
gem_publisher (1.5.0)
3353
gems (0.8.3)
3454
i18n (0.6.9)
3555
insist (1.0.0)
3656
jar-dependencies (0.4.1)
37-
jrjackson (0.4.13-java)
57+
jrjackson (0.4.14-java)
3858
jruby-openssl (0.9.19-java)
3959
kramdown (1.14.0)
4060
logstash-codec-plain (3.0.6)
@@ -74,11 +94,11 @@ GEM
7494
rspec (~> 3.0)
7595
rspec-wait
7696
stud (>= 0.0.20)
77-
logstash-mixin-http_client (7.0.0)
97+
logstash-mixin-http_client (7.1.0)
7898
logstash-codec-plain
7999
logstash-core-plugin-api (>= 1.60, <= 2.99)
80-
manticore (>= 0.5.2, < 1.0.0)
81-
manticore (0.7.0-java)
100+
manticore (>= 0.8.0, < 1.0.0)
101+
manticore (0.8.0-java)
82102
openssl_pkcs8_pure
83103
method_source (0.8.2)
84104
minitar (0.5.4)
@@ -96,27 +116,27 @@ GEM
96116
rack (1.6.6)
97117
rack-protection (1.5.5)
98118
rack
99-
rake (13.0.1)
100-
rspec (3.10.0)
101-
rspec-core (~> 3.10.0)
102-
rspec-expectations (~> 3.10.0)
103-
rspec-mocks (~> 3.10.0)
104-
rspec-core (3.10.0)
105-
rspec-support (~> 3.10.0)
119+
rake (13.0.6)
120+
rspec (3.11.0)
121+
rspec-core (~> 3.11.0)
122+
rspec-expectations (~> 3.11.0)
123+
rspec-mocks (~> 3.11.0)
124+
rspec-core (3.11.0)
125+
rspec-support (~> 3.11.0)
106126
rspec-eventually (0.2.2)
107-
rspec-expectations (3.10.0)
127+
rspec-expectations (3.11.0)
108128
diff-lcs (>= 1.2.0, < 2.0)
109-
rspec-support (~> 3.10.0)
110-
rspec-mocks (3.10.0)
129+
rspec-support (~> 3.11.0)
130+
rspec-mocks (3.11.0)
111131
diff-lcs (>= 1.2.0, < 2.0)
112-
rspec-support (~> 3.10.0)
113-
rspec-support (3.10.0)
132+
rspec-support (~> 3.11.0)
133+
rspec-support (3.11.0)
114134
rspec-wait (0.0.9)
115135
rspec (>= 3, < 4)
116-
ruby-maven (3.3.12)
136+
ruby-maven (3.3.13)
117137
ruby-maven-libs (~> 3.3.9)
118138
ruby-maven-libs (3.3.9)
119-
ruby2_keywords (0.0.2)
139+
ruby2_keywords (0.0.5)
120140
rubyzip (1.1.7)
121141
sinatra (1.4.8)
122142
rack (~> 1.5)
@@ -145,4 +165,4 @@ DEPENDENCIES
145165
rspec-eventually
146166

147167
BUNDLED WITH
148-
2.2.27
168+
2.3.7

lib/logstash/outputs/sumologic/message_queue.rb

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,8 @@ def enq(batch)
2727
end
2828
end # def enq
2929

30-
def deq()
31-
batch = @queue.deq()
30+
def deq(non_block: false)
31+
batch = @queue.deq(non_block: non_block)
3232
batch_size = batch.payload.bytesize
3333
@stats.record_deque(batch_size)
3434
@queue_bytesize.update { |v| v - batch_size }

lib/logstash/outputs/sumologic/sender.rb

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,14 @@ def initialize(client, queue, stats, config)
2424
@sender_max = (config["sender_max"] ||= 1) < 1 ? 1 : config["sender_max"]
2525
@sleep_before_requeue = config["sleep_before_requeue"] ||= 30
2626
@stats_enabled = config["stats_enabled"] ||= false
27+
@iteration_sleep = 0.3
2728

2829
@tokens = SizedQueue.new(@sender_max)
2930
@sender_max.times { |t| @tokens << t }
3031

32+
# Make resend_queue twice as big as sender_max,
33+
# because if one batch is processed, the next one is already waiting in the thread
34+
@resend_queue = SizedQueue.new(2*@sender_max)
3135
@compressor = LogStash::Outputs::SumoLogic::Compressor.new(config)
3236

3337
end # def initialize
@@ -39,9 +43,24 @@ def start()
3943
@stopping.make_false()
4044
@sender_t = Thread.new {
4145
while @stopping.false?
42-
batch = @queue.deq()
46+
begin
47+
# Resend batch if any in the queue
48+
batch = @resend_queue.deq(non_block: true)
49+
rescue ThreadError
50+
# send new batch otherwise
51+
begin
52+
batch = @queue.deq(non_block: true)
53+
rescue ThreadError
54+
Stud.stoppable_sleep(@iteration_sleep) { @stopping.true? }
55+
next
56+
end
57+
end
4358
send_request(batch)
4459
end # while
60+
@resend_queue.size.times.map { |queue|
61+
batch = queue.deq()
62+
send_request(batch)
63+
}
4564
@queue.drain().map { |batch|
4665
send_request(batch)
4766
}
@@ -98,6 +117,7 @@ def send_request(batch)
98117
return
99118
end
100119

120+
# wait for token so we do not exceed number of request in background
101121
token = @tokens.pop()
102122

103123
if @stats_enabled && content.start_with?(STATS_TAG)
@@ -111,11 +131,9 @@ def send_request(batch)
111131
:content_size => content.size,
112132
:content => content[0..20],
113133
:payload_size => body.size)
134+
135+
# send request in background
114136
request = @client.send(:background).send(:post, @url, :body => body, :headers => headers)
115-
116-
request.on_complete do
117-
@tokens << token
118-
end
119137

120138
request.on_success do |response|
121139
@stats.record_response_success(response.code)
@@ -126,12 +144,16 @@ def send_request(batch)
126144
:headers => headers,
127145
:contet => content[0..20])
128146
if response.code == 429 || response.code == 502 || response.code == 503 || response.code == 504
147+
# requeue and release token
129148
requeue_message(batch)
149+
@tokens << token
130150
end
131151
else
132152
log_dbg("request accepted",
133153
:token => token,
134154
:code => response.code)
155+
# release token
156+
@tokens << token
135157
end
136158
end
137159

@@ -143,6 +165,8 @@ def send_request(batch)
143165
:class => exception.class.name,
144166
:backtrace => exception.backtrace)
145167
requeue_message(batch)
168+
# requeue and release token
169+
@tokens << token
146170
end
147171

148172
@stats.record_request(content.bytesize, body.bytesize)
@@ -162,7 +186,7 @@ def requeue_message(batch)
162186
:content => content[0..20],
163187
:headers => batch.headers)
164188
Stud.stoppable_sleep(@sleep_before_requeue) { @stopping.true? }
165-
@queue.enq(batch)
189+
@resend_queue.enq(batch)
166190
end
167191
end # def reque_message
168192

logstash-output-sumologic.gemspec

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
Gem::Specification.new do |s|
22
s.name = 'logstash-output-sumologic'
3-
s.version = '1.4.0'
3+
s.version = '1.4.1'
44
s.licenses = ['Apache-2.0']
55
s.summary = 'Deliever the log to Sumo Logic cloud service.'
66
s.description = 'This gem is a Logstash output plugin to deliver the log or metrics to Sumo Logic cloud service. Go to https://github.com/SumoLogic/logstash-output-sumologic for getting help, reporting issues, etc.'

0 commit comments

Comments
 (0)