Skip to content

Commit 6c5aa47

Browse files
committed
fix server-mode shutdown bug, add baseline specs
1 parent cae6234 commit 6c5aa47

File tree

2 files changed

+137
-15
lines changed

2 files changed

+137
-15
lines changed

lib/logstash/outputs/tcp.rb

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -169,19 +169,19 @@ def register
169169
end
170170
end
171171
else
172-
client_socket = nil
172+
@client_socket = nil
173173
peer_info = nil
174174
@codec.on_event do |event, payload|
175175
begin
176176
# not threadsafe; this is why we require `concurrency: single`
177-
unless client_socket
178-
client_socket = connect
179-
peer_info = client_socket.peer
177+
unless @client_socket
178+
@client_socket = connect
179+
peer_info = @client_socket.peer
180180
end
181181

182182
writable_io = nil
183183
while writable_io.nil? || writable_io.any? == false
184-
readable_io, writable_io, _ = IO.select([client_socket],[client_socket])
184+
readable_io, writable_io, _ = IO.select([@client_socket],[@client_socket])
185185

186186
# don't expect any reads, but a readable socket might
187187
# mean the remote end closed, so read it and throw it away.
@@ -192,15 +192,15 @@ def register
192192
# Now send the payload
193193
@logger.trace("transmitting #{payload.bytesize} bytes", socket: peer_info) if @logger.trace? && payload && !payload.empty?
194194
while payload && payload.bytesize > 0
195-
written_bytes_size = client_socket.syswrite(payload)
195+
written_bytes_size = @client_socket.syswrite(payload)
196196
payload = payload.byteslice(written_bytes_size..-1)
197197
@logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: peer_info) if @logger.trace?
198198
sleep 0.1 unless payload.empty?
199199
end
200200
rescue => e
201201
log_warn "client socket failed:", e, host: @host, port: @port, socket: peer_info
202-
client_socket.close rescue nil
203-
client_socket = nil
202+
@client_socket.close rescue nil
203+
@client_socket = nil
204204
sleep @reconnect_interval
205205
retry
206206
end
@@ -210,13 +210,18 @@ def register
210210

211211
# @overload Base#close
212212
def close
213-
@closed.make_true
214-
@server_socket.close rescue nil if @server_socket
213+
if server?
214+
# server-mode clean-up
215+
@closed.make_true
216+
@server_socket.shutdown rescue nil if @server_socket
215217

216-
return unless @client_threads
217-
@client_threads.each do |thread|
218-
client = thread[:client]
219-
client.close rescue nil if client
218+
@client_threads&.each do |thread|
219+
client = thread[:client]
220+
client.close rescue nil if client
221+
end
222+
else
223+
# client-mode clean-up
224+
@client_socket&.close
220225
end
221226
end
222227

spec/outputs/tcp_spec.rb

Lines changed: 118 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
require "flores/pki"
44

55
describe LogStash::Outputs::Tcp do
6-
subject { described_class.new(config) }
6+
subject(:instance) { described_class.new(config) }
77
let(:config) { {
88
"host" => "localhost",
99
"port" => 2000 + rand(3000),
@@ -73,4 +73,121 @@
7373
end
7474
end
7575
end
76+
77+
##
78+
# Reads `in_io` until EOF and writes the bytes
79+
# it receives to `out_io`, tolerating partial writes.
80+
def siphon_until_eof(in_io, out_io)
81+
while (buffer = in_io.read(16*1024)) do
82+
while (buffer && !buffer.empty?) do
83+
bytes_written = out_io.write(buffer)
84+
buffer = buffer.byteslice(bytes_written..-1)
85+
end
86+
end
87+
end
88+
89+
context 'client mode' do
90+
context 'transmitting data' do
91+
let!(:io) { StringIO.new } # somewhere for our server to stash the data it receives
92+
93+
let(:server_host) { 'localhost' }
94+
let(:server_port) { server.addr[1] } # get actual since we bind to port 0
95+
96+
let!(:server) { TCPServer.new(server_host, 0) }
97+
98+
let(:config) do
99+
{ 'host' => server_host, 'port' => server_port, 'mode' => 'client' }
100+
end
101+
102+
let(:event) { LogStash::Event.new({"hello" => "world"})}
103+
104+
subject(:instance) { described_class.new(config) }
105+
106+
before(:each) do
107+
# accepts ONE connection
108+
@server_socket_thread = Thread.start do
109+
client = server.accept
110+
siphon_until_eof(client, io)
111+
end
112+
instance.register
113+
end
114+
115+
after(:each) do
116+
@server_socket_thread&.join
117+
end
118+
119+
it 'encodes and transmits data' do
120+
instance.receive(event)
121+
instance.close # release the connection
122+
sleep 1
123+
expect(io.string).to include('"hello"','"world"')
124+
end
125+
126+
context 'when payload is very large' do
127+
let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
128+
let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }
129+
130+
131+
it 'encodes and transmits data' do
132+
instance.receive(event)
133+
instance.close # release the connection
134+
sleep 1
135+
expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
136+
end
137+
end
138+
end
139+
end
140+
141+
context 'server mode' do
142+
context 'transmitting data' do
143+
let(:server_host) { 'localhost' }
144+
let(:server_port) { Random.rand(1024...5000) }
145+
146+
let(:config) do
147+
{ 'host' => server_host, 'port' => server_port, 'mode' => 'server' }
148+
end
149+
150+
subject(:instance) { described_class.new(config) }
151+
152+
before(:each) { instance.register } # start listener
153+
after(:each) { instance.close }
154+
155+
let(:event) { LogStash::Event.new({"hello" => "world"})}
156+
157+
context 'when one client is connected' do
158+
let(:io) { StringIO.new }
159+
let(:client_socket) { TCPSocket.new(server_host, server_port) }
160+
161+
before(:each) do
162+
@client_socket_thread = Thread.start { siphon_until_eof(client_socket, io) }
163+
sleep 1 # wait for it to actually connect
164+
end
165+
166+
it 'encodes and transmits data' do
167+
instance.receive(event)
168+
169+
sleep 1 # wait for the event to get sent...
170+
instance.close # release the connection
171+
172+
@client_socket_thread.join(30) || fail('client failed to join')
173+
expect(io.string).to include('"hello"','"world"')
174+
end
175+
176+
context 'when payload is very large' do
177+
let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
178+
let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }
179+
180+
it 'encodes and transmits data' do
181+
instance.receive(event)
182+
183+
sleep 1 # wait for the event to get sent...
184+
instance.close # release the connection
185+
186+
@client_socket_thread.join(30) || fail('client failed to join')
187+
expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
188+
end
189+
end
190+
end
191+
end
192+
end
76193
end

0 commit comments

Comments
 (0)