diff --git a/CHANGELOG.md b/CHANGELOG.md index 21aab5d..0f5fda4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 6.1.1 + - Fixes an issue where payloads larger than a connection's current TCP window could be silently truncated [#49](https://github.com/logstash-plugins/logstash-output-tcp/pull/49) + ## 6.1.0 - Feat: ssl_supported_protocols (TLSv1.3) [#47](https://github.com/logstash-plugins/logstash-output-tcp/pull/47) - Fix: close server and client sockets on plugin close diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 77cb1ab..1e05b0d 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -56,18 +56,25 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base class Client - def initialize(socket, logger) + ## + # @param socket [Socket] + # @param logger_context [#log_warn&#log_error] + def initialize(socket, logger_context) @socket = socket - @logger = logger + @logger_context = logger_context @queue = Queue.new end def run loop do begin - @socket.write(@queue.pop) + remaining_payload = @queue.pop + while remaining_payload && remaining_payload.bytesize > 0 + written_bytes_size = @socket.write(remaining_payload) + remaining_payload = remaining_payload.byteslice(written_bytes_size..-1) + end rescue => e - log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil) + @logger_context.log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil) break end end @@ -80,7 +87,7 @@ def write(msg) def close @socket.close rescue => e - log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil) + @logger_context.log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil) end end # class Client @@ -135,69 +142,85 @@ def register require "socket" require "stud/try" @closed = Concurrent::AtomicBoolean.new(false) + @thread_no = Concurrent::AtomicFixnum.new(0) setup_ssl if @ssl_enable if server? - @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") - begin - @server_socket = TCPServer.new(@host, @port) - rescue Errno::EADDRINUSE - @logger.error("Could not start tcp server: Address in use", host: @host, port: @port) - raise - end - if @ssl_enable - @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) - end # @ssl_enable - @client_threads = Concurrent::Array.new - - @accept_thread = Thread.new(@server_socket) do |server_socket| - LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept") - loop do - break if @closed.value - client_socket = server_socket.accept_nonblock exception: false - if client_socket == :wait_readable - IO.select [ server_socket ] - next - end - Thread.start(client_socket) do |client_socket| - # monkeypatch a 'peer' method onto the socket. - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } - @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") - client = Client.new(client_socket, @logger) - Thread.current[:client] = client - LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@client_threads.size}") - @client_threads << Thread.current - client.run unless @closed.value - end + run_as_server + else + run_as_client + end + end + + def run_as_server + @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") + begin + @server_socket = TCPServer.new(@host, @port) + rescue Errno::EADDRINUSE + @logger.error("Could not start tcp server: Address in use", host: @host, port: @port) + raise + end + if @ssl_enable + @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) + end # @ssl_enable + @client_threads = Concurrent::Array.new + + @accept_thread = Thread.new(@server_socket) do |server_socket| + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept") + loop do + break if @closed.value + client_socket = server_socket.accept_nonblock exception: false + if client_socket == :wait_readable + IO.select [ server_socket ] + next + end + Thread.start(client_socket) do |client_socket| + # monkeypatch a 'peer' method onto the socket. + client_socket.extend(::LogStash::Util::SocketPeer) + @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") + client = Client.new(client_socket, self) + Thread.current[:client] = client + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}") + @client_threads << Thread.current + client.run unless @closed.value end end + end - @codec.on_event do |event, payload| - @client_threads.select!(&:alive?) - @client_threads.each do |client_thread| - client_thread[:client].write(payload) - end + @codec.on_event do |event, payload| + @client_threads.select!(&:alive?) + @client_threads.each do |client_thread| + client_thread[:client].write(payload) end - else - client_socket = nil - @codec.on_event do |event, payload| - begin - client_socket = connect unless client_socket - r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) + end + end + + def run_as_client + client_socket = nil + @codec.on_event do |event, payload| + begin + client_socket = connect unless client_socket + + writable_io = nil + while writable_io.nil? || writable_io.any? == false + readable_io, writable_io, _ = IO.select([client_socket],[client_socket]) + # don't expect any reads, but a readable socket might # mean the remote end closed, so read it and throw it away. # we'll get an EOFError if it happens. - client_socket.sysread(16384) if r.any? + readable_io.each { |readable| readable.sysread(16384) } + end - # Now send the payload - client_socket.syswrite(payload) if w.any? - rescue => e - log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil) - client_socket.close rescue nil - client_socket = nil - sleep @reconnect_interval - retry + while payload && payload.bytesize > 0 + written_bytes_size = client_socket.syswrite(payload) + payload = payload.byteslice(written_bytes_size..-1) end + rescue => e + log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil) + client_socket.close rescue nil + client_socket = nil + sleep @reconnect_interval + retry end end end @@ -219,6 +242,18 @@ def close end end + def log_warn(msg, e, backtrace: @logger.debug?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.warn(msg, details) + end + + def log_error(msg, e, backtrace: @logger.info?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.error(msg, details) + end + private def connect @@ -235,7 +270,7 @@ def connect raise end end - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } + client_socket.extend(::LogStash::Util::SocketPeer) @logger.debug("opened connection", :client => client_socket.peer) return client_socket rescue => e @@ -253,16 +288,4 @@ def pipeline_id execution_context.pipeline_id || 'main' end - def log_warn(msg, e, backtrace: @logger.debug?, **details) - details = details.merge message: e.message, exception: e.class - details[:backtrace] = e.backtrace if backtrace - @logger.warn(msg, details) - end - - def log_error(msg, e, backtrace: @logger.info?, **details) - details = details.merge message: e.message, exception: e.class - details[:backtrace] = e.backtrace if backtrace - @logger.error(msg, details) - end - end # class LogStash::Outputs::Tcp diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index 14be8f9..c79b6ac 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-tcp' - s.version = '6.1.0' + s.version = '6.1.1' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events over a TCP socket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program"