Skip to content

Commit cae2fdb

Browse files
committed
Factor out IO-selection code, prepare for PR #2466
This commit sets the stage for an upcoming fix for #2466. In a nutshell, this is what it does: - In `cask/system_command.rb`, factor out existing code (“IO selection”) that we’re going to need later in order to fix PR #2466; - move said code into its own class `Utils::IOSelector`; - split the class into smaller methods to make the code more expressive than before; and - add unit tests for `Utils::IOSelector` (they’re a bit bloated because edge cases.)
1 parent ed9942f commit cae2fdb

File tree

5 files changed

+259
-17
lines changed

5 files changed

+259
-17
lines changed

Library/Homebrew/cask/lib/hbc/system_command.rb

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -81,31 +81,16 @@ def each_output_line(&b)
8181

8282
write_input_to(raw_stdin)
8383
raw_stdin.close_write
84-
each_line_from [raw_stdout, raw_stderr], &b
8584

85+
::Utils::IOSelector
86+
.each_line_from(stdout: raw_stdout, stderr: raw_stderr, &b)
8687
@processed_status = raw_wait_thr.value
8788
end
8889

8990
def write_input_to(raw_stdin)
9091
[*options[:input]].each { |line| raw_stdin.print line }
9192
end
9293

93-
def each_line_from(sources)
94-
loop do
95-
readable_sources = IO.select(sources)[0]
96-
readable_sources.delete_if(&:eof?).first(1).each do |source|
97-
type = ((source == sources[0]) ? :stdout : :stderr)
98-
begin
99-
yield(type, source.readline_nonblock || "")
100-
rescue IO::WaitReadable, EOFError
101-
next
102-
end
103-
end
104-
break if readable_sources.empty?
105-
end
106-
sources.each(&:close_read)
107-
end
108-
10994
def result
11095
Result.new(command,
11196
processed_output[:stdout],

Library/Homebrew/test/spec_helper.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
require "rspec/its"
44
require "rspec/wait"
55
require "set"
6+
require "English"
67

78
if ENV["HOMEBREW_TESTS_COVERAGE"]
89
require "simplecov"
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
require "utils/io_selector"
2+
3+
describe Utils::IOSelector do
4+
describe "given text streams" do
5+
let(:first_pipe) { IO.pipe }
6+
let(:second_pipe) { IO.pipe }
7+
8+
let(:first_reader) { first_pipe[0] }
9+
let(:second_reader) { second_pipe[0] }
10+
11+
let(:first_writer) { first_pipe[1] }
12+
let(:second_writer) { second_pipe[1] }
13+
14+
let(:queues) { { first: Queue.new, second: Queue.new } }
15+
16+
let(:write_first!) do
17+
thread = Thread.new(first_writer, queues[:second]) do |io, queue|
18+
io.puts "Lorem"
19+
wait(1).for { queue.pop }.to end_with("\n")
20+
io.puts "dolor"
21+
io.close
22+
end
23+
thread.abort_on_exception = true
24+
thread
25+
end
26+
27+
let(:write_second!) do
28+
thread = Thread.new(second_writer, queues[:first]) do |io, queue|
29+
wait(1).for { queue.pop }.to end_with("\n")
30+
io.puts "ipsum"
31+
wait(1).for { queue.pop }.to end_with("\n")
32+
io.puts "sit"
33+
io.puts "amet"
34+
io.close
35+
end
36+
thread.abort_on_exception = true
37+
thread
38+
end
39+
40+
let(:queue_feeder) do
41+
lambda do |proc_under_test, *args, &block|
42+
proc_under_test.call(*args) do |tag, string_received|
43+
queues[tag] << string_received
44+
block.call(tag, string_received)
45+
end
46+
end
47+
end
48+
49+
before do
50+
allow_any_instance_of(Utils::IOSelector)
51+
.to receive(:each_line_nonblock)
52+
.and_wrap_original do |*args, &block|
53+
queue_feeder.call(*args, &block)
54+
end
55+
56+
write_first!
57+
write_second!
58+
end
59+
60+
after do
61+
write_first!.exit
62+
write_second!.exit
63+
end
64+
65+
describe "::each_line_from" do
66+
subject do
67+
line_hash = { first: "", second: "", full_text: "" }
68+
69+
Utils::IOSelector.each_line_from(
70+
first: first_reader,
71+
second: second_reader,
72+
) do |tag, string_received|
73+
line_hash[tag] << string_received
74+
line_hash[:full_text] << string_received
75+
end
76+
line_hash
77+
end
78+
79+
before { wait(1).for(subject) }
80+
81+
its([:first]) { is_expected.to eq("Lorem\ndolor\n") }
82+
its([:second]) { is_expected.to eq("ipsum\nsit\namet\n") }
83+
84+
its([:full_text]) {
85+
is_expected.to eq("Lorem\nipsum\ndolor\nsit\namet\n")
86+
}
87+
end
88+
89+
describe "::new" do
90+
let(:selector) do
91+
Utils::IOSelector.new(
92+
first: first_reader,
93+
second: second_reader,
94+
)
95+
end
96+
subject { selector }
97+
98+
describe "pre-read" do
99+
its(:pending_streams) {
100+
are_expected.to eq([first_reader, second_reader])
101+
}
102+
103+
its(:separator) {
104+
is_expected.to eq($INPUT_RECORD_SEPARATOR)
105+
}
106+
end
107+
108+
describe "post-read" do
109+
before do
110+
wait(1).for {
111+
subject.each_line_nonblock {}
112+
true
113+
}.to be true
114+
end
115+
116+
after { expect(selector.all_streams).to all be_closed }
117+
118+
its(:pending_streams) { are_expected.to be_empty }
119+
its(:separator) {
120+
is_expected.to eq($INPUT_RECORD_SEPARATOR)
121+
}
122+
end
123+
124+
describe "#each_line_nonblock" do
125+
subject do
126+
line_hash = { first: "", second: "", full_text: "" }
127+
super().each_line_nonblock do |tag, string_received|
128+
line_hash[tag] << string_received
129+
line_hash[:full_text] << string_received
130+
end
131+
line_hash
132+
end
133+
134+
before { wait(1).for(subject) }
135+
after { expect(selector.all_streams).to all be_closed }
136+
137+
its([:first]) { is_expected.to eq("Lorem\ndolor\n") }
138+
its([:second]) { is_expected.to eq("ipsum\nsit\namet\n") }
139+
140+
its([:full_text]) {
141+
is_expected.to eq("Lorem\nipsum\ndolor\nsit\namet\n")
142+
}
143+
end
144+
145+
its(:all_streams) {
146+
are_expected.to eq([first_reader, second_reader])
147+
}
148+
149+
its(:all_tags) { are_expected.to eq([:first, :second]) }
150+
151+
describe "#tag_of" do
152+
subject do
153+
{
154+
"first tag" => super().tag_of(first_reader),
155+
"second tag" => super().tag_of(second_reader),
156+
}
157+
end
158+
159+
its(["first tag"]) { is_expected.to eq(:first) }
160+
its(["second tag"]) { is_expected.to eq(:second) }
161+
end
162+
end
163+
164+
describe "::new with a custom separator" do
165+
subject do
166+
tagged_streams = {
167+
first: first_reader,
168+
second: second_reader,
169+
}
170+
Utils::IOSelector.new(tagged_streams, ",")
171+
end
172+
173+
its(:separator) { is_expected.to eq(",") }
174+
end
175+
end
176+
end

Library/Homebrew/utils.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
require "utils/github"
1010
require "utils/hash"
1111
require "utils/inreplace"
12+
require "utils/io_selector"
1213
require "utils/link"
1314
require "utils/popen"
1415
require "utils/svn"
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
require "delegate"
2+
require "English"
3+
4+
require "extend/io"
5+
6+
module Utils
7+
#
8+
# The class `IOSelector` is a wrapper for `IO::select` with the
9+
# added benefit that it spans the streams' lifetimes.
10+
#
11+
# The class accepts multiple IOs which must be open for reading.
12+
# It then notifies the client as data becomes available
13+
# per-stream.
14+
#
15+
# Its main use is to allow a client to read both `stdout` and
16+
# `stderr` of a subprocess in a way that avoids buffer-related
17+
# deadlocks.
18+
#
19+
# For a more in-depth explanation, see:
20+
# https://github.com/Homebrew/brew/pull/2466
21+
#
22+
class IOSelector < DelegateClass(Hash)
23+
attr_reader :separator
24+
25+
alias all_streams keys
26+
alias all_tags values
27+
alias tag_of fetch
28+
29+
def self.each_line_from(streams = {},
30+
separator = $INPUT_RECORD_SEPARATOR, &block)
31+
new(streams, separator).each_line_nonblock(&block)
32+
end
33+
34+
def initialize(streams = {},
35+
separator = $INPUT_RECORD_SEPARATOR)
36+
super(streams.invert.compare_by_identity)
37+
@separator = separator
38+
end
39+
40+
def each_line_nonblock
41+
each_readable_stream_until_eof do |stream|
42+
line = stream.readline_nonblock(separator) || ""
43+
yield(tag_of(stream), line)
44+
end
45+
close_streams
46+
end
47+
48+
def pending_streams
49+
@pending_streams ||= all_streams.dup
50+
end
51+
52+
private
53+
54+
def each_readable_stream_until_eof(&block)
55+
loop do
56+
readable_streams.each do |stream|
57+
pending_streams.delete(stream) if stream.eof?
58+
yield_gracefully(stream, &block)
59+
end
60+
break if pending_streams.empty?
61+
end
62+
end
63+
64+
def readable_streams
65+
IO.select(pending_streams)[0]
66+
end
67+
68+
def yield_gracefully(stream)
69+
yield(stream)
70+
rescue IO::WaitReadable, IO::WaitWritable, EOFError
71+
# We'll be back until/unless EOF
72+
return
73+
end
74+
75+
def close_streams
76+
all_streams.each(&:close_read)
77+
end
78+
end
79+
end

0 commit comments

Comments
 (0)