Skip to content
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## 6.0.3
- Pulled applicable back-ports from 6.1.0 [#50](https://github.com/logstash-plugins/logstash-output-tcp/pull/50)
- Fix: Ensure sockets are closed when this plugin is closed
- Fix: Fixes an issue in client mode where payloads larger than a connection's current TCP window could be silently truncated
- Fix: Fixes an issue in server mode where payloads larger than a connection's current TCP window could be silently truncated

## 6.0.2
- Fix: unable to start with password protected key [#45](https://github.com/logstash-plugins/logstash-output-tcp/pull/45)

Expand Down
119 changes: 92 additions & 27 deletions lib/logstash/outputs/tcp.rb
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,44 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base
# SSL key passphrase
config :ssl_key_passphrase, :validate => :password, :default => nil

##
# @param socket [Socket]
# @param logger_context [#log_warn&#log_error&#logger]
class Client
public
def initialize(socket, logger)
def initialize(socket, logger_context)
@socket = socket
@logger = logger
@peer_info = socket.peer
@logger_context = logger_context
@queue = Queue.new
end

public
def run
loop do
begin
@socket.write(@queue.pop)
payload = @queue.pop

@logger_context.logger.trace("transmitting #{payload.bytesize} bytes", socket: @peer_info) if @logger_context.logger.trace? && payload && !payload.empty?
while payload && !payload.empty?
written_bytes_size = @socket.write(payload)
payload = payload.byteslice(written_bytes_size..-1)
@logger_context.logger.log_trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: @peer_info) if @logger_context.logger.trace?
end
rescue => e
@logger.warn("tcp output exception", :socket => @socket,
:exception => e)
@logger_context.log_warn("tcp output exception: socket write failed", e, :socket => @peer_info)
break
end
end
end # def run

public
def write(msg)
@queue.push(msg)
end # def write

def close
@socket.close
rescue => e
@logger_context.log_warn 'socket close failed:', e, socket: @socket&.to_s
end
end # class Client

private
Expand Down Expand Up @@ -113,6 +126,8 @@ def register
if @ssl_enable
setup_ssl
end # @ssl_enable
@closed = Concurrent::AtomicBoolean.new(false)
@thread_no = Concurrent::AtomicFixnum.new(0)

if server?
@logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}")
Expand All @@ -129,51 +144,85 @@ def register
@client_threads = []

@accept_thread = Thread.new(@server_socket) do |server_socket|
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept")
loop do
break if @closed.value
Thread.start(server_socket.accept) do |client_socket|
# monkeypatch a 'peer' method onto the socket.
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
client_socket.extend(::LogStash::Util::SocketPeer)
@logger.debug("Accepted connection", :client => client_socket.peer,
:server => "#{@host}:#{@port}")
client = Client.new(client_socket, @logger)
client = Client.new(client_socket, self)
Thread.current[:client] = client
LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}")
@client_threads << Thread.current
client.run
client.run unless @closed.value
end
end
end

@codec.on_event do |event, payload|
@client_threads.select!(&:alive?)
@client_threads.each do |client_thread|
client_thread[:client].write(payload)
end
@client_threads.reject! {|t| !t.alive? }
end
else
client_socket = nil
@client_socket = nil
peer_info = nil
@codec.on_event do |event, payload|
begin
client_socket = connect unless client_socket
r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil)
# don't expect any reads, but a readable socket might
# mean the remote end closed, so read it and throw it away.
# we'll get an EOFError if it happens.
client_socket.sysread(16384) if r.any?
# not threadsafe; this is why we require `concurrency: single`
unless @client_socket
@client_socket = connect
peer_info = @client_socket.peer
end

writable_io = nil
while writable_io.nil? || writable_io.any? == false
readable_io, writable_io, _ = IO.select([@client_socket],[@client_socket])

# don't expect any reads, but a readable socket might
# mean the remote end closed, so read it and throw it away.
# we'll get an EOFError if it happens.
readable_io.each { |readable| readable.sysread(16384) }
end

# Now send the payload
client_socket.syswrite(payload) if w.any?
@logger.trace("transmitting #{payload.bytesize} bytes", socket: peer_info) if @logger.trace? && payload && !payload.empty?
while payload && payload.bytesize > 0
written_bytes_size = @client_socket.syswrite(payload)
payload = payload.byteslice(written_bytes_size..-1)
@logger.trace(">transmitted #{written_bytes_size} bytes; #{payload.bytesize} bytes remain", socket: peer_info) if @logger.trace?
end
rescue => e
@logger.warn("tcp output exception", :host => @host, :port => @port,
:exception => e, :backtrace => e.backtrace)
client_socket.close rescue nil
client_socket = nil
log_warn "client socket failed:", e, host: @host, port: @port, socket: peer_info
@client_socket.close rescue nil
@client_socket = nil
sleep @reconnect_interval
retry
end
end
end
end # def register

# @overload Base#close
def close
if server?
# server-mode clean-up
@closed.make_true
@server_socket.shutdown rescue nil if @server_socket
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will need to be forward-ported to main/6.1 series, which sends TCPServer#close instead of this TCPServer#shutdown, and causes the plugin to crash during shutdown because a blocking call on TCPServer#accept throws an IOError when the server is closed out of under it.


@client_threads&.each do |thread|
client = thread[:client]
client.close rescue nil if client
end
else
# client-mode clean-up
@client_socket&.close
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All related @client_socket bits will need to be forwarded to main/6.1-series. It cannot be a local variable in register because we need to be able to close it in our close method

end
end

private
def connect
begin
Expand All @@ -183,17 +232,17 @@ def connect
begin
client_socket.connect
rescue OpenSSL::SSL::SSLError => ssle
@logger.error("SSL Error", :exception => ssle, :backtrace => ssle.backtrace)
log_error 'connect ssl failure:', ssle, backtrace: false
# NOTE(mrichar1): Hack to prevent hammering peer
sleep(5)
raise
end
end
client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end }
client_socket.extend(::LogStash::Util::SocketPeer)
@logger.debug("Opened connection", :client => "#{client_socket.peer}")
return client_socket
rescue StandardError => e
@logger.error("Failed to connect: #{e.message}", :exception => e.class, :backtrace => e.backtrace)
log_error 'failed to connect:', e
sleep @reconnect_interval
retry
end
Expand All @@ -208,4 +257,20 @@ def server?
def receive(event)
@codec.encode(event)
end # def receive

def pipeline_id
execution_context.pipeline_id || 'main'
end

def log_warn(msg, e, backtrace: @logger.debug?, **details)
details = details.merge message: e.message, exception: e.class
details[:backtrace] = e.backtrace if backtrace
@logger.warn(msg, details)
end

def log_error(msg, e, backtrace: @logger.info?, **details)
details = details.merge message: e.message, exception: e.class
details[:backtrace] = e.backtrace if backtrace
@logger.error(msg, details)
end
end # class LogStash::Outputs::Tcp
2 changes: 1 addition & 1 deletion logstash-output-tcp.gemspec
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
Gem::Specification.new do |s|

s.name = 'logstash-output-tcp'
s.version = '6.0.2'
s.version = '6.0.3'
s.licenses = ['Apache License (2.0)']
s.summary = "Writes events over a TCP socket"
s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"
Expand Down
137 changes: 136 additions & 1 deletion spec/outputs/tcp_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
require "flores/pki"

describe LogStash::Outputs::Tcp do
subject { described_class.new(config) }
subject(:instance) { described_class.new(config) }
let(:config) { {
"host" => "localhost",
"port" => 2000 + rand(3000),
Expand Down Expand Up @@ -73,4 +73,139 @@
end
end
end

##
# Reads `in_io` until EOF and writes the bytes
# it receives to `out_io`, tolerating partial writes.
def siphon_until_eof(in_io, out_io)
buffer = ""
while (retval = in_io.read_nonblock(32*1024, buffer, exception:false)) do
(IO.select([in_io], nil, nil, 5); next) if retval == :wait_readable

while (buffer && !buffer.empty?) do
bytes_written = out_io.write(buffer)
buffer.replace buffer.byteslice(bytes_written..-1)
end
end
end

context 'client mode' do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all new specs will need to be forward-ported to main/6.1 series as well.

context 'transmitting data' do
let!(:io) { StringIO.new } # somewhere for our server to stash the data it receives

let(:server_host) { 'localhost' }
let(:server_port) { server.addr[1] } # get actual since we bind to port 0

let!(:server) { TCPServer.new(server_host, 0) }

let(:config) do
{ 'host' => server_host, 'port' => server_port, 'mode' => 'client' }
end

let(:event) { LogStash::Event.new({"hello" => "world"})}

subject(:instance) { described_class.new(config) }

before(:each) do
# accepts ONE connection
@server_socket_thread = Thread.start do
client = server.accept
siphon_until_eof(client, io)
end
instance.register
end

after(:each) do
@server_socket_thread&.join
end

it 'encodes and transmits data' do
instance.receive(event)
sleep 1
instance.close # release the connection
@server_socket_thread.join(30) || fail('server failed to join')
expect(io.string).to include('"hello"','"world"')
end

context 'when payload is very large' do
let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }


it 'encodes and transmits data' do
instance.receive(event)
sleep 1
instance.close # release the connection
@server_socket_thread.join(30) || fail('server failed to join')
expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
end
end
end
end

context 'server mode' do

def wait_for_condition(total_time_in_seconds, &block)
deadline = Time.now + total_time_in_seconds
until Time.now > deadline
return if yield
sleep(1)
end
fail('condition not met!')
end

context 'transmitting data' do
let(:server_host) { 'localhost' }
let(:server_port) { Random.rand(1024...5000) }

let(:config) do
{ 'host' => server_host, 'port' => server_port, 'mode' => 'server' }
end

subject(:instance) { described_class.new(config) }

before(:each) { instance.register } # start listener
after(:each) { instance.close }

let(:event) { LogStash::Event.new({"hello" => "world"})}

context 'when one client is connected' do
let(:io) { StringIO.new }
let(:client_socket) { TCPSocket.new(server_host, server_port) }

before(:each) do
@client_socket_thread = Thread.start { siphon_until_eof(client_socket, io) }
sleep 1 # wait for it to actually connect
end

it 'encodes and transmits data' do
sleep 1
instance.receive(event)

wait_for_condition(30) { !io.size.zero? }
sleep 1 # wait for the event to get sent...
instance.close # release the connection

@client_socket_thread.join(30) || fail('client failed to join')
expect(io.string).to include('"hello"','"world"')
end

context 'when payload is very large' do
let(:one_hundred_megabyte_message) { "a" * 1024 * 1024 * 100 }
let(:event) { LogStash::Event.new("message" => one_hundred_megabyte_message) }

it 'encodes and transmits data' do
instance.receive(event)

wait_for_condition(30) { io.size >= one_hundred_megabyte_message.size }
sleep 1 # wait for the event to get sent...
instance.close # release the connection

@client_socket_thread.join(30) || fail('client failed to join')
expect(io.string).to include('"message"',%Q("#{one_hundred_megabyte_message}"))
end
end
end
end
end
end