From 8851e5adaab41fecb3f0d443dbd6f6a3c4557cf5 Mon Sep 17 00:00:00 2001 From: mashhur Date: Tue, 16 Aug 2022 12:09:31 -0700 Subject: [PATCH 1/5] Send entire payload considering return size. --- lib/logstash/outputs/tcp.rb | 126 ++++++++++++++++++++---------------- 1 file changed, 70 insertions(+), 56 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 77cb1ab..4b7852c 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -137,67 +137,81 @@ def register @closed = Concurrent::AtomicBoolean.new(false) setup_ssl if @ssl_enable - if server? - @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") - begin - @server_socket = TCPServer.new(@host, @port) - rescue Errno::EADDRINUSE - @logger.error("Could not start tcp server: Address in use", host: @host, port: @port) - raise - end - if @ssl_enable - @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) - end # @ssl_enable - @client_threads = Concurrent::Array.new - - @accept_thread = Thread.new(@server_socket) do |server_socket| - LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept") - loop do - break if @closed.value - client_socket = server_socket.accept_nonblock exception: false - if client_socket == :wait_readable - IO.select [ server_socket ] - next - end - Thread.start(client_socket) do |client_socket| - # monkeypatch a 'peer' method onto the socket. - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } - @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") - client = Client.new(client_socket, @logger) - Thread.current[:client] = client - LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@client_threads.size}") - @client_threads << Thread.current - client.run unless @closed.value - end + run_as_server if server? + run_as_client unless server? + end + + def run_as_server + @logger.info("Starting tcp output listener", :address => "#{@host}:#{@port}") + begin + @server_socket = TCPServer.new(@host, @port) + rescue Errno::EADDRINUSE + @logger.error("Could not start tcp server: Address in use", host: @host, port: @port) + raise + end + if @ssl_enable + @server_socket = OpenSSL::SSL::SSLServer.new(@server_socket, @ssl_context) + end # @ssl_enable + @client_threads = Concurrent::Array.new + + @accept_thread = Thread.new(@server_socket) do |server_socket| + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|server_accept") + loop do + break if @closed.value + client_socket = server_socket.accept_nonblock exception: false + if client_socket == :wait_readable + IO.select [ server_socket ] + next + end + Thread.start(client_socket) do |client_socket| + # monkeypatch a 'peer' method onto the socket. + client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } + @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") + client = Client.new(client_socket, @logger) + Thread.current[:client] = client + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@client_threads.size}") + @client_threads << Thread.current + client.run unless @closed.value end end + end - @codec.on_event do |event, payload| - @client_threads.select!(&:alive?) - @client_threads.each do |client_thread| - client_thread[:client].write(payload) - end + @codec.on_event do |event, payload| + @client_threads.select!(&:alive?) + @client_threads.each do |client_thread| + client_thread[:client].write(payload) end - else - client_socket = nil - @codec.on_event do |event, payload| - begin - client_socket = connect unless client_socket - r,w,e = IO.select([client_socket], [client_socket], [client_socket], nil) - # don't expect any reads, but a readable socket might - # mean the remote end closed, so read it and throw it away. - # we'll get an EOFError if it happens. - client_socket.sysread(16384) if r.any? - - # Now send the payload - client_socket.syswrite(payload) if w.any? - rescue => e - log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil) - client_socket.close rescue nil - client_socket = nil - sleep @reconnect_interval - retry + end + end + + def run_as_client + client_socket = nil + @codec.on_event do |event, payload| + begin + client_socket = connect unless client_socket + r,w,_ = IO.select([client_socket],[client_socket]) + + # don't expect any reads, but a readable socket might + # mean the remote end closed, so read it and throw it away. + # we'll get an EOFError if it happens. + r.each { |readable| readable.sysread(16384) } + + # first IO#select attempt allows to be sure read socket is available + # rest of IO#select calls allow to wait the socket to be writable ready + while w == nil || w.any? == false + r,w,_ = IO.select([client_socket],[client_socket]) + end + + while payload && payload.bytesize > 0 + written = client_socket.syswrite(payload) + payload = payload.byteslice(written..-1) end + rescue => e + log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil) + client_socket.close rescue nil + client_socket = nil + sleep @reconnect_interval + retry end end end From dfb84a06b8487d77950b89b30157c54471b1d837 Mon Sep 17 00:00:00 2001 From: mashhur Date: Tue, 16 Aug 2022 12:28:32 -0700 Subject: [PATCH 2/5] Add changelog. --- CHANGELOG.md | 3 +++ logstash-output-tcp.gemspec | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21aab5d..6c2d2a7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,6 @@ +## 6.1.1 + - Fix: client ensures to transmit payload size data [#49](https://github.com/logstash-plugins/logstash-output-tcp/pull/49) + ## 6.1.0 - Feat: ssl_supported_protocols (TLSv1.3) [#47](https://github.com/logstash-plugins/logstash-output-tcp/pull/47) - Fix: close server and client sockets on plugin close diff --git a/logstash-output-tcp.gemspec b/logstash-output-tcp.gemspec index 14be8f9..c79b6ac 100644 --- a/logstash-output-tcp.gemspec +++ b/logstash-output-tcp.gemspec @@ -1,7 +1,7 @@ Gem::Specification.new do |s| s.name = 'logstash-output-tcp' - s.version = '6.1.0' + s.version = '6.1.1' s.licenses = ['Apache License (2.0)'] s.summary = "Writes events over a TCP socket" s.description = "This gem is a Logstash plugin required to be installed on top of the Logstash core pipeline using $LS_HOME/bin/logstash-plugin install gemname. This gem is not a stand-alone program" From 73dcde9b052542da8ba865e90579d80bc7c54409 Mon Sep 17 00:00:00 2001 From: mashhur Date: Thu, 18 Aug 2022 11:46:57 -0700 Subject: [PATCH 3/5] Alot: send entire payload with server mode, fix log_warn no method error, thread naming with atomic counter. --- lib/logstash/outputs/tcp.rb | 68 ++++++++++++++++++++----------------- 1 file changed, 36 insertions(+), 32 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 4b7852c..0ceea17 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -56,18 +56,22 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base class Client - def initialize(socket, logger) + def initialize(socket, logger_context) @socket = socket - @logger = logger + @logger_context = logger_context @queue = Queue.new end def run loop do begin - @socket.write(@queue.pop) + remaining_payload = @queue.pop + while remaining_payload && remaining_payload.bytesize > 0 + written_bytes_size = @socket.write(remaining_payload) + remaining_payload = remaining_payload.byteslice(written_bytes_size..-1) + end rescue => e - log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil) + @logger_context.log_warn 'socket write failed:', e, socket: (@socket ? @socket.to_s : nil) break end end @@ -80,7 +84,7 @@ def write(msg) def close @socket.close rescue => e - log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil) + @logger_context.log_warn 'socket close failed:', e, socket: (@socket ? @socket.to_s : nil) end end # class Client @@ -135,6 +139,7 @@ def register require "socket" require "stud/try" @closed = Concurrent::AtomicBoolean.new(false) + @thread_no = Concurrent::AtomicFixnum.new(0) setup_ssl if @ssl_enable run_as_server if server? @@ -165,11 +170,12 @@ def run_as_server end Thread.start(client_socket) do |client_socket| # monkeypatch a 'peer' method onto the socket. - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } + client_socket.extend(::LogStash::Util::SocketPeer) @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") - client = Client.new(client_socket, @logger) + client = Client.new(client_socket, self) Thread.current[:client] = client - LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@client_threads.size}") + @thread_no.increment + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.value}") @client_threads << Thread.current client.run unless @closed.value end @@ -189,22 +195,20 @@ def run_as_client @codec.on_event do |event, payload| begin client_socket = connect unless client_socket - r,w,_ = IO.select([client_socket],[client_socket]) - # don't expect any reads, but a readable socket might - # mean the remote end closed, so read it and throw it away. - # we'll get an EOFError if it happens. - r.each { |readable| readable.sysread(16384) } + writable_oi = nil + while writable_oi.nil? || writable_oi.any? == false + readable_io, writable_oi, _ = IO.select([client_socket],[client_socket]) - # first IO#select attempt allows to be sure read socket is available - # rest of IO#select calls allow to wait the socket to be writable ready - while w == nil || w.any? == false - r,w,_ = IO.select([client_socket],[client_socket]) + # don't expect any reads, but a readable socket might + # mean the remote end closed, so read it and throw it away. + # we'll get an EOFError if it happens. + readable_io.each { |readable| readable.sysread(16384) } end while payload && payload.bytesize > 0 - written = client_socket.syswrite(payload) - payload = payload.byteslice(written..-1) + written_bytes_size = client_socket.syswrite(payload) + payload = payload.byteslice(written_bytes_size..-1) end rescue => e log_warn "client socket failed:", e, host: @host, port: @port, socket: (client_socket ? client_socket.to_s : nil) @@ -233,6 +237,18 @@ def close end end + def log_warn(msg, e, backtrace: @logger.debug?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.warn(msg, details) + end + + def log_error(msg, e, backtrace: @logger.info?, **details) + details = details.merge message: e.message, exception: e.class + details[:backtrace] = e.backtrace if backtrace + @logger.error(msg, details) + end + private def connect @@ -249,7 +265,7 @@ def connect raise end end - client_socket.instance_eval { class << self; include ::LogStash::Util::SocketPeer end } + client_socket.extend(::LogStash::Util::SocketPeer) @logger.debug("opened connection", :client => client_socket.peer) return client_socket rescue => e @@ -267,16 +283,4 @@ def pipeline_id execution_context.pipeline_id || 'main' end - def log_warn(msg, e, backtrace: @logger.debug?, **details) - details = details.merge message: e.message, exception: e.class - details[:backtrace] = e.backtrace if backtrace - @logger.warn(msg, details) - end - - def log_error(msg, e, backtrace: @logger.info?, **details) - details = details.merge message: e.message, exception: e.class - details[:backtrace] = e.backtrace if backtrace - @logger.error(msg, details) - end - end # class LogStash::Outputs::Tcp From b93509aec21eb481791e5c81ee35d67ea9d76e17 Mon Sep 17 00:00:00 2001 From: mashhur Date: Sat, 20 Aug 2022 09:15:32 -0700 Subject: [PATCH 4/5] Review feedbacks applied: adding doc to method, typing fix and simplify logic. --- lib/logstash/outputs/tcp.rb | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/lib/logstash/outputs/tcp.rb b/lib/logstash/outputs/tcp.rb index 0ceea17..1e05b0d 100644 --- a/lib/logstash/outputs/tcp.rb +++ b/lib/logstash/outputs/tcp.rb @@ -56,6 +56,9 @@ class LogStash::Outputs::Tcp < LogStash::Outputs::Base class Client + ## + # @param socket [Socket] + # @param logger_context [#log_warn&#log_error] def initialize(socket, logger_context) @socket = socket @logger_context = logger_context @@ -142,8 +145,11 @@ def register @thread_no = Concurrent::AtomicFixnum.new(0) setup_ssl if @ssl_enable - run_as_server if server? - run_as_client unless server? + if server? + run_as_server + else + run_as_client + end end def run_as_server @@ -174,8 +180,7 @@ def run_as_server @logger.debug("accepted connection", client: client_socket.peer, server: "#{@host}:#{@port}") client = Client.new(client_socket, self) Thread.current[:client] = client - @thread_no.increment - LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.value}") + LogStash::Util.set_thread_name("[#{pipeline_id}]|output|tcp|client_socket-#{@thread_no.increment}") @client_threads << Thread.current client.run unless @closed.value end @@ -196,9 +201,9 @@ def run_as_client begin client_socket = connect unless client_socket - writable_oi = nil - while writable_oi.nil? || writable_oi.any? == false - readable_io, writable_oi, _ = IO.select([client_socket],[client_socket]) + writable_io = nil + while writable_io.nil? || writable_io.any? == false + readable_io, writable_io, _ = IO.select([client_socket],[client_socket]) # don't expect any reads, but a readable socket might # mean the remote end closed, so read it and throw it away. From 52b2fffd05124bd94235f169693a6786e145328b Mon Sep 17 00:00:00 2001 From: Mashhur <99575341+mashhurs@users.noreply.github.com> Date: Sun, 21 Aug 2022 07:16:49 -0700 Subject: [PATCH 5/5] Apply change log suggestions from code review Co-authored-by: Ry Biesemeyer --- CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6c2d2a7..0f5fda4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## 6.1.1 - - Fix: client ensures to transmit payload size data [#49](https://github.com/logstash-plugins/logstash-output-tcp/pull/49) + - Fixes an issue where payloads larger than a connection's current TCP window could be silently truncated [#49](https://github.com/logstash-plugins/logstash-output-tcp/pull/49) ## 6.1.0 - Feat: ssl_supported_protocols (TLSv1.3) [#47](https://github.com/logstash-plugins/logstash-output-tcp/pull/47)